2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
35 import TransactionalIO.core.TransactionalFile;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
42 import dstm2.benchmark.FinancialTransaction.FinancialTransactionDS;
43 import dstm2.factory.Factory;
44 import dstm2.util.StringKeyHashMap;
45 import java.util.concurrent.Callable;
48 * The table stores a group of rows.
49 * Every row must have a unique id within a table.
51 public class DualFileTableTransactional implements TableTransactional{
54 private int INDEXCACHESIZE=8192;
56 private String filealock="filea-dummy";
57 private String fileblock="fileb-dummy";
59 // private DataOutputStream fileastream=null;
60 // private DataOutputStream filebstream=null;
61 private TransactionalFile fileastream=null;
62 private TransactionalFile filebstream=null;
63 // private RandomAccessFile filearandom=null;
64 private TransactionalFile filearandom=null;
65 private TransactionalFile filebrandom=null;
66 // FileChannel fca=null;
67 // FileChannel fcb=null;
68 private TableIndexTransactional index=null;
70 // private long fileaposition=0;
71 // private long filebposition=0;
73 private boolean rowswitch=true;
76 private String location;
78 private TableIndexNodeTransactional indexcachefirst;
79 private TableIndexNodeTransactional indexcachelast;
80 //private int indexcacheusage;
82 private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
83 //private Hashtable<String,TableIndexNode> indexcache;
85 static Factory<DualFileTableTSInf> factory = Thread.makeFactory(DualFileTableTSInf.class);
86 // static Factory<FinancialTransactionDS> factory = Thread.makeFactory(FinancialTransactionDS.class);
87 DualFileTableTSInf atomicfields;
90 public @atomic interface DualFileTableTSInf{
92 Long getstat_update();
93 Long getstat_delete();
94 Long getstat_add_size();
95 Long getstat_update_size();
96 Long getstat_read_size();
98 Long getstat_cache_hit();
99 Long getstat_cache_miss();
100 Long getstat_cache_drop();
101 Long getFileaposition();
102 Long getFilebposition();
103 Integer getIndexcacheusage();
105 void setstat_add(Long val);
106 void setstat_update(Long val);
107 void setstat_delete(Long val);
108 void setstat_add_size(Long val);
109 void setstat_update_size(Long val);
110 void setstat_read_size(Long val);
111 void setstat_read(Long val);
112 void setstat_cache_hit(Long val);
113 void setstat_cache_miss(Long val);
114 void setstat_cache_drop(Long val);
115 void setIndexcacheusage(Integer val);
116 void setFileaposition(Long val);
117 void setFilebposition(Long val);
123 long stat_add_size=0;
124 long stat_update_size=0;
125 long stat_read_size=0;
127 long stat_cache_hit=0;
128 long stat_cache_miss=0;
129 long stat_cache_drop=0;*/
131 protected String statlock="stat-dummy";
134 * Return the current values of the statistic counters and reset them.
135 * The current counters are:
138 * <li>stat_table_update
139 * <li>stat_table_delete
140 * <li>stat_table_add_size
141 * <li>stat_table_update_size
142 * <li>stat_table_read_size
143 * <li>stat_table_read
144 * <li>stat_table_cache_hit
145 * <li>stat_table_cache_miss
146 * <li>stat_table_cache_drop
148 * Furthermore, the index will be asked to deliver separate index specific counters
150 public Hashtable<String,Long> readStatistics(){
152 // synchronized(statlock){
153 return Thread.doIt(new Callable<Hashtable<String,Long>>() {
154 public Hashtable<String,Long> call() throws Exception{
155 Hashtable<String,Long> hash=new Hashtable<String,Long>();
156 hash.put("stat_table_add",atomicfields.getstat_add());
157 hash.put("stat_table_update",atomicfields.getstat_update());
158 hash.put("stat_table_delete",atomicfields.getstat_delete());
159 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
160 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
161 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
162 hash.put("stat_table_read",atomicfields.getstat_read());
163 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
164 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
165 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
166 atomicfields.setstat_add(Long.valueOf(0));
167 atomicfields.setstat_update(Long.valueOf(0));
168 atomicfields.setstat_delete(Long.valueOf(0));
169 atomicfields.setstat_add_size(Long.valueOf(0));
170 atomicfields.setstat_update_size(Long.valueOf(0));
171 atomicfields.setstat_read_size(Long.valueOf(0));
172 atomicfields.getstat_read_size();
173 atomicfields.setstat_read(Long.valueOf(0));
174 atomicfields.setstat_cache_hit(Long.valueOf(0));
175 atomicfields.setstat_cache_miss(Long.valueOf(0));
176 atomicfields.setstat_cache_drop(Long.valueOf(0));
177 Hashtable<String,Long> ihash=index.readStatistics();
186 * Create a new table object with the default flat index model
191 * Create a new table object with a specific index model
193 public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
194 atomicfields = factory.create();
197 this.location=location;
198 if(!this.location.endsWith(File.separator))this.location+=File.separator;
200 case PAGED : index=new PagedIndexTransactional(getFileName(INDEX));
204 indexcachefirst=null;
206 atomicfields.setFileaposition(Long.valueOf(0));
207 atomicfields.setFilebposition(Long.valueOf(0));
208 atomicfields.setstat_update_size(Long.valueOf(0));
209 atomicfields.setstat_update(Long.valueOf(0));
210 atomicfields.setstat_read_size(Long.valueOf(0));
211 atomicfields.setstat_read(Long.valueOf(0));
212 atomicfields.setstat_delete(Long.valueOf(0));
213 atomicfields.setstat_cache_miss(Long.valueOf(0));
214 atomicfields.setstat_cache_hit(Long.valueOf(0));
215 atomicfields.setstat_cache_drop(Long.valueOf(0));
216 atomicfields.setstat_add_size(Long.valueOf(0));
217 atomicfields.setstat_add(Long.valueOf(0));
218 atomicfields.setIndexcacheusage(Integer.valueOf(0));
219 indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
223 * Set the maximal allowable size of the index cache.
225 public void setIndexCacheSize(int newsize){
226 INDEXCACHESIZE=newsize;
230 * Close all open file streams
234 if(fileastream!=null)fileastream.close();
235 if(filebstream!=null)filebstream.close();
236 if(filearandom!=null)filearandom.close();
237 if(filebrandom!=null)filebrandom.close();
239 }catch(Exception e){}
243 * Returns the name of this table
245 public String getTitle(){
250 * Returns the location of this tables datafiles
252 public String getLocation(){
256 protected String getFileName(int type){
258 case FILEB : return location+title+".a";
259 case FILEA : return location+title+".b";
260 case INDEX : return location+title+".index";
266 * Delete the files created by this table object.
267 * Be aware that this will delete any data stored in this table!
269 public void deleteFiles(){
271 File ftest=new File(getFileName(FILEA));
273 }catch(Exception e){}
275 File ftest=new File(getFileName(FILEB));
277 }catch(Exception e){}
279 File ftest=new File(getFileName(INDEX));
281 }catch(Exception e){}
284 private /*synchronized*/ void openFile(int type) throws IOException{
286 case FILEA : if(fileastream==null){
287 // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
288 System.out.println("file a " + getFileName(FILEA));
289 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
291 //File ftest=new File(getFileName(FILEA));
292 //atomicfields.setFileaposition(ftest.length());
293 atomicfields.setFileaposition(fileastream.length());
294 fileastream.seek(fileastream.length());
297 case FILEB : if(filebstream==null){
298 System.out.println("file a " + getFileName(FILEB));
299 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
300 //File ftest=new File(getFileName(FILEB));
301 //atomicfields.setFilebposition(ftest.length());
302 atomicfields.setFilebposition(filebstream.length());
303 filebstream.seek(filebstream.length());
310 * Adds a row of data to this table.
312 public void addRow(RowTransactional row) throws IOException{
313 // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
319 rowswitch=!rowswitch;
322 private void addCacheEntry(TableIndexEntryTransactional entry){
323 // synchronized(indexcache){
324 if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
325 // remove first entry
326 TableIndexNodeTransactional node=indexcachefirst;
327 indexcache.remove(node.getData().getId());
328 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
329 // synchronized(statlock){
330 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
332 indexcachefirst=node.getNext();
333 if(indexcachefirst==null){
336 indexcachefirst.setPrevious(null);
339 TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
340 if(indexcachelast!=null){
341 indexcachelast.setNext(node);
343 if(indexcachefirst==null){
344 indexcachefirst=node;
347 indexcache.put(entry.getId(),node);
348 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
352 private void addRowA(RowTransactional row) throws IOException{
353 //synchronized(filealock){
354 final Vector args = new Vector();
356 Thread.doIt(new Callable<Boolean>() {
357 public Boolean call() throws Exception{
359 //int pre=fileastream.size();
360 int pre= (int)fileastream.getFilePointer();
361 //row.writeToStream(fileastream);
362 ((RowTransactional)args.get(0)).writeToFile(fileastream);
363 //int post= fileastream.size();
364 int post= (int)fileastream.getFilePointer();
365 //fileastream.flush();
367 //synchronized(statlock){
368 atomicfields.setstat_add(atomicfields.getstat_add()+1);
369 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
372 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
373 if(INDEXCACHESIZE>0){
374 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
375 addCacheEntry(entry);
377 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
382 private void addRowB(RowTransactional row) throws IOException{
383 // synchronized(fileblock){
384 final Vector args = new Vector();
386 Thread.doIt(new Callable<Boolean>() {
387 public Boolean call() throws Exception{
389 //int pre=filebstream.size();
390 int pre= (int)filebstream.getFilePointer();
391 //row.writeToStream(filebstream);
392 ((RowTransactional)args.get(0)).writeToFile(filebstream);
393 int post=(int)filebstream.getFilePointer();
394 //int post=filebstream.size();
395 //filebstream.flush();
396 //synchronized(statlock){
397 atomicfields.setstat_add(atomicfields.getstat_add()+1);
398 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
400 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
401 if(INDEXCACHESIZE>0){
402 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
403 addCacheEntry(entry);
405 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
412 private void updateCacheEntry(TableIndexEntryTransactional entry){
414 //synchronized(indexcache){
415 if(indexcache.containsKey(entry.getId())){
416 TableIndexNodeTransactional node=indexcache.get(entry.getId());
418 if(node!=indexcachelast){
419 if(node==indexcachefirst){
420 indexcachefirst=node.getNext();
423 indexcachelast.setNext(node);
424 node.setPrevious(indexcachelast);
429 addCacheEntry(entry);
436 private void removeCacheEntry(String id){
437 //synchronized(indexcache){
438 final Vector args = new Vector();
440 Thread.doIt(new Callable<Boolean>() {
441 public Boolean call() throws Exception{
443 if(indexcache.containsKey((String)(args.get(0)))){
444 TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
445 indexcache.remove((String)(args.get(0)));
446 if(atomicfields.getIndexcacheusage()==1){
447 indexcachefirst=null;
449 atomicfields.setIndexcacheusage(0);
452 if(node==indexcachefirst){
453 indexcachefirst=node.getNext();
454 indexcachefirst.setPrevious(null);
455 }else if(node==indexcachelast){
456 indexcachelast=node.getPrevious();
457 indexcachelast.setNext(null);
461 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
462 // synchronized(statlock){
463 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
472 private TableIndexEntryTransactional getCacheEntry(String id){
474 //synchronized(indexcache){
475 if(indexcache.containsKey(id)){
476 TableIndexNodeTransactional node=indexcache.get(id);
477 if(node!=indexcachelast){
478 if(node==indexcachefirst){
479 indexcachefirst=node.getNext();
482 indexcachelast.setNext(node);
483 node.setPrevious(indexcachelast);
487 // synchronized(statlock){
488 atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
490 return node.getData();
493 //synchronized(statlock){
494 atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
501 * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
502 * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
504 public void addOrUpdateRow(RowTransactional row) throws IOException{
505 RowTransactional tmprow=getRow(row.getId());
514 * Updates a row stored in this table.
516 public void updateRow(RowTransactional row) throws IOException{
517 TableIndexEntryTransactional entry=null;
518 final Vector args = new Vector();
521 // Handle index entry caching
522 if(INDEXCACHESIZE>0){
523 Thread.doIt(new Callable<Boolean>() {
524 public Boolean call() throws Exception {
525 TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
526 RowTransactional row = (RowTransactional) (args.get(0));
527 entry = getCacheEntry(row.getId());
529 entry=index.scanIndex(row.getId());
530 addCacheEntry(entry);
535 /* synchronized(indexcache){
536 entry=getCacheEntry(row.getId());
538 entry=index.scanIndex(row.getId());
539 addCacheEntry(entry);
543 entry=index.scanIndexTransactional(row.getId());
545 if(entry.getRowSize()>=row.getSize()){
546 // Add to the existing location
547 switch(entry.getLocation()){
549 // synchronized(filealock){
550 Thread.doIt(new Callable<Boolean>() {
551 public Boolean call() throws Exception {
552 if(filearandom==null){
553 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
554 // fca=filearandom.getChannel();
556 filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
557 ((RowTransactional) (args.get(0))).writeToFile(filearandom);
565 // synchronized(fileblock){
566 Thread.doIt(new Callable<Boolean>() {
567 public Boolean call() throws Exception {
568 if(filebrandom==null){
569 filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
570 // fcb=filebrandom.getChannel();
572 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
573 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
586 rowswitch=!rowswitch;
588 //synchronized(statlock){
589 Thread.doIt(new Callable<Boolean>() {
590 public Boolean call() throws Exception{
591 atomicfields.setstat_update(atomicfields.getstat_update()+1);
592 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
598 private void updateRowA(RowTransactional row) throws IOException{
599 final Vector args = new Vector();
601 Thread.doIt(new Callable<Boolean>() {
602 public Boolean call() throws Exception{
603 //synchronized(filealock){
605 //int pre=filebstream.size();
606 int pre=(int)fileastream.getFilePointer();
607 //row.writeToStream(filebstream);
608 ((RowTransactional)(args.get(0))).writeToFile(fileastream);
609 //int post=filebstream.size();
610 int post=(int)fileastream.getFilePointer();
611 //fileastream.flush();
612 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
614 // Handle index entry caching
615 if(INDEXCACHESIZE>0){
616 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
618 atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
624 private void updateRowB(RowTransactional row) throws IOException{
625 final Vector args = new Vector();
627 Thread.doIt(new Callable<Boolean>() {
628 public Boolean call() throws Exception{
630 //synchronized(fileblock){
632 //int pre=filebstream.size();
633 int pre=(int)filebstream.getFilePointer();
634 //row.writeToStream(filebstream);
635 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
636 //int post=filebstream.size();
637 int post=(int)filebstream.getFilePointer();
638 //filebstream.flush();
639 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
640 // Handle index entry caching
641 // Handle index entry caching
642 if(INDEXCACHESIZE>0){
643 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
645 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
652 * Marks a row as deleted in the index.
653 * Be aware that the space consumed by the row is not actually reclaimed.
655 public void deleteRow(RowTransactional row) throws IOException{
656 // Handle index entry caching
657 if(INDEXCACHESIZE>0){
658 removeCacheEntry(row.getId());
660 index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
661 Thread.doIt(new Callable<Boolean>() {
662 public Boolean call() throws Exception{
663 //synchronized(statlock){
664 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
672 * Returns a tuplestream containing the given list of rows
674 public TupleStreamTransactional getRows(List<String> rows) throws IOException{
675 return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
679 * Returns a tuplestream containing the rows matching the given rowmatcher
681 public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
682 return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
686 * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
688 public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
689 return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
693 * Returns a tuplestream of all rows in this table.
695 public TupleStreamTransactional getRows() throws IOException{
696 // return new TableReader(this);
697 return new IndexedTableReaderTransactional(this,index.scanIndex());
701 * Returns a single row stored in this table.
702 * If the row does not exist in the table, null will be returned.
704 public RowTransactional getRow(String id) throws IOException{
705 TableIndexEntryTransactional entry=null;
706 // Handle index entry caching
707 if(INDEXCACHESIZE>0){
708 final Vector args = new Vector();
711 //synchronized(indexcache){
712 Thread.doIt(new Callable<Boolean>() {
713 public Boolean call() throws Exception{
714 TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
715 String id = (String) (args.get(0));
716 entry=getCacheEntry(id);
718 entry=index.scanIndex(id);
720 addCacheEntry(entry);
727 entry=index.scanIndexTransactional(id);
731 DataInputStream data=null;
732 if(entry.getLocation()==Table.FILEA){
733 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
734 }else if(entry.getLocation()==Table.FILEB){
735 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
738 while(dataoffset!=entry.getPosition()){
739 dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
741 RowTransactional row=RowTransactional.readFromStream(data);
743 final Vector args = new Vector();
745 Thread.doIt(new Callable<Boolean>() {
746 public Boolean call() throws Exception{
747 //synchronized(statlock){
748 atomicfields.setstat_read(atomicfields.getstat_read()+1);
749 atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());