* Furthermore, the index will be asked to deliver separate index specific counters
*/
public Hashtable<String,Long> readStatistics(){
- Hashtable<String,Long> hash=new Hashtable<String,Long>();
- synchronized(statlock){
- hash.put("stat_table_add",atomicfields.getstat_add());
- hash.put("stat_table_update",atomicfields.getstat_update());
- hash.put("stat_table_delete",atomicfields.getstat_delete());
- hash.put("stat_table_add_size",atomicfields.getstat_add_size());
- hash.put("stat_table_update_size",atomicfields.getstat_update_size());
- hash.put("stat_table_read_size",atomicfields.getstat_read_size());
- hash.put("stat_table_read",atomicfields.getstat_read());
- hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
- hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
- hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
- atomicfields.setstat_add(0);
- atomicfields.setstat_update(0);
- atomicfields.setstat_delete(0);
- atomicfields.setstat_add_size(0);
- atomicfields.setstat_update_size(0);
- atomicfields.setstat_read_size(0);
- atomicfields.setstat_read(0);
- atomicfields.setstat_cache_hit(0);
- atomicfields.setstat_cache_miss(0);
- atomicfields.setstat_cache_drop(0);
- Hashtable<String,Long> ihash=index.readStatistics();
- hash.putAll(ihash);
- }
- return hash;
+
+ // synchronized(statlock){
+ return Thread.doIt(new Callable<Hashtable<String,Long>>() {
+ public Hashtable<String,Long> call() throws Exception{
+ Hashtable<String,Long> hash=new Hashtable<String,Long>();
+ hash.put("stat_table_add",atomicfields.getstat_add());
+ hash.put("stat_table_update",atomicfields.getstat_update());
+ hash.put("stat_table_delete",atomicfields.getstat_delete());
+ hash.put("stat_table_add_size",atomicfields.getstat_add_size());
+ hash.put("stat_table_update_size",atomicfields.getstat_update_size());
+ hash.put("stat_table_read_size",atomicfields.getstat_read_size());
+ hash.put("stat_table_read",atomicfields.getstat_read());
+ hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
+ hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
+ hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
+ atomicfields.setstat_add(0);
+ atomicfields.setstat_update(0);
+ atomicfields.setstat_delete(0);
+ atomicfields.setstat_add_size(0);
+ atomicfields.setstat_update_size(0);
+ atomicfields.setstat_read_size(0);
+ atomicfields.setstat_read(0);
+ atomicfields.setstat_cache_hit(0);
+ atomicfields.setstat_cache_miss(0);
+ atomicfields.setstat_cache_drop(0);
+ Hashtable<String,Long> ihash=index.readStatistics();
+ hash.putAll(ihash);
+ return hash;
+ }
+ });
+
}
/**
}catch(Exception e){}
}
- private synchronized void openFile(int type) throws IOException{
+ private /*synchronized*/ void openFile(int type) throws IOException{
switch(type){
case FILEA : if(fileastream==null){
// fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
}
private void addCacheEntry(TableIndexEntryTransactional entry){
- synchronized(indexcache){
+ // synchronized(indexcache){
if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
// remove first entry
TableIndexNodeTransactional node=indexcachefirst;
indexcache.remove(node.getData().getId());
atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
- synchronized(statlock){
+ // synchronized(statlock){
atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
- }
+ // }
indexcachefirst=node.getNext();
if(indexcachefirst==null){
indexcachelast=null;
indexcachelast=node;
indexcache.put(entry.getId(),node);
atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
- }
+ // }
}
private void addRowA(RowTransactional row) throws IOException{
addCacheEntry(entry);
}
atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
+ return true;
}
});
}
addCacheEntry(entry);
}
atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
+ return true;
}
});
}
}else{
addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
}
+ return true;
}
});
}
}
}*/
}else{
- entry=index.scanIndex(row.getId());
+ entry=index.scanIndexTransactional(row.getId());
}
if(entry.getRowSize()>=row.getSize()){
// Add to the existing location
updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
}
atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
+ return true;
}
});
}
if(INDEXCACHESIZE>0){
removeCacheEntry(row.getId());
}
- index.updateEntry(row.getId(),row.getSize(),DELETE,0);
+ index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
Thread.doIt(new Callable<Boolean>() {
public Boolean call() throws Exception{
//synchronized(statlock){
TableIndexEntryTransactional entry=null;
// Handle index entry caching
if(INDEXCACHESIZE>0){
- synchronized(indexcache){
+ final Vector args = new Vector();
+ args.add(id);
+ args.add(entry);
+ //synchronized(indexcache){
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
+ String id = (String) (args.get(0));
entry=getCacheEntry(id);
if(entry==null){
entry=index.scanIndex(id);
addCacheEntry(entry);
}
}
- }
+ return true;
+ }
+ });
}else{
- entry=index.scanIndex(id);
+ entry=index.scanIndexTransactional(id);
}
if(entry!=null){
long dataoffset=0;
//synchronized(statlock){
atomicfields.setstat_read(atomicfields.getstat_read()+1);
atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
+ return true;
}
});
return row;
import com.solidosystems.tuplesoup.filter.*;
import java.io.*;
import java.util.*;
+ import java.util.concurrent.Callable;
+ import dstm2.Thread;
public class IndexedTableReaderTransactional extends TupleStreamTransactional{
private DataInputStream fileastream=null;
while(fileaposition!=nextfilea.getPosition()){
fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
}
+
RowTransactional row=RowTransactional.readFromStream(fileastream);
- synchronized(table.statlock){
- table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
- table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
- }
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(table.statlock){
+ table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
+ table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
+ return true;
+ }
+ //}
+ });
+
fileaposition+=row.getSize();
if(row.getId().equals(entry.getId())){
next=row;
filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
}
RowTransactional row=RowTransactional.readFromStream(filebstream);
- synchronized(table.statlock){
- table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
- table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
- }
+
+ final Vector args = new Vector();
+ args.add(row);
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ //synchronized(table.statlock){
+ table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
+ table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
+ return true;
+ }
+ });
+
filebposition+=row.getSize();
if(row.getId().equals(entry.getId())){
next=row;
import TransactionalIO.core.TransactionalFile;
import dstm2.AtomicArray;
import dstm2.atomic;
+import dstm2.Thread;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.Callable;
/**
*
return id.hashCode() & (INITIALPAGEHASH-1);
}
- private synchronized TableIndexPageTransactional getFirstFreePage(String id) throws IOException{
+ private /*synchronized*/ TableIndexPageTransactional getFirstFreePage(String id) throws IOException{
return atomicfields.getRoots().get(rootHash(id)).getFirstFreePage(id, id.hashCode());
}
- private synchronized long getOffset(String id) throws IOException{
+ private /*synchronized*/ long getOffset(String id) throws IOException{
if(atomicfields.getRoots()==null)return -1;
return atomicfields.getRoots().get(rootHash(id)).getOffset(id,id.hashCode());
}
- public synchronized void updateEntry(String id,int rowsize,int location,long position) throws IOException{
+ public /*synchronized*/ void updateEntry(String id,int rowsize,int location,long position) throws IOException{
long offset=getOffset(id);
out.seek(offset);
TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id,rowsize,location,position);
entry.updateData(out);
atomicfields.setStat_write(atomicfields.getStat_write()+1);
}
- public synchronized void addEntry(String id,int rowsize,int location,long position) throws IOException{
+
+ public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException{
+ final String id2 = id;
+ final int rowsize2 = rowsize;
+ final int location2 = location;
+ final long position2 = position;
+
+ Thread.doIt(new Callable<Boolean>() {
+ public Boolean call() throws Exception{
+ long offset=getOffset(id2);
+ out.seek(offset);
+ TableIndexEntryTransactional entry=new TableIndexEntryTransactional(id2,rowsize2,location2,position2);
+ entry.updateData(out);
+ atomicfields.setStat_write(atomicfields.getStat_write()+1);
+ return true;
+ }
+ });
+ }
+
+ public /*synchronized*/ void addEntry(String id,int rowsize,int location,long position) throws IOException{
TableIndexPageTransactional page=getFirstFreePage(id);
page.addEntry(id,rowsize,location,position);
atomicfields.setStat_write(atomicfields.getStat_write()+1);
}
- public synchronized TableIndexEntryTransactional scanIndex(String id) throws IOException{
+ public /*synchronized*/ TableIndexEntryTransactional scanIndex(String id) throws IOException{
if(atomicfields.getRoots()==null)return null;
return atomicfields.getRoots().get(rootHash(id)).scanIndex(id,id.hashCode());
}
- public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{
- List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
- for(int i=0;i<rows.size();i++){
- String id=rows.get(i);
- TableIndexEntryTransactional entry=scanIndex(id);
- if(entry!=null){
- if(entry.getLocation()!=Table.DELETE)lst.add(entry);
+
+ public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException{
+ final String id2 = id;
+ return Thread.doIt(new Callable<TableIndexEntryTransactional>() {
+ public TableIndexEntryTransactional call() throws Exception{
+ if(atomicfields.getRoots()==null)return null;
+ return atomicfields.getRoots().get(rootHash(id2)).scanIndex(id2,id2.hashCode());
}
- }
- return lst;
+ });
}
- public synchronized List<TableIndexEntryTransactional> scanIndex() throws IOException{
- ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
- for(int i=0;i<INITIALPAGEHASH;i++){
- atomicfields.getRoots().get(i).addEntriesToList(lst);
- }
- return lst;
+
+ public synchronized List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException{
+ final List<String> rows2 = rows;
+ return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() {
+ public List<TableIndexEntryTransactional> call() throws Exception{
+ List<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
+ for(int i=0;i<rows2.size();i++){
+ String id=rows2.get(i);
+ TableIndexEntryTransactional entry=scanIndex(id);
+ if(entry!=null){
+ if(entry.getLocation()!=Table.DELETE)lst.add(entry);
+ }
+ }
+ return lst;
+ }
+ });
+ }
+ public /*synchronized*/ List<TableIndexEntryTransactional> scanIndex() throws IOException{
+ return Thread.doIt(new Callable<List<TableIndexEntryTransactional>>() {
+ public List<TableIndexEntryTransactional> call() throws Exception{
+ ArrayList<TableIndexEntryTransactional> lst=new ArrayList<TableIndexEntryTransactional>();
+ for(int i=0;i<INITIALPAGEHASH;i++){
+ atomicfields.getRoots().get(i).addEntriesToList(lst);
+ }
+ return lst;
+ }
+ });
}
public void close(){
try{
public interface TableIndexTransactional extends AtomicSuperClass{
public Hashtable<String,Long> readStatistics();
public void updateEntry(String id,int rowsize,int location,long position) throws IOException;
+ public void updateEntryTransactional(String id,int rowsize,int location,long position) throws IOException;
public void addEntry(String id,int rowsize,int location,long position) throws IOException;
public TableIndexEntryTransactional scanIndex(String id) throws IOException;
+ public TableIndexEntryTransactional scanIndexTransactional(String id) throws IOException;
public List<TableIndexEntryTransactional> scanIndex(List<String> rows) throws IOException;
public List<TableIndexEntryTransactional> scanIndex() throws IOException;
public void close();