From: navid <navid> Date: Fri, 23 Jan 2009 19:42:02 +0000 (+0000) Subject: *** empty log message *** X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=7dbbfc14b310c29e18e7d14ce6e649c87d95e13a;p=IRC.git *** empty log message *** --- diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java index 3037100f..b55c46c6 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/DualFileTableTransactional.java @@ -144,32 +144,37 @@ public class DualFileTableTransactional implements TableTransactional{ * Furthermore, the index will be asked to deliver separate index specific counters */ public Hashtable<String,Long> readStatistics(){ - Hashtable<String,Long> hash=new Hashtable<String,Long>(); - synchronized(statlock){ - hash.put("stat_table_add",atomicfields.getstat_add()); - hash.put("stat_table_update",atomicfields.getstat_update()); - hash.put("stat_table_delete",atomicfields.getstat_delete()); - hash.put("stat_table_add_size",atomicfields.getstat_add_size()); - hash.put("stat_table_update_size",atomicfields.getstat_update_size()); - hash.put("stat_table_read_size",atomicfields.getstat_read_size()); - hash.put("stat_table_read",atomicfields.getstat_read()); - hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit()); - hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss()); - hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop()); - atomicfields.setstat_add(0); - atomicfields.setstat_update(0); - atomicfields.setstat_delete(0); - atomicfields.setstat_add_size(0); - atomicfields.setstat_update_size(0); - atomicfields.setstat_read_size(0); - atomicfields.setstat_read(0); - atomicfields.setstat_cache_hit(0); - atomicfields.setstat_cache_miss(0); - atomicfields.setstat_cache_drop(0); - Hashtable<String,Long> ihash=index.readStatistics(); - hash.putAll(ihash); - } - return hash; + + // synchronized(statlock){ + return Thread.doIt(new Callable<Hashtable<String,Long>>() { + public Hashtable<String,Long> call() throws Exception{ + Hashtable<String,Long> hash=new Hashtable<String,Long>(); + hash.put("stat_table_add",atomicfields.getstat_add()); + hash.put("stat_table_update",atomicfields.getstat_update()); + hash.put("stat_table_delete",atomicfields.getstat_delete()); + hash.put("stat_table_add_size",atomicfields.getstat_add_size()); + hash.put("stat_table_update_size",atomicfields.getstat_update_size()); + hash.put("stat_table_read_size",atomicfields.getstat_read_size()); + hash.put("stat_table_read",atomicfields.getstat_read()); + hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit()); + hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss()); + hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop()); + atomicfields.setstat_add(0); + atomicfields.setstat_update(0); + atomicfields.setstat_delete(0); + atomicfields.setstat_add_size(0); + atomicfields.setstat_update_size(0); + atomicfields.setstat_read_size(0); + atomicfields.setstat_read(0); + atomicfields.setstat_cache_hit(0); + atomicfields.setstat_cache_miss(0); + atomicfields.setstat_cache_drop(0); + Hashtable<String,Long> ihash=index.readStatistics(); + hash.putAll(ihash); + return hash; + } + }); + } /** @@ -259,7 +264,7 @@ public class DualFileTableTransactional implements TableTransactional{ }catch(Exception e){} } - private synchronized void openFile(int type) throws IOException{ + private /*synchronized*/ void openFile(int type) throws IOException{ switch(type){ case FILEA : if(fileastream==null){ // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true))); @@ -296,15 +301,15 @@ public class DualFileTableTransactional implements TableTransactional{ } private void addCacheEntry(TableIndexEntryTransactional entry){ - synchronized(indexcache){ + // synchronized(indexcache){ if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){ // remove first entry TableIndexNodeTransactional node=indexcachefirst; indexcache.remove(node.getData().getId()); atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1); - synchronized(statlock){ + // synchronized(statlock){ atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1); - } + // } indexcachefirst=node.getNext(); if(indexcachefirst==null){ indexcachelast=null; @@ -322,7 +327,7 @@ public class DualFileTableTransactional implements TableTransactional{ indexcachelast=node; indexcache.put(entry.getId(),node); atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1); - } + // } } private void addRowA(RowTransactional row) throws IOException{ @@ -351,6 +356,7 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post)); + return true; } }); } @@ -378,6 +384,7 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post)); + return true; } }); } @@ -405,6 +412,7 @@ public class DualFileTableTransactional implements TableTransactional{ }else{ addCacheEntry(((TableIndexEntryTransactional)(args.get(0)))); } + return true; } }); } @@ -529,7 +537,7 @@ public class DualFileTableTransactional implements TableTransactional{ } }*/ }else{ - entry=index.scanIndex(row.getId()); + entry=index.scanIndexTransactional(row.getId()); } if(entry.getRowSize()>=row.getSize()){ // Add to the existing location @@ -605,6 +613,7 @@ public class DualFileTableTransactional implements TableTransactional{ updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition())); } atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post)); + return true; } }); } @@ -645,7 +654,7 @@ public class DualFileTableTransactional implements TableTransactional{ if(INDEXCACHESIZE>0){ removeCacheEntry(row.getId()); } - index.updateEntry(row.getId(),row.getSize(),DELETE,0); + index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0); Thread.doIt(new Callable<Boolean>() { public Boolean call() throws Exception{ //synchronized(statlock){ @@ -693,7 +702,14 @@ public class DualFileTableTransactional implements TableTransactional{ TableIndexEntryTransactional entry=null; // Handle index entry caching if(INDEXCACHESIZE>0){ - synchronized(indexcache){ + final Vector args = new Vector(); + args.add(id); + args.add(entry); + //synchronized(indexcache){ + Thread.doIt(new Callable<Boolean>() { + public Boolean call() throws Exception{ + TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1)); + String id = (String) (args.get(0)); entry=getCacheEntry(id); if(entry==null){ entry=index.scanIndex(id); @@ -701,9 +717,11 @@ public class DualFileTableTransactional implements TableTransactional{ addCacheEntry(entry); } } - } + return true; + } + }); }else{ - entry=index.scanIndex(id); + entry=index.scanIndexTransactional(id); } if(entry!=null){ long dataoffset=0; @@ -726,6 +744,7 @@ public class DualFileTableTransactional implements TableTransactional{ //synchronized(statlock){ atomicfields.setstat_read(atomicfields.getstat_read()+1); atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize()); + return true; } }); return row; diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java index 2a038807..c034aa1a 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/IndexedTableReaderTransactional.java @@ -34,6 +34,8 @@ import com.solidosystems.tuplesoup.filter.*; import java.io.*; import java.util.*; + import java.util.concurrent.Callable; + import dstm2.Thread; public class IndexedTableReaderTransactional extends TupleStreamTransactional{ private DataInputStream fileastream=null; @@ -141,11 +143,20 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{ while(fileaposition!=nextfilea.getPosition()){ fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition)); } + RowTransactional row=RowTransactional.readFromStream(fileastream); - synchronized(table.statlock){ - table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize()); - table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); - } + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable<Boolean>() { + public Boolean call() throws Exception{ + //synchronized(table.statlock){ + table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize()); + table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); + return true; + } + //} + }); + fileaposition+=row.getSize(); if(row.getId().equals(entry.getId())){ next=row; @@ -183,10 +194,18 @@ public class IndexedTableReaderTransactional extends TupleStreamTransactional{ filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition)); } RowTransactional row=RowTransactional.readFromStream(filebstream); - synchronized(table.statlock){ - table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize()); - table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); - } + + final Vector args = new Vector(); + args.add(row); + Thread.doIt(new Callable<Boolean>() { + public Boolean call() throws Exception{ + //synchronized(table.statlock){ + table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize()); + table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1); + return true; + } + }); + filebposition+=row.getSize(); if(row.getId().equals(entry.getId())){ next=row; diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java index 90602ed0..b33122c1 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/PagedIndexTransactional.java @@ -8,11 +8,14 @@ package com.solidosystems.tuplesoup.core; import TransactionalIO.core.TransactionalFile; import dstm2.AtomicArray; import dstm2.atomic; +import dstm2.Thread; import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Hashtable; import java.util.List; +import java.util.Vector; +import java.util.concurrent.Callable; /** * @@ -87,48 +90,87 @@ public class PagedIndexTransactional implements TableIndexTransactional{ return id.hashCode() & (INITIALPAGEHASH-1); } - private synchronized TableIndexPageTransactional getFirstFreePage(String id) throws IOException{ + private /*synchronized*/ TableIndexPageTransactional getFirstFreePage(String id) throws IOException{ return atomicfields.getRoots().get(rootHash(id)).getFirstFreePage(id, id.hashCode()); } - private synchronized long getOffset(String id) throws IOException{ + private /*synchronized*/ long getOffset(String id) throws IOException{ if(atomicfields.getRoots()==null)return -1; return atomicfields.getRoots().get(rootHash(id)).getOffset(id,id.hashCode()); } - public synchronized void updateEntry(String id,int rowsize,int location,long position) throws IOException{ + public /*synchronized*/ void updateEntry(String id,int rowsize,int location,long position) throws IOException{ long offset=getOffset(id); out.seek(offset); TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id,rowsize,location,position); entry.updateData(out); atomicfields.setStat_write(atomicfields.getStat_write()+1); } - public synchronized void addEntry(String id,int rowsize,int location,long position) throws IOException{ + + public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException{ + final String id2 = id; + final int rowsize2 = rowsize; + final int location2 = location; + final long position2 = position; + + Thread.doIt(new Callable<Boolean>() { + public Boolean call() throws Exception{ + long offset=getOffset(id2); + out.seek(offset); + TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id2,rowsize2,location2,position2); + entry.updateData(out); + atomicfields.setStat_write(atomicfields.getStat_write()+1); + return true; + } + }); + } + + public /*synchronized*/ void addEntry(String id,int rowsize,int location,long position) throws IOException{ TableIndexPageTransactional page=getFirstFreePage(id); page.addEntry(id,rowsize,location,position); atomicfields.setStat_write(atomicfields.getStat_write()+1); } - public synchronized TableIndexEntryTransactional scanIndex(String id) throws IOException{ + public /*synchronized*/ TableIndexEntryTransactional scanIndex(String id) throws IOException{ if(atomicfields.getRoots()==null)return null; return atomicfields.getRoots().get(rootHash(id)).scanIndex(id,id.hashCode()); } - public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{ - List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>(); - for(int i=0;i<rows.size();i++){ - String id=rows.get(i); - TableIndexEntryTransactional entry=scanIndex(id); - if(entry!=null){ - if(entry.getLocation()!=Table.DELETE)lst.add(entry); + + public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException{ + final String id2 = id; + return Thread.doIt(new Callable<TableIndexEntryTransactional>() { + public TableIndexEntryTransactional call() throws Exception{ + if(atomicfields.getRoots()==null)return null; + return atomicfields.getRoots().get(rootHash(id2)).scanIndex(id2,id2.hashCode()); } - } - return lst; + }); } - public synchronized List<TableIndexEntryTransactional> scanIndex() throws IOException{ - ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>(); - for(int i=0;i<INITIALPAGEHASH;i++){ - atomicfields.getRoots().get(i).addEntriesToList(lst); - } - return lst; + + public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{ + final List<String> rows2 = rows; + return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() { + public List<TableIndexEntryTransactional> call() throws Exception{ + List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>(); + for(int i=0;i<rows2.size();i++){ + String id=rows2.get(i); + TableIndexEntryTransactional entry=scanIndex(id); + if(entry!=null){ + if(entry.getLocation()!=Table.DELETE)lst.add(entry); + } + } + return lst; + } + }); + } + public /*synchronized*/ List<TableIndexEntryTransactional> scanIndex() throws IOException{ + return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() { + public List<TableIndexEntryTransactional> call() throws Exception{ + ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>(); + for(int i=0;i<INITIALPAGEHASH;i++){ + atomicfields.getRoots().get(i).addEntriesToList(lst); + } + return lst; + } + }); } public void close(){ try{ diff --git a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/TableIndexTransactional.java b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/TableIndexTransactional.java index 27ed2a65..b859cc2c 100644 --- a/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/TableIndexTransactional.java +++ b/Robust/Transactions/mytuplesoup/src/com/solidosystems/tuplesoup/core/TableIndexTransactional.java @@ -12,8 +12,10 @@ import java.io.*; public interface TableIndexTransactional extends AtomicSuperClass{ public Hashtable<String,Long> readStatistics(); public void updateEntry(String id,int rowsize,int location,long position) throws IOException; + public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException; public void addEntry(String id,int rowsize,int location,long position) throws IOException; public TableIndexEntryTransactional scanIndex(String id) throws IOException; + public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException; public List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException; public List<TableIndexEntryTransactional> scanIndex() throws IOException; public void close();