From 78ae6028dcb461ad3a75a55564b5ff0663aeb793 Mon Sep 17 00:00:00 2001 From: navid Date: Fri, 23 Jan 2009 03:54:45 +0000 Subject: [PATCH] *** empty log message *** --- .../core/DualFileTableTransactional.java | 310 +++++++++++------- 1 file changed, 200 insertions(+), 110 deletions(-) diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java index af4be46f..3037100f 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java @@ -38,7 +38,9 @@ import java.util.*; import java.nio.channels.*; import com.solidosystems.tuplesoup.filter.*; import dstm2.atomic; +import dstm2.Thread; import dstm2.util.StringKeyHashMap; +import java.util.concurrent.Callable; /** * The table stores a group of rows. @@ -59,8 +61,8 @@ public class DualFileTableTransactional implements TableTransactional{ // private RandomAccessFile filearandom=null; private TransactionalFile filearandom=null; private TransactionalFile filebrandom=null; - FileChannel fca=null; - FileChannel fcb=null; +// FileChannel fca=null; +// FileChannel fcb=null; private TableIndexTransactional index=null; // private long fileaposition=0; @@ -324,58 +326,72 @@ public class DualFileTableTransactional implements TableTransactional{ } private void addRowA(RowTransactional row) throws IOException{ - synchronized(filealock){ - openFile(FILEA); + //synchronized(filealock){ + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + openFile(FILEA); //int pre=fileastream.size(); - int pre= (int)fileastream.getFilePointer(); + int pre= (int)fileastream.getFilePointer(); //row.writeToStream(fileastream); - row.writeToFile(fileastream); + ((RowTransactional)args.get(0)).writeToFile(fileastream); //int post= fileastream.size(); - int post= (int)fileastream.getFilePointer(); + int post= (int)fileastream.getFilePointer(); //fileastream.flush(); - synchronized(statlock){ - atomicfields.setstat_add(atomicfields.getstat_add()+1); - atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize()); - } + //synchronized(statlock){ + atomicfields.setstat_add(atomicfields.getstat_add()+1); + atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize()); + //} - index.addEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFilebposition()); - if(INDEXCACHESIZE>0){ - TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()); - addCacheEntry(entry); - } - atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post)); - } + index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition()); + if(INDEXCACHESIZE>0){ + TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition()); + addCacheEntry(entry); + } + atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post)); + } + }); } private void addRowB(RowTransactional row) throws IOException{ - synchronized(fileblock){ - openFile(FILEB); + // synchronized(fileblock){ + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + openFile(FILEB); //int pre=filebstream.size(); - int pre= (int)filebstream.getFilePointer(); + int pre= (int)filebstream.getFilePointer(); //row.writeToStream(filebstream); - row.writeToFile(filebstream); - int post=(int)filebstream.getFilePointer(); + ((RowTransactional)args.get(0)).writeToFile(filebstream); + int post=(int)filebstream.getFilePointer(); //int post=filebstream.size(); //filebstream.flush(); - synchronized(statlock){ - atomicfields.setstat_add(atomicfields.getstat_add()+1); - atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize()); - } - index.addEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()); - if(INDEXCACHESIZE>0){ - TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()); - addCacheEntry(entry); - } - atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post)); - } + //synchronized(statlock){ + atomicfields.setstat_add(atomicfields.getstat_add()+1); + atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize()); + // } + index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition()); + if(INDEXCACHESIZE>0){ + TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition()); + addCacheEntry(entry); + } + atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post)); + } + }); } private void updateCacheEntry(TableIndexEntryTransactional entry){ - synchronized(indexcache){ - if(indexcache.containsKey(entry.getId())){ - TableIndexNodeTransactional node=indexcache.get(entry.getId()); - node.setData(entry); + final Vector args = new Vector(); + args.add(entry); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(indexcache){ + if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){ + TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId()); + node.setData(((TableIndexEntryTransactional)(args.get(0)))); if(node!=indexcachelast){ if(node==indexcachefirst){ indexcachefirst=node.getNext(); @@ -387,21 +403,27 @@ public class DualFileTableTransactional implements TableTransactional{ indexcachelast=node; } }else{ - addCacheEntry(entry); + addCacheEntry(((TableIndexEntryTransactional)(args.get(0)))); } - } + } + }); } private void removeCacheEntry(String id){ - synchronized(indexcache){ - if(indexcache.containsKey(id)){ - TableIndexNodeTransactional node=indexcache.get(id); - indexcache.remove(id); + //synchronized(indexcache){ + final Vector args = new Vector(); + args.add(id); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + + if(indexcache.containsKey((String)(args.get(0)))){ + TableIndexNodeTransactional node=indexcache.get((String)(args.get(0))); + indexcache.remove((String)(args.get(0))); if(atomicfields.getIndexcacheusage()==1){ indexcachefirst=null; indexcachelast=null; atomicfields.setIndexcacheusage(0); - return; + return true; } if(node==indexcachefirst){ indexcachefirst=node.getNext(); @@ -413,17 +435,25 @@ public class DualFileTableTransactional implements TableTransactional{ node.remove(); } atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1); - synchronized(statlock){ - atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1); - } + // synchronized(statlock){ + atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1); + + // } } - } + return true; + } + }); } private TableIndexEntryTransactional getCacheEntry(String id){ - synchronized(indexcache){ - if(indexcache.containsKey(id)){ - TableIndexNodeTransactional node=indexcache.get(id); + final Vector args = new Vector(); + args.add(id); + + TableIndexEntryTransactional res = Thread.doIt(new Callable() { + public TableIndexEntryTransactional call() throws Exception{ + //synchronized(indexcache){ + if(indexcache.containsKey((String)(args.get(0)))){ + TableIndexNodeTransactional node=indexcache.get((String)(args.get(0))); if(node!=indexcachelast){ if(node==indexcachefirst){ indexcachefirst=node.getNext(); @@ -434,15 +464,25 @@ public class DualFileTableTransactional implements TableTransactional{ node.setNext(null); indexcachelast=node; } - synchronized(statlock){ + // synchronized(statlock){ atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1); - } + // } return node.getData(); } - } - synchronized(statlock){ + return null; + } + }); + if (res != null) + return res; + + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(statlock){ atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1); - } + return true; + } + }); + //} return null; } @@ -464,42 +504,66 @@ public class DualFileTableTransactional implements TableTransactional{ */ public void updateRow(RowTransactional row) throws IOException{ TableIndexEntryTransactional entry=null; + final Vector args = new Vector(); + args.add(row); + args.add(entry); // Handle index entry caching if(INDEXCACHESIZE>0){ - synchronized(indexcache){ + Thread.doIt(new Callable() { + public Boolean call() throws Exception { + TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1)); + RowTransactional row = (RowTransactional) (args.get(0)); + entry = getCacheEntry(row.getId()); + if(entry==null){ + entry=index.scanIndex(row.getId()); + addCacheEntry(entry); + } + return true; + } + }); + /* synchronized(indexcache){ entry=getCacheEntry(row.getId()); if(entry==null){ entry=index.scanIndex(row.getId()); addCacheEntry(entry); } - } + }*/ }else{ entry=index.scanIndex(row.getId()); } if(entry.getRowSize()>=row.getSize()){ // Add to the existing location switch(entry.getLocation()){ - case FILEA :synchronized(filealock){ - if(filearandom==null){ - filearandom=new TransactionalFile(getFileName(FILEA),"rw"); - // fca=filearandom.getChannel(); - } - filearandom.seek(entry.getPosition()); - row.writeToFile(filearandom); - - fca.force(false); + case FILEA : + // synchronized(filealock){ + Thread.doIt(new Callable() { + public Boolean call() throws Exception { + if(filearandom==null){ + filearandom=new TransactionalFile(getFileName(FILEA),"rw"); + // fca=filearandom.getChannel(); } - break; - case FILEB :synchronized(fileblock){ + filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition()); + ((RowTransactional) (args.get(0))).writeToFile(filearandom); + return true; + //fca.force(false); + } + }); + // } + break; + case FILEB : + // synchronized(fileblock){ + Thread.doIt(new Callable() { + public Boolean call() throws Exception { if(filebrandom==null){ filebrandom=new TransactionalFile(getFileName(FILEB),"rw"); // fcb=filebrandom.getChannel(); } - filebrandom.seek(entry.getPosition()); - row.writeToFile(filebrandom); - - fcb.force(false); - } + filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition()); + ((RowTransactional) (args.get(0))).writeToFile(filebrandom); + return true; + //fcb.force(false); + } + }); break; } }else{ @@ -510,50 +574,66 @@ public class DualFileTableTransactional implements TableTransactional{ } rowswitch=!rowswitch; } - synchronized(statlock){ - atomicfields.setstat_update(atomicfields.getstat_update()+1); - atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize()); - } + //synchronized(statlock){ + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + atomicfields.setstat_update(atomicfields.getstat_update()+1); + atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize()); + return true; + } + }); } private void updateRowA(RowTransactional row) throws IOException{ - synchronized(filealock){ - openFile(FILEA); - //int pre=filebstream.size(); - int pre=(int)fileastream.getFilePointer(); - //row.writeToStream(filebstream); - row.writeToFile(fileastream); - //int post=filebstream.size(); - int post=(int)fileastream.getFilePointer(); - //fileastream.flush(); - index.updateEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()); - - // Handle index entry caching - if(INDEXCACHESIZE>0){ - updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition())); + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(filealock){ + openFile(FILEA); + //int pre=filebstream.size(); + int pre=(int)fileastream.getFilePointer(); + //row.writeToStream(filebstream); + ((RowTransactional)(args.get(0))).writeToFile(fileastream); + //int post=filebstream.size(); + int post=(int)fileastream.getFilePointer(); + //fileastream.flush(); + index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()); + + // Handle index entry caching + if(INDEXCACHESIZE>0){ + updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition())); + } + atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post)); } - atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post)); - } + }); } private void updateRowB(RowTransactional row) throws IOException{ - synchronized(fileblock){ - openFile(FILEB); + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + + //synchronized(fileblock){ + openFile(FILEB); //int pre=filebstream.size(); - int pre=(int)filebstream.getFilePointer(); + int pre=(int)filebstream.getFilePointer(); //row.writeToStream(filebstream); - row.writeToFile(filebstream); + ((RowTransactional)(args.get(0))).writeToFile(filebstream); //int post=filebstream.size(); - int post=(int)filebstream.getFilePointer(); + int post=(int)filebstream.getFilePointer(); //filebstream.flush(); - index.updateEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()); + index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()); // Handle index entry caching // Handle index entry caching if(INDEXCACHESIZE>0){ - updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition())); + updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition())); } atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post)); - } + return true; + } + }); } /** @@ -566,9 +646,14 @@ public class DualFileTableTransactional implements TableTransactional{ removeCacheEntry(row.getId()); } index.updateEntry(row.getId(),row.getSize(),DELETE,0); - synchronized(statlock){ - atomicfields.setstat_delete(atomicfields.getstat_delete()+1); - } + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(statlock){ + atomicfields.setstat_delete(atomicfields.getstat_delete()+1); + return true; + } + }); + // } } /** @@ -634,10 +719,15 @@ public class DualFileTableTransactional implements TableTransactional{ } RowTransactional row=RowTransactional.readFromStream(data); data.close(); - synchronized(statlock){ - atomicfields.setstat_read(atomicfields.getstat_read()+1); - atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize()); - } + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(statlock){ + atomicfields.setstat_read(atomicfields.getstat_read()+1); + atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize()); + } + }); return row; } -- 2.34.1