import java.nio.channels.*;
import com.solidosystems.tuplesoup.filter.*;
import dstm2.atomic;
+import dstm2.Thread;
import dstm2.util.StringKeyHashMap;
+import java.util.concurrent.Callable;
/**
* The table stores a group of rows.
// private RandomAccessFile filearandom=null;
private TransactionalFile filearandom=null;
private TransactionalFile filebrandom=null;
- FileChannel fca=null;
- FileChannel fcb=null;
+// FileChannel fca=null;
+// FileChannel fcb=null;
private TableIndexTransactional index=null;
// private long fileaposition=0;
}
private void addRowA(RowTransactional row) throws IOException{
- synchronized(filealock){
- openFile(FILEA);
+ //synchronized(filealock){
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ openFile(FILEA);
//int pre=fileastream.size();
- int pre= (int)fileastream.getFilePointer();
+ int pre= (int)fileastream.getFilePointer();
//row.writeToStream(fileastream);
- row.writeToFile(fileastream);
+ ((RowTransactional)args.get(0)).writeToFile(fileastream);
//int post= fileastream.size();
- int post= (int)fileastream.getFilePointer();
+ int post= (int)fileastream.getFilePointer();
//fileastream.flush();
- synchronized(statlock){
- atomicfields.setstat_add(atomicfields.getstat_add()+1);
- atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
- }
+ //synchronized(statlock){
+ atomicfields.setstat_add(atomicfields.getstat_add()+1);
+ atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
+ //}
- index.addEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFilebposition());
- if(INDEXCACHESIZE>0){
- TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
- addCacheEntry(entry);
- }
- atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
- }
+ index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
+ if(INDEXCACHESIZE>0){
+ TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
+ addCacheEntry(entry);
+ }
+ atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
+ }
+ });
}
private void addRowB(RowTransactional row) throws IOException{
- synchronized(fileblock){
- openFile(FILEB);
+ // synchronized(fileblock){
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ openFile(FILEB);
//int pre=filebstream.size();
- int pre= (int)filebstream.getFilePointer();
+ int pre= (int)filebstream.getFilePointer();
//row.writeToStream(filebstream);
- row.writeToFile(filebstream);
- int post=(int)filebstream.getFilePointer();
+ ((RowTransactional)args.get(0)).writeToFile(filebstream);
+ int post=(int)filebstream.getFilePointer();
//int post=filebstream.size();
//filebstream.flush();
- synchronized(statlock){
- atomicfields.setstat_add(atomicfields.getstat_add()+1);
- atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
- }
- index.addEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
- if(INDEXCACHESIZE>0){
- TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
- addCacheEntry(entry);
- }
- atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
- }
+ //synchronized(statlock){
+ atomicfields.setstat_add(atomicfields.getstat_add()+1);
+ atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
+ // }
+ index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
+ if(INDEXCACHESIZE>0){
+ TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
+ addCacheEntry(entry);
+ }
+ atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
+ }
+ });
}
private void updateCacheEntry(TableIndexEntryTransactional entry){
- synchronized(indexcache){
- if(indexcache.containsKey(entry.getId())){
- TableIndexNodeTransactional node=indexcache.get(entry.getId());
- node.setData(entry);
+ final Vector args = new Vector();
+ args.add(entry);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(indexcache){
+ if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
+ TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
+ node.setData(((TableIndexEntryTransactional)(args.get(0))));
if(node!=indexcachelast){
if(node==indexcachefirst){
indexcachefirst=node.getNext();
indexcachelast=node;
}
}else{
- addCacheEntry(entry);
+ addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
}
- }
+ }
+ });
}
private void removeCacheEntry(String id){
- synchronized(indexcache){
- if(indexcache.containsKey(id)){
- TableIndexNodeTransactional node=indexcache.get(id);
- indexcache.remove(id);
+ //synchronized(indexcache){
+ final Vector args = new Vector();
+ args.add(id);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+
+ if(indexcache.containsKey((String)(args.get(0)))){
+ TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
+ indexcache.remove((String)(args.get(0)));
if(atomicfields.getIndexcacheusage()==1){
indexcachefirst=null;
indexcachelast=null;
atomicfields.setIndexcacheusage(0);
- return;
+ return true;
}
if(node==indexcachefirst){
indexcachefirst=node.getNext();
node.remove();
}
atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
- synchronized(statlock){
- atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
- }
+ // synchronized(statlock){
+ atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
+
+ // }
}
- }
+ return true;
+ }
+ });
}
private TableIndexEntryTransactional getCacheEntry(String id){
- synchronized(indexcache){
- if(indexcache.containsKey(id)){
- TableIndexNodeTransactional node=indexcache.get(id);
+ final Vector args = new Vector();
+ args.add(id);
+
+ TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
+ public TableIndexEntryTransactional call() throws Exception{
+ //synchronized(indexcache){
+ if(indexcache.containsKey((String)(args.get(0)))){
+ TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
if(node!=indexcachelast){
if(node==indexcachefirst){
indexcachefirst=node.getNext();
node.setNext(null);
indexcachelast=node;
}
- synchronized(statlock){
+ // synchronized(statlock){
atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
- }
+ // }
return node.getData();
}
- }
- synchronized(statlock){
+ return null;
+ }
+ });
+ if (res != null)
+ return res;
+
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(statlock){
atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
- }
+ return true;
+ }
+ });
+ //}
return null;
}
*/
public void updateRow(RowTransactional row) throws IOException{
TableIndexEntryTransactional entry=null;
+ final Vector args = new Vector();
+ args.add(row);
+ args.add(entry);
// Handle index entry caching
if(INDEXCACHESIZE>0){
- synchronized(indexcache){
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
+ RowTransactional row = (RowTransactional) (args.get(0));
+ entry = getCacheEntry(row.getId());
+ if(entry==null){
+ entry=index.scanIndex(row.getId());
+ addCacheEntry(entry);
+ }
+ return true;
+ }
+ });
+ /* synchronized(indexcache){
entry=getCacheEntry(row.getId());
if(entry==null){
entry=index.scanIndex(row.getId());
addCacheEntry(entry);
}
- }
+ }*/
}else{
entry=index.scanIndex(row.getId());
}
if(entry.getRowSize()>=row.getSize()){
// Add to the existing location
switch(entry.getLocation()){
- case FILEA :synchronized(filealock){
- if(filearandom==null){
- filearandom=new TransactionalFile(getFileName(FILEA),"rw");
- // fca=filearandom.getChannel();
- }
- filearandom.seek(entry.getPosition());
- row.writeToFile(filearandom);
-
- fca.force(false);
+ case FILEA :
+ // synchronized(filealock){
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ if(filearandom==null){
+ filearandom=new TransactionalFile(getFileName(FILEA),"rw");
+ // fca=filearandom.getChannel();
}
- break;
- case FILEB :synchronized(fileblock){
+ filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
+ ((RowTransactional) (args.get(0))).writeToFile(filearandom);
+ return true;
+ //fca.force(false);
+ }
+ });
+ // }
+ break;
+ case FILEB :
+ // synchronized(fileblock){
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
if(filebrandom==null){
filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
// fcb=filebrandom.getChannel();
}
- filebrandom.seek(entry.getPosition());
- row.writeToFile(filebrandom);
-
- fcb.force(false);
- }
+ filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
+ ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
+ return true;
+ //fcb.force(false);
+ }
+ });
break;
}
}else{
}
rowswitch=!rowswitch;
}
- synchronized(statlock){
- atomicfields.setstat_update(atomicfields.getstat_update()+1);
- atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
- }
+ //synchronized(statlock){
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ atomicfields.setstat_update(atomicfields.getstat_update()+1);
+ atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
+ return true;
+ }
+ });
}
private void updateRowA(RowTransactional row) throws IOException{
- synchronized(filealock){
- openFile(FILEA);
- //int pre=filebstream.size();
- int pre=(int)fileastream.getFilePointer();
- //row.writeToStream(filebstream);
- row.writeToFile(fileastream);
- //int post=filebstream.size();
- int post=(int)fileastream.getFilePointer();
- //fileastream.flush();
- index.updateEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
-
- // Handle index entry caching
- if(INDEXCACHESIZE>0){
- updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()));
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(filealock){
+ openFile(FILEA);
+ //int pre=filebstream.size();
+ int pre=(int)fileastream.getFilePointer();
+ //row.writeToStream(filebstream);
+ ((RowTransactional)(args.get(0))).writeToFile(fileastream);
+ //int post=filebstream.size();
+ int post=(int)fileastream.getFilePointer();
+ //fileastream.flush();
+ index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
+
+ // Handle index entry caching
+ if(INDEXCACHESIZE>0){
+ updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
+ }
+ atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
}
- atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
- }
+ });
}
private void updateRowB(RowTransactional row) throws IOException{
- synchronized(fileblock){
- openFile(FILEB);
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+
+ //synchronized(fileblock){
+ openFile(FILEB);
//int pre=filebstream.size();
- int pre=(int)filebstream.getFilePointer();
+ int pre=(int)filebstream.getFilePointer();
//row.writeToStream(filebstream);
- row.writeToFile(filebstream);
+ ((RowTransactional)(args.get(0))).writeToFile(filebstream);
//int post=filebstream.size();
- int post=(int)filebstream.getFilePointer();
+ int post=(int)filebstream.getFilePointer();
//filebstream.flush();
- index.updateEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
+ index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
// Handle index entry caching
// Handle index entry caching
if(INDEXCACHESIZE>0){
- updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()));
+ updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
}
atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
- }
+ return true;
+ }
+ });
}
/**
removeCacheEntry(row.getId());
}
index.updateEntry(row.getId(),row.getSize(),DELETE,0);
- synchronized(statlock){
- atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
- }
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(statlock){
+ atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
+ return true;
+ }
+ });
+ // }
}
/**
}
RowTransactional row=RowTransactional.readFromStream(data);
data.close();
- synchronized(statlock){
- atomicfields.setstat_read(atomicfields.getstat_read()+1);
- atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());
- }
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(statlock){
+ atomicfields.setstat_read(atomicfields.getstat_read()+1);
+ atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
+ }
+ });
return row;
}