From 7dbbfc14b310c29e18e7d14ce6e649c87d95e13a Mon Sep 17 00:00:00 2001 From: navid Date: Fri, 23 Jan 2009 19:42:02 +0000 Subject: [PATCH] *** empty log message *** --- .../core/DualFileTableTransactional.java | 91 +++++++++++-------- .../core/IndexedTableReaderTransactional.java | 35 +++++-- .../core/PagedIndexTransactional.java | 82 +++++++++++++---- .../core/TableIndexTransactional.java | 2 + 4 files changed, 146 insertions(+), 64 deletions(-) diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java index 3037100f..b55c46c6 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java @@ -144,32 +144,37 @@ public class DualFileTableTransactional implements TableTransactional{ * Furthermore, the index will be asked to deliver separate index specific counters */ public Hashtable readStatistics(){ - Hashtable hash=new Hashtable(); - synchronized(statlock){ - hash.put("stat_table_add",atomicfields.getstat_add()); - hash.put("stat_table_update",atomicfields.getstat_update()); - hash.put("stat_table_delete",atomicfields.getstat_delete()); - hash.put("stat_table_add_size",atomicfields.getstat_add_size()); - hash.put("stat_table_update_size",atomicfields.getstat_update_size()); - hash.put("stat_table_read_size",atomicfields.getstat_read_size()); - hash.put("stat_table_read",atomicfields.getstat_read()); - hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit()); - hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss()); - hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop()); - atomicfields.setstat_add(0); - atomicfields.setstat_update(0); - atomicfields.setstat_delete(0); - atomicfields.setstat_add_size(0); - atomicfields.setstat_update_size(0); - atomicfields.setstat_read_size(0); - atomicfields.setstat_read(0); - atomicfields.setstat_cache_hit(0); - atomicfields.setstat_cache_miss(0); - atomicfields.setstat_cache_drop(0); - Hashtable ihash=index.readStatistics(); - hash.putAll(ihash); - } - return hash; + + // synchronized(statlock){ + return Thread.doIt(new Callable>() { + public Hashtable call() throws Exception{ + Hashtable hash=new Hashtable(); + hash.put("stat_table_add",atomicfields.getstat_add()); + hash.put("stat_table_update",atomicfields.getstat_update()); + hash.put("stat_table_delete",atomicfields.getstat_delete()); + hash.put("stat_table_add_size",atomicfields.getstat_add_size()); + hash.put("stat_table_update_size",atomicfields.getstat_update_size()); + hash.put("stat_table_read_size",atomicfields.getstat_read_size()); + hash.put("stat_table_read",atomicfields.getstat_read()); + hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit()); + hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss()); + hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop()); + atomicfields.setstat_add(0); + atomicfields.setstat_update(0); + atomicfields.setstat_delete(0); + atomicfields.setstat_add_size(0); + atomicfields.setstat_update_size(0); + atomicfields.setstat_read_size(0); + atomicfields.setstat_read(0); + atomicfields.setstat_cache_hit(0); + atomicfields.setstat_cache_miss(0); + atomicfields.setstat_cache_drop(0); + Hashtable ihash=index.readStatistics(); + hash.putAll(ihash); + return hash; + } + }); + } /** @@ -259,7 +264,7 @@ public class DualFileTableTransactional implements TableTransactional{ }catch(Exception e){} } - private synchronized void openFile(int type) throws IOException{ + private /*synchronized*/ void openFile(int type) throws IOException{ switch(type){ case FILEA : if(fileastream==null){ // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true))); @@ -296,15 +301,15 @@ public class DualFileTableTransactional implements TableTransactional{ } private void addCacheEntry(TableIndexEntryTransactional entry){ - synchronized(indexcache){ + // synchronized(indexcache){ if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){ // remove first entry TableIndexNodeTransactional node=indexcachefirst; indexcache.remove(node.getData().getId()); atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1); - synchronized(statlock){ + // synchronized(statlock){ atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1); - } + // } indexcachefirst=node.getNext(); if(indexcachefirst==null){ indexcachelast=null; @@ -322,7 +327,7 @@ public class DualFileTableTransactional implements TableTransactional{ indexcachelast=node; indexcache.put(entry.getId(),node); atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1); - } + // } } private void addRowA(RowTransactional row) throws IOException{ @@ -351,6 +356,7 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post)); + return true; } }); } @@ -378,6 +384,7 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post)); + return true; } }); } @@ -405,6 +412,7 @@ public class DualFileTableTransactional implements TableTransactional{ }else{ addCacheEntry(((TableIndexEntryTransactional)(args.get(0)))); } + return true; } }); } @@ -529,7 +537,7 @@ public class DualFileTableTransactional implements TableTransactional{ } }*/ }else{ - entry=index.scanIndex(row.getId()); + entry=index.scanIndexTransactional(row.getId()); } if(entry.getRowSize()>=row.getSize()){ // Add to the existing location @@ -605,6 +613,7 @@ public class DualFileTableTransactional implements TableTransactional{ updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition())); } atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post)); + return true; } }); } @@ -645,7 +654,7 @@ public class DualFileTableTransactional implements TableTransactional{ if(INDEXCACHESIZE>0){ removeCacheEntry(row.getId()); } - index.updateEntry(row.getId(),row.getSize(),DELETE,0); + index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0); Thread.doIt(new Callable() { public Boolean call() throws Exception{ //synchronized(statlock){ @@ -693,7 +702,14 @@ public class DualFileTableTransactional implements TableTransactional{ TableIndexEntryTransactional entry=null; // Handle index entry caching if(INDEXCACHESIZE>0){ - synchronized(indexcache){ + final Vector args = new Vector(); + args.add(id); + args.add(entry); + //synchronized(indexcache){ + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1)); + String id = (String) (args.get(0)); entry=getCacheEntry(id); if(entry==null){ entry=index.scanIndex(id); @@ -701,9 +717,11 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } } - } + return true; + } + }); }else{ - entry=index.scanIndex(id); + entry=index.scanIndexTransactional(id); } if(entry!=null){ long dataoffset=0; @@ -726,6 +744,7 @@ public class DualFileTableTransactional implements TableTransactional{ //synchronized(statlock){ atomicfields.setstat_read(atomicfields.getstat_read()+1); atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize()); + return true; } }); return row; diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java index 2a038807..c034aa1a 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java @@ -34,6 +34,8 @@ import com.solidosystems.tuplesoup.filter.*; import java.io.*; import java.util.*; + import java.util.concurrent.Callable; + import dstm2.Thread; public class IndexedTableReaderTransactional extends TupleStreamTransactional{ private DataInputStream fileastream=null; @@ -141,11 +143,20 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{ while(fileaposition!=nextfilea.getPosition()){ fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition)); } + RowTransactional row=RowTransactional.readFromStream(fileastream); - synchronized(table.statlock){ - table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize()); - table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); - } + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(table.statlock){ + table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize()); + table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); + return true; + } + //} + }); + fileaposition+=row.getSize(); if(row.getId().equals(entry.getId())){ next=row; @@ -183,10 +194,18 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{ filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition)); } RowTransactional row=RowTransactional.readFromStream(filebstream); - synchronized(table.statlock){ - table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize()); - table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); - } + + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + //synchronized(table.statlock){ + table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize()); + table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); + return true; + } + }); + filebposition+=row.getSize(); if(row.getId().equals(entry.getId())){ next=row; diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java index 90602ed0..b33122c1 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java @@ -8,11 +8,14 @@ package com.solidosystems.tuplesoup.core; import TransactionalIO.core.TransactionalFile; import dstm2.AtomicArray; import dstm2.atomic; +import dstm2.Thread; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Hashtable; import java.util.List; +import java.util.Vector; +import java.util.concurrent.Callable; /** * @@ -87,48 +90,87 @@ public class PagedIndexTransactional implements TableIndexTransactional{ return id.hashCode() & (INITIALPAGEHASH-1); } - private synchronized TableIndexPageTransactional getFirstFreePage(String id) throws IOException{ + private /*synchronized*/ TableIndexPageTransactional getFirstFreePage(String id) throws IOException{ return atomicfields.getRoots().get(rootHash(id)).getFirstFreePage(id, id.hashCode()); } - private synchronized long getOffset(String id) throws IOException{ + private /*synchronized*/ long getOffset(String id) throws IOException{ if(atomicfields.getRoots()==null)return -1; return atomicfields.getRoots().get(rootHash(id)).getOffset(id,id.hashCode()); } - public synchronized void updateEntry(String id,int rowsize,int location,long position) throws IOException{ + public /*synchronized*/ void updateEntry(String id,int rowsize,int location,long position) throws IOException{ long offset=getOffset(id); out.seek(offset); TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id,rowsize,location,position); entry.updateData(out); atomicfields.setStat_write(atomicfields.getStat_write()+1); } - public synchronized void addEntry(String id,int rowsize,int location,long position) throws IOException{ + + public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException{ + final String id2 = id; + final int rowsize2 = rowsize; + final int location2 = location; + final long position2 = position; + + Thread.doIt(new Callable() { + public Boolean call() throws Exception{ + long offset=getOffset(id2); + out.seek(offset); + TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id2,rowsize2,location2,position2); + entry.updateData(out); + atomicfields.setStat_write(atomicfields.getStat_write()+1); + return true; + } + }); + } + + public /*synchronized*/ void addEntry(String id,int rowsize,int location,long position) throws IOException{ TableIndexPageTransactional page=getFirstFreePage(id); page.addEntry(id,rowsize,location,position); atomicfields.setStat_write(atomicfields.getStat_write()+1); } - public synchronized TableIndexEntryTransactional scanIndex(String id) throws IOException{ + public /*synchronized*/ TableIndexEntryTransactional scanIndex(String id) throws IOException{ if(atomicfields.getRoots()==null)return null; return atomicfields.getRoots().get(rootHash(id)).scanIndex(id,id.hashCode()); } - public synchronized List scanIndex(List rows) throws IOException{ - List lst=new ArrayList(); - for(int i=0;i() { + public TableIndexEntryTransactional call() throws Exception{ + if(atomicfields.getRoots()==null)return null; + return atomicfields.getRoots().get(rootHash(id2)).scanIndex(id2,id2.hashCode()); } - } - return lst; + }); } - public synchronized List scanIndex() throws IOException{ - ArrayList lst=new ArrayList(); - for(int i=0;i scanIndex(List rows) throws IOException{ + final List rows2 = rows; + return Thread.doIt(new Callable>() { + public List call() throws Exception{ + List lst=new ArrayList(); + for(int i=0;i scanIndex() throws IOException{ + return Thread.doIt(new Callable>() { + public List call() throws Exception{ + ArrayList lst=new ArrayList(); + for(int i=0;i readStatistics(); public void updateEntry(String id,int rowsize,int location,long position) throws IOException; + public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException; public void addEntry(String id,int rowsize,int location,long position) throws IOException; public TableIndexEntryTransactional scanIndex(String id) throws IOException; + public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException; public List scanIndex(List rows) throws IOException; public List scanIndex() throws IOException; public void close(); -- 2.34.1