*** empty log message ***
authornavid <navid>
Fri, 23 Jan 2009 19:42:02 +0000 (19:42 +0000)
committernavid <navid>
Fri, 23 Jan 2009 19:42:02 +0000 (19:42 +0000)
Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java
Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java
Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java
Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/TableIndexTransactional.java

index 3037100f0ee8b65e5b9b7a2994ea34001b38a771..b55c46c6310495812e57310dd09d272d448007f0 100644 (file)
@@ -144,32 +144,37 @@ public class DualFileTableTransactional implements TableTransactional{
      * Furthermore, the index will be asked to deliver separate index specific counters
      */
     public Hashtable<String,Long> readStatistics(){
-        Hashtable<String,Long> hash=new Hashtable<String,Long>();
-        synchronized(statlock){
-            hash.put("stat_table_add",atomicfields.getstat_add());
-            hash.put("stat_table_update",atomicfields.getstat_update());
-            hash.put("stat_table_delete",atomicfields.getstat_delete());
-            hash.put("stat_table_add_size",atomicfields.getstat_add_size());
-            hash.put("stat_table_update_size",atomicfields.getstat_update_size());
-            hash.put("stat_table_read_size",atomicfields.getstat_read_size());
-            hash.put("stat_table_read",atomicfields.getstat_read());
-            hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
-            hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
-            hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
-            atomicfields.setstat_add(0);
-            atomicfields.setstat_update(0);
-            atomicfields.setstat_delete(0);
-            atomicfields.setstat_add_size(0);
-            atomicfields.setstat_update_size(0);
-            atomicfields.setstat_read_size(0);
-            atomicfields.setstat_read(0);
-            atomicfields.setstat_cache_hit(0);
-            atomicfields.setstat_cache_miss(0);
-            atomicfields.setstat_cache_drop(0);
-            Hashtable<String,Long> ihash=index.readStatistics();
-            hash.putAll(ihash);
-        }
-        return hash;
+        
+    //    synchronized(statlock){
+        return Thread.doIt(new Callable<Hashtable<String,Long>>() {
+            public Hashtable<String,Long> call() throws Exception{
+                Hashtable<String,Long> hash=new Hashtable<String,Long>();
+                hash.put("stat_table_add",atomicfields.getstat_add());
+                hash.put("stat_table_update",atomicfields.getstat_update());
+                hash.put("stat_table_delete",atomicfields.getstat_delete());
+                hash.put("stat_table_add_size",atomicfields.getstat_add_size());
+                hash.put("stat_table_update_size",atomicfields.getstat_update_size());
+                hash.put("stat_table_read_size",atomicfields.getstat_read_size());
+                hash.put("stat_table_read",atomicfields.getstat_read());
+                hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
+                hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
+                hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
+                atomicfields.setstat_add(0);
+                atomicfields.setstat_update(0);
+                atomicfields.setstat_delete(0);
+                atomicfields.setstat_add_size(0);
+                atomicfields.setstat_update_size(0);
+                atomicfields.setstat_read_size(0);
+                atomicfields.setstat_read(0);
+                atomicfields.setstat_cache_hit(0);
+                atomicfields.setstat_cache_miss(0);
+                atomicfields.setstat_cache_drop(0);
+                Hashtable<String,Long> ihash=index.readStatistics();
+                hash.putAll(ihash);
+                return hash;
+            }
+        });
+        
     }
     
     /**
@@ -259,7 +264,7 @@ public class DualFileTableTransactional implements TableTransactional{
          }catch(Exception e){}
     }
      
-     private synchronized void openFile(int type) throws IOException{
+     private /*synchronized*/ void openFile(int type) throws IOException{
          switch(type){
              case FILEA  : if(fileastream==null){
                                // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
@@ -296,15 +301,15 @@ public class DualFileTableTransactional implements TableTransactional{
     }
      
      private void addCacheEntry(TableIndexEntryTransactional entry){
-         synchronized(indexcache){
+      //   synchronized(indexcache){
              if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
                  // remove first entry
                  TableIndexNodeTransactional node=indexcachefirst;
                  indexcache.remove(node.getData().getId());
                  atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
-                 synchronized(statlock){
+              //   synchronized(statlock){
                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
-                 }
+             //    }
                  indexcachefirst=node.getNext();
                  if(indexcachefirst==null){
                     indexcachelast=null;
@@ -322,7 +327,7 @@ public class DualFileTableTransactional implements TableTransactional{
              indexcachelast=node;
              indexcache.put(entry.getId(),node);
              atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
-        }
+     //   }
      }
      
      private void addRowA(RowTransactional row) throws IOException{
@@ -351,6 +356,7 @@ public class DualFileTableTransactional implements TableTransactional{
                     addCacheEntry(entry);
                 }
                 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
+                return true;
             }
          });
      }
@@ -378,6 +384,7 @@ public class DualFileTableTransactional implements TableTransactional{
                   addCacheEntry(entry);
                 }
                 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
+                return true;
             }
          });
      }
@@ -405,6 +412,7 @@ public class DualFileTableTransactional implements TableTransactional{
               }else{
                   addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
               }
+              return true;
             }
         });
       }
@@ -529,7 +537,7 @@ public class DualFileTableTransactional implements TableTransactional{
                  }
              }*/
          }else{
-             entry=index.scanIndex(row.getId());
+             entry=index.scanIndexTransactional(row.getId());
          }
          if(entry.getRowSize()>=row.getSize()){
             // Add to the existing location
@@ -605,6 +613,7 @@ public class DualFileTableTransactional implements TableTransactional{
                       updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
                   }
                   atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
+                  return true;
               }
           });
      }
@@ -645,7 +654,7 @@ public class DualFileTableTransactional implements TableTransactional{
           if(INDEXCACHESIZE>0){
               removeCacheEntry(row.getId());
           }
-          index.updateEntry(row.getId(),row.getSize(),DELETE,0);
+          index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
           Thread.doIt(new Callable<Boolean>() {
              public Boolean call() throws Exception{
           //synchronized(statlock){
@@ -693,7 +702,14 @@ public class DualFileTableTransactional implements TableTransactional{
          TableIndexEntryTransactional entry=null;
           // Handle index entry caching
           if(INDEXCACHESIZE>0){
-              synchronized(indexcache){
+              final Vector args = new Vector();
+              args.add(id);
+              args.add(entry);
+              //synchronized(indexcache){
+              Thread.doIt(new Callable<Boolean>() {
+                public Boolean call() throws Exception{
+                  TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
+                  String id = (String) (args.get(0));  
                   entry=getCacheEntry(id);
                    if(entry==null){
                        entry=index.scanIndex(id);
@@ -701,9 +717,11 @@ public class DualFileTableTransactional implements TableTransactional{
                            addCacheEntry(entry);
                        }
                    }
-              }
+                  return true;
+                }
+              });
           }else{
-              entry=index.scanIndex(id);
+              entry=index.scanIndexTransactional(id);
           }
           if(entry!=null){
               long dataoffset=0;
@@ -726,6 +744,7 @@ public class DualFileTableTransactional implements TableTransactional{
                   //synchronized(statlock){
                           atomicfields.setstat_read(atomicfields.getstat_read()+1);
                           atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
+                          return true;
                       }
                   });
                   return row;
index 2a0388075a2f53a90fc7b5bd3805a669c95817d2..c034aa1a7bbfe48806ee2befeea6504dbdca81ae 100644 (file)
@@ -34,6 +34,8 @@
  import com.solidosystems.tuplesoup.filter.*;
  import java.io.*;
  import java.util.*;
+ import java.util.concurrent.Callable;
+ import dstm2.Thread;
 
 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
     private DataInputStream fileastream=null;
@@ -141,11 +143,20 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{
                     while(fileaposition!=nextfilea.getPosition()){
                         fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
                     }
+                    
                     RowTransactional row=RowTransactional.readFromStream(fileastream);
-                    synchronized(table.statlock){
-                       table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
-                       table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
-                    }
+                    final Vector args = new Vector();
+                    args.add(row);
+                    Thread.doIt(new Callable<Boolean>() {
+                       public Boolean call() throws Exception{
+                    //synchronized(table.statlock){
+                        table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
+                        table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
+                        return true;
+                      }
+                      //}
+                    });
+                    
                     fileaposition+=row.getSize();
                     if(row.getId().equals(entry.getId())){
                         next=row;
@@ -183,10 +194,18 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{
                         filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
                     }
                     RowTransactional row=RowTransactional.readFromStream(filebstream);
-                    synchronized(table.statlock){
-                        table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
-                        table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
-                    }
+                    
+                    final Vector args = new Vector();
+                    args.add(row);
+                    Thread.doIt(new Callable<Boolean>() {
+                       public Boolean call() throws Exception{
+                    //synchronized(table.statlock){
+                            table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
+                            table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
+                            return true;
+                        }
+                    });
+                    
                     filebposition+=row.getSize();
                     if(row.getId().equals(entry.getId())){
                         next=row;
index 90602ed04b5f4b71e8d30dc72f49b8d5bb1a2586..b33122c14a2725e8db7e900fde5b74140f4ecaa4 100644 (file)
@@ -8,11 +8,14 @@ package com.solidosystems.tuplesoup.core;
 import TransactionalIO.core.TransactionalFile;
 import dstm2.AtomicArray;
 import dstm2.atomic;
+import dstm2.Thread;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.Callable;
 
 /**
  *
@@ -87,48 +90,87 @@ public class PagedIndexTransactional implements TableIndexTransactional{
         return id.hashCode() & (INITIALPAGEHASH-1);
     }
     
-    private synchronized TableIndexPageTransactional getFirstFreePage(String id) throws IOException{
+    private /*synchronized*/ TableIndexPageTransactional getFirstFreePage(String id) throws IOException{
         return atomicfields.getRoots().get(rootHash(id)).getFirstFreePage(id, id.hashCode());
     }
     
-    private synchronized long getOffset(String id) throws IOException{
+    private /*synchronized*/ long getOffset(String id) throws IOException{
         if(atomicfields.getRoots()==null)return -1;
         return atomicfields.getRoots().get(rootHash(id)).getOffset(id,id.hashCode());
     }
     
-    public synchronized void updateEntry(String id,int rowsize,int location,long position) throws IOException{
+    public /*synchronized*/ void updateEntry(String id,int rowsize,int location,long position) throws IOException{
         long offset=getOffset(id);
         out.seek(offset);
         TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id,rowsize,location,position);
         entry.updateData(out);
         atomicfields.setStat_write(atomicfields.getStat_write()+1);
     }
-    public synchronized void addEntry(String id,int rowsize,int location,long position) throws IOException{
+    
+    public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException{
+        final String id2 = id;
+        final int rowsize2 = rowsize;
+        final int location2 = location;
+        final long position2 = position;
+        
+        Thread.doIt(new Callable<Boolean>() {
+           public Boolean call() throws Exception{
+                long offset=getOffset(id2);
+                out.seek(offset);
+                TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id2,rowsize2,location2,position2);
+                entry.updateData(out);
+                atomicfields.setStat_write(atomicfields.getStat_write()+1);
+                return true;
+           }
+        });
+    }
+    
+    public /*synchronized*/ void addEntry(String id,int rowsize,int location,long position) throws IOException{
         TableIndexPageTransactional page=getFirstFreePage(id);
         page.addEntry(id,rowsize,location,position);
         atomicfields.setStat_write(atomicfields.getStat_write()+1);
     }
-    public synchronized TableIndexEntryTransactional scanIndex(String id) throws IOException{
+    public /*synchronized*/ TableIndexEntryTransactional scanIndex(String id) throws IOException{
         if(atomicfields.getRoots()==null)return null;
         return atomicfields.getRoots().get(rootHash(id)).scanIndex(id,id.hashCode());
     }
-    public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{
-        List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
-        for(int i=0;i<rows.size();i++){
-            String id=rows.get(i);
-            TableIndexEntryTransactional entry=scanIndex(id);
-            if(entry!=null){
-                if(entry.getLocation()!=Table.DELETE)lst.add(entry);
+    
+    public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException{
+        final String id2 = id;
+        return Thread.doIt(new Callable<TableIndexEntryTransactional>() {
+           public TableIndexEntryTransactional call() throws Exception{
+                if(atomicfields.getRoots()==null)return null;
+                    return atomicfields.getRoots().get(rootHash(id2)).scanIndex(id2,id2.hashCode());
             }
-        }
-        return lst;
+        });
     }
-    public synchronized List<TableIndexEntryTransactional> scanIndex() throws IOException{
-        ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
-        for(int i=0;i<INITIALPAGEHASH;i++){
-            atomicfields.getRoots().get(i).addEntriesToList(lst);
-        }
-        return lst;
+    
+    public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{
+        final List<String> rows2 = rows;
+        return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() {
+           public List<TableIndexEntryTransactional> call() throws Exception{
+                List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
+                for(int i=0;i<rows2.size();i++){
+                    String id=rows2.get(i);
+                    TableIndexEntryTransactional entry=scanIndex(id);
+                    if(entry!=null){
+                        if(entry.getLocation()!=Table.DELETE)lst.add(entry);
+                    }
+                }
+                return lst;
+           }
+        });
+    }
+    public /*synchronized*/ List<TableIndexEntryTransactional> scanIndex() throws IOException{
+        return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() {
+           public List<TableIndexEntryTransactional> call() throws Exception{
+                ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
+                for(int i=0;i<INITIALPAGEHASH;i++){
+                    atomicfields.getRoots().get(i).addEntriesToList(lst);
+                }
+                return lst;
+           }
+        });
     }
     public void close(){
         try{
index 27ed2a6551abd9d8d80013dddba3a55432d9e99a..b859cc2ca0bcb2e6288a956646e8ceffe96f2f61 100644 (file)
@@ -12,8 +12,10 @@ import java.io.*;
 public interface TableIndexTransactional extends AtomicSuperClass{
     public Hashtable<String,Long> readStatistics();
     public void updateEntry(String id,int rowsize,int location,long position) throws IOException;
+    public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException;
     public void addEntry(String id,int rowsize,int location,long position) throws IOException;
     public TableIndexEntryTransactional scanIndex(String id) throws IOException;
+    public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException;
     public List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException;
     public List<TableIndexEntryTransactional> scanIndex() throws IOException;
     public void close();