*** empty log message ***
authornavid <navid>
Fri, 23 Jan 2009 03:54:45 +0000 (03:54 +0000)
committernavid <navid>
Fri, 23 Jan 2009 03:54:45 +0000 (03:54 +0000)
Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java

index af4be46f593ed2e15be8aefbecfee239c1fde310..3037100f0ee8b65e5b9b7a2994ea34001b38a771 100644 (file)
@@ -38,7 +38,9 @@ import java.util.*;
 import java.nio.channels.*;
 import com.solidosystems.tuplesoup.filter.*;
 import dstm2.atomic;
+import dstm2.Thread;
 import dstm2.util.StringKeyHashMap;
+import java.util.concurrent.Callable;
 
 /**
  * The table stores a group of rows.
@@ -59,8 +61,8 @@ public class DualFileTableTransactional implements TableTransactional{
 //    private RandomAccessFile filearandom=null;
     private TransactionalFile filearandom=null;
     private TransactionalFile filebrandom=null;
-    FileChannel fca=null;
-    FileChannel fcb=null;
+//   FileChannel fca=null;
+//   FileChannel fcb=null;
     private TableIndexTransactional index=null;
      
 //    private long fileaposition=0;
@@ -324,58 +326,72 @@ public class DualFileTableTransactional implements TableTransactional{
      }
      
      private void addRowA(RowTransactional row) throws IOException{
-         synchronized(filealock){
-             openFile(FILEA);             
+         //synchronized(filealock){
+         final Vector args = new Vector();
+         args.add(row);
+         Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+                 openFile(FILEA);             
              //int pre=fileastream.size();
-             int pre= (int)fileastream.getFilePointer();
+                 int pre= (int)fileastream.getFilePointer();
              //row.writeToStream(fileastream);
-             row.writeToFile(fileastream);
+                 ((RowTransactional)args.get(0)).writeToFile(fileastream);
              //int post= fileastream.size();
-             int post= (int)fileastream.getFilePointer();
+                 int post= (int)fileastream.getFilePointer();
              //fileastream.flush();
              
-             synchronized(statlock){
-                  atomicfields.setstat_add(atomicfields.getstat_add()+1);
-                  atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
-             }
+             //synchronized(statlock){
+                 atomicfields.setstat_add(atomicfields.getstat_add()+1);
+                 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
+             //}
              
-             index.addEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFilebposition());
-             if(INDEXCACHESIZE>0){
-                 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
-                 addCacheEntry(entry);
-             }
-             atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
-         }
+                index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
+                if(INDEXCACHESIZE>0){
+                    TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
+                    addCacheEntry(entry);
+                }
+                atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
+            }
+         });
      }
      private void addRowB(RowTransactional row) throws IOException{
-         synchronized(fileblock){
-             openFile(FILEB);
+        // synchronized(fileblock){
+         final Vector args = new Vector();
+         args.add(row);
+         Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+                openFile(FILEB);
              //int pre=filebstream.size();
-             int pre= (int)filebstream.getFilePointer();
+                int pre= (int)filebstream.getFilePointer();
              //row.writeToStream(filebstream);
-             row.writeToFile(filebstream);
-             int post=(int)filebstream.getFilePointer();
+                ((RowTransactional)args.get(0)).writeToFile(filebstream);
+                int post=(int)filebstream.getFilePointer();
              //int post=filebstream.size();
              //filebstream.flush();
-             synchronized(statlock){
-                  atomicfields.setstat_add(atomicfields.getstat_add()+1);
-                  atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
-              }
-             index.addEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
-             if(INDEXCACHESIZE>0){
-                 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
-                 addCacheEntry(entry);
-             }
-             atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
-         }
+             //synchronized(statlock){
+                atomicfields.setstat_add(atomicfields.getstat_add()+1);
+                atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
+             // }
+                index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
+                if(INDEXCACHESIZE>0){
+                  TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
+                  addCacheEntry(entry);
+                }
+                atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
+            }
+         });
      }
      
 
      private void updateCacheEntry(TableIndexEntryTransactional entry){
-          synchronized(indexcache){
-              if(indexcache.containsKey(entry.getId())){
-                  TableIndexNodeTransactional node=indexcache.get(entry.getId());
-                  node.setData(entry);
+          final Vector args = new Vector();
+          args.add(entry);
+          Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+          //synchronized(indexcache){
+              if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
+                  TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
+                  node.setData(((TableIndexEntryTransactional)(args.get(0))));
                   if(node!=indexcachelast){
                       if(node==indexcachefirst){
                           indexcachefirst=node.getNext();
@@ -387,21 +403,27 @@ public class DualFileTableTransactional implements TableTransactional{
                       indexcachelast=node;
                   }
               }else{
-                  addCacheEntry(entry);
+                  addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
               }
-         }
+            }
+        });
       }
 
       private void removeCacheEntry(String id){
-          synchronized(indexcache){
-                if(indexcache.containsKey(id)){
-                    TableIndexNodeTransactional node=indexcache.get(id);
-                    indexcache.remove(id);
+          //synchronized(indexcache){
+          final Vector args = new Vector();
+          args.add(id);
+          Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+          
+                if(indexcache.containsKey((String)(args.get(0)))){
+                    TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
+                    indexcache.remove((String)(args.get(0)));
                     if(atomicfields.getIndexcacheusage()==1){
                         indexcachefirst=null;
                         indexcachelast=null;
                         atomicfields.setIndexcacheusage(0);
-                        return;
+                        return true;
                     }
                     if(node==indexcachefirst){
                         indexcachefirst=node.getNext();
@@ -413,17 +435,25 @@ public class DualFileTableTransactional implements TableTransactional{
                         node.remove();
                     }
                     atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
-                    synchronized(statlock){
-                         atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
-                    }
+              //      synchronized(statlock){
+                     atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
+                     
+              //      }
                 }
-          }
+                return true;
+            }
+          });
       }
 
       private TableIndexEntryTransactional getCacheEntry(String id){
-          synchronized(indexcache){
-              if(indexcache.containsKey(id)){
-                  TableIndexNodeTransactional node=indexcache.get(id);
+          final Vector args = new Vector();
+          args.add(id);
+          
+          TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
+             public TableIndexEntryTransactional call() throws Exception{
+          //synchronized(indexcache){
+              if(indexcache.containsKey((String)(args.get(0)))){
+                  TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
                   if(node!=indexcachelast){
                       if(node==indexcachefirst){
                             indexcachefirst=node.getNext();
@@ -434,15 +464,25 @@ public class DualFileTableTransactional implements TableTransactional{
                         node.setNext(null);
                         indexcachelast=node;
                   }
-                  synchronized(statlock){
+                //  synchronized(statlock){
                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
-                   }
+                //   }
                   return node.getData();
               }
-          }
-          synchronized(statlock){
+              return null;
+             }
+          });
+          if (res != null)
+              return res;
+          
+          Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+          //synchronized(statlock){
                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
-           }
+               return true;
+             }
+          });
+           //}
           return null;
       }
 
@@ -464,42 +504,66 @@ public class DualFileTableTransactional implements TableTransactional{
       */
      public void updateRow(RowTransactional row) throws IOException{
          TableIndexEntryTransactional entry=null;
+         final Vector args = new Vector();
+         args.add(row);
+         args.add(entry);
          // Handle index entry caching
          if(INDEXCACHESIZE>0){
-             synchronized(indexcache){
+             Thread.doIt(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
+                    RowTransactional row = (RowTransactional) (args.get(0));
+                    entry = getCacheEntry(row.getId());
+                    if(entry==null){
+                       entry=index.scanIndex(row.getId());
+                       addCacheEntry(entry);
+                    }
+                    return true;
+                }
+             });
+          /*   synchronized(indexcache){
                  entry=getCacheEntry(row.getId());
                  if(entry==null){
                      entry=index.scanIndex(row.getId());
                      addCacheEntry(entry);
                  }
-             }
+             }*/
          }else{
              entry=index.scanIndex(row.getId());
          }
          if(entry.getRowSize()>=row.getSize()){
             // Add to the existing location
             switch(entry.getLocation()){
-                case FILEA  :synchronized(filealock){
-                                if(filearandom==null){
-                                    filearandom=new TransactionalFile(getFileName(FILEA),"rw");
-                                   // fca=filearandom.getChannel();
-                                }
-                                filearandom.seek(entry.getPosition());
-                                row.writeToFile(filearandom);
-                                
-                                fca.force(false);
+                case FILEA  :
+               //     synchronized(filealock){
+                     Thread.doIt(new Callable<Boolean>() {
+                        public Boolean call() throws Exception {
+                            if(filearandom==null){
+                                filearandom=new TransactionalFile(getFileName(FILEA),"rw");
+                               // fca=filearandom.getChannel();
                             }
-                            break;
-                case FILEB :synchronized(fileblock){
+                            filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
+                            ((RowTransactional) (args.get(0))).writeToFile(filearandom);
+                            return true;
+                            //fca.force(false);
+                        }
+                     });    
+               //   }
+                    break;
+                case FILEB :
+              //      synchronized(fileblock){
+                    Thread.doIt(new Callable<Boolean>() {
+                        public Boolean call() throws Exception {
                                 if(filebrandom==null){
                                     filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
                                   //  fcb=filebrandom.getChannel();
                                 }
-                                filebrandom.seek(entry.getPosition());
-                                row.writeToFile(filebrandom);
-                                
-                                fcb.force(false);
-                            }
+                                filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
+                                ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
+                                return true;
+                                //fcb.force(false);
+                        }
+                    });
                     break;
             }
          }else{
@@ -510,50 +574,66 @@ public class DualFileTableTransactional implements TableTransactional{
               }
               rowswitch=!rowswitch;
          }
-         synchronized(statlock){
-              atomicfields.setstat_update(atomicfields.getstat_update()+1);
-              atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
-         }
+         //synchronized(statlock){
+         Thread.doIt(new Callable<Boolean>() {
+              public Boolean call() throws Exception{ 
+                atomicfields.setstat_update(atomicfields.getstat_update()+1);
+                atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
+                return true;
+              }
+         });
      }
      
      private void updateRowA(RowTransactional row) throws IOException{
-         synchronized(filealock){
-              openFile(FILEA);
-              //int pre=filebstream.size();
-              int pre=(int)fileastream.getFilePointer();
-              //row.writeToStream(filebstream);
-              row.writeToFile(fileastream);
-              //int post=filebstream.size();
-              int post=(int)fileastream.getFilePointer();
-              //fileastream.flush();
-              index.updateEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
-              
-              // Handle index entry caching
-              if(INDEXCACHESIZE>0){
-                  updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()));
+         final Vector args = new Vector();
+         args.add(row);
+         Thread.doIt(new Callable<Boolean>() {
+              public Boolean call() throws Exception{ 
+         //synchronized(filealock){
+                  openFile(FILEA);
+                  //int pre=filebstream.size();
+                  int pre=(int)fileastream.getFilePointer();
+                  //row.writeToStream(filebstream);
+                  ((RowTransactional)(args.get(0))).writeToFile(fileastream);
+                  //int post=filebstream.size();
+                  int post=(int)fileastream.getFilePointer();
+                  //fileastream.flush();
+                  index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
+
+                  // Handle index entry caching
+                  if(INDEXCACHESIZE>0){
+                      updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
+                  }
+                  atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
               }
-              atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
-          }
+          });
      }
 
      private void updateRowB(RowTransactional row) throws IOException{
-         synchronized(fileblock){
-              openFile(FILEB);
+         final Vector args = new Vector();
+         args.add(row);
+         Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{ 
+         
+         //synchronized(fileblock){
+                openFile(FILEB);
               //int pre=filebstream.size();
-              int pre=(int)filebstream.getFilePointer();
+                int pre=(int)filebstream.getFilePointer();
               //row.writeToStream(filebstream);
-              row.writeToFile(filebstream);
+                ((RowTransactional)(args.get(0))).writeToFile(filebstream);
               //int post=filebstream.size();
-              int post=(int)filebstream.getFilePointer();
+                int post=(int)filebstream.getFilePointer();
               //filebstream.flush();
-              index.updateEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
+                index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
               // Handle index entry caching
               // Handle index entry caching
                 if(INDEXCACHESIZE>0){
-                    updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()));
+                    updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
                 }
                 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
-          }
+                return true;
+            }
+        });
      }
 
      /**
@@ -566,9 +646,14 @@ public class DualFileTableTransactional implements TableTransactional{
               removeCacheEntry(row.getId());
           }
           index.updateEntry(row.getId(),row.getSize(),DELETE,0);
-          synchronized(statlock){
-              atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
-          }
+          Thread.doIt(new Callable<Boolean>() {
+             public Boolean call() throws Exception{
+          //synchronized(statlock){
+                atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
+                return true;
+             }
+          });
+         // }
      }
      
      /**
@@ -634,10 +719,15 @@ public class DualFileTableTransactional implements TableTransactional{
                   }
                   RowTransactional row=RowTransactional.readFromStream(data);
                   data.close();
-                  synchronized(statlock){
-                       atomicfields.setstat_read(atomicfields.getstat_read()+1);
-                       atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());
-                  }
+                  final Vector args = new Vector();
+                  args.add(row);
+                  Thread.doIt(new Callable<Boolean>() {
+                      public Boolean call() throws Exception{
+                  //synchronized(statlock){
+                          atomicfields.setstat_read(atomicfields.getstat_read()+1);
+                          atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
+                      }
+                  });
                   return row;
               }