2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
35 import TransactionalIO.core.TransactionalFile;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
42 import dstm2.util.StringKeyHashMap;
43 import java.util.concurrent.Callable;
46 * The table stores a group of rows.
47 * Every row must have a unique id within a table.
49 public class DualFileTableTransactional implements TableTransactional{
52 private int INDEXCACHESIZE=8192;
54 private String filealock="filea-dummy";
55 private String fileblock="fileb-dummy";
57 // private DataOutputStream fileastream=null;
58 // private DataOutputStream filebstream=null;
59 private TransactionalFile fileastream=null;
60 private TransactionalFile filebstream=null;
61 // private RandomAccessFile filearandom=null;
62 private TransactionalFile filearandom=null;
63 private TransactionalFile filebrandom=null;
64 // FileChannel fca=null;
65 // FileChannel fcb=null;
66 private TableIndexTransactional index=null;
68 // private long fileaposition=0;
69 // private long filebposition=0;
71 private boolean rowswitch=true;
74 private String location;
76 private TableIndexNodeTransactional indexcachefirst;
77 private TableIndexNodeTransactional indexcachelast;
78 //private int indexcacheusage;
80 private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
81 //private Hashtable<String,TableIndexNode> indexcache;
84 DualFileTableTSInf atomicfields;
86 public @atomic interface DualFileTableTSInf{
88 long getstat_update();
89 long getstat_delete();
90 long getstat_add_size();
91 long getstat_update_size();
92 long getstat_read_size();
94 long getstat_cache_hit();
95 long getstat_cache_miss();
96 long getstat_cache_drop();
97 long getFileaposition();
98 long getFilebposition();
99 int getIndexcacheusage();
101 void setstat_add(long val);
102 void setstat_update(long val);
103 void setstat_delete(long val);
104 void setstat_add_size(long val);
105 void setstat_update_size(long val);
106 void setstat_read_size(long val);
107 void setstat_read(long val);
108 void setstat_cache_hit(long val);
109 void setstat_cache_miss(long val);
110 void setstat_cache_drop(long val);
111 void setIndexcacheusage(int val);
112 void setFileaposition(long val);
113 void setFilebposition(long val);
119 long stat_add_size=0;
120 long stat_update_size=0;
121 long stat_read_size=0;
123 long stat_cache_hit=0;
124 long stat_cache_miss=0;
125 long stat_cache_drop=0;*/
127 protected String statlock="stat-dummy";
130 * Return the current values of the statistic counters and reset them.
131 * The current counters are:
134 * <li>stat_table_update
135 * <li>stat_table_delete
136 * <li>stat_table_add_size
137 * <li>stat_table_update_size
138 * <li>stat_table_read_size
139 * <li>stat_table_read
140 * <li>stat_table_cache_hit
141 * <li>stat_table_cache_miss
142 * <li>stat_table_cache_drop
144 * Furthermore, the index will be asked to deliver separate index specific counters
146 public Hashtable<String,Long> readStatistics(){
148 // synchronized(statlock){
149 return Thread.doIt(new Callable<Hashtable<String,Long>>() {
150 public Hashtable<String,Long> call() throws Exception{
151 Hashtable<String,Long> hash=new Hashtable<String,Long>();
152 hash.put("stat_table_add",atomicfields.getstat_add());
153 hash.put("stat_table_update",atomicfields.getstat_update());
154 hash.put("stat_table_delete",atomicfields.getstat_delete());
155 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
156 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
157 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
158 hash.put("stat_table_read",atomicfields.getstat_read());
159 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
160 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
161 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
162 atomicfields.setstat_add(0);
163 atomicfields.setstat_update(0);
164 atomicfields.setstat_delete(0);
165 atomicfields.setstat_add_size(0);
166 atomicfields.setstat_update_size(0);
167 atomicfields.setstat_read_size(0);
168 atomicfields.setstat_read(0);
169 atomicfields.setstat_cache_hit(0);
170 atomicfields.setstat_cache_miss(0);
171 atomicfields.setstat_cache_drop(0);
172 Hashtable<String,Long> ihash=index.readStatistics();
181 * Create a new table object with the default flat index model
186 * Create a new table object with a specific index model
188 public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
190 this.location=location;
191 if(!this.location.endsWith(File.separator))this.location+=File.separator;
193 case PAGED : index=new PagedIndexTransactional(getFileName(INDEX));
197 indexcachefirst=null;
199 atomicfields.setFileaposition(0);
200 atomicfields.setFilebposition(0);
201 atomicfields.setIndexcacheusage(0);
202 indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
206 * Set the maximal allowable size of the index cache.
208 public void setIndexCacheSize(int newsize){
209 INDEXCACHESIZE=newsize;
213 * Close all open file streams
217 if(fileastream!=null)fileastream.close();
218 if(filebstream!=null)filebstream.close();
219 if(filearandom!=null)filearandom.close();
220 if(filebrandom!=null)filebrandom.close();
222 }catch(Exception e){}
226 * Returns the name of this table
228 public String getTitle(){
233 * Returns the location of this tables datafiles
235 public String getLocation(){
239 protected String getFileName(int type){
241 case FILEB : return location+title+".a";
242 case FILEA : return location+title+".b";
243 case INDEX : return location+title+".index";
249 * Delete the files created by this table object.
250 * Be aware that this will delete any data stored in this table!
252 public void deleteFiles(){
254 File ftest=new File(getFileName(FILEA));
256 }catch(Exception e){}
258 File ftest=new File(getFileName(FILEB));
260 }catch(Exception e){}
262 File ftest=new File(getFileName(INDEX));
264 }catch(Exception e){}
267 private /*synchronized*/ void openFile(int type) throws IOException{
269 case FILEA : if(fileastream==null){
270 // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
271 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
273 //File ftest=new File(getFileName(FILEA));
274 //atomicfields.setFileaposition(ftest.length());
275 atomicfields.setFileaposition(fileastream.length());
276 fileastream.seek(fileastream.length());
279 case FILEB : if(filebstream==null){
280 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
281 //File ftest=new File(getFileName(FILEB));
282 //atomicfields.setFilebposition(ftest.length());
283 atomicfields.setFilebposition(filebstream.length());
284 filebstream.seek(filebstream.length());
291 * Adds a row of data to this table.
293 public void addRow(RowTransactional row) throws IOException{
294 // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
300 rowswitch=!rowswitch;
303 private void addCacheEntry(TableIndexEntryTransactional entry){
304 // synchronized(indexcache){
305 if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
306 // remove first entry
307 TableIndexNodeTransactional node=indexcachefirst;
308 indexcache.remove(node.getData().getId());
309 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
310 // synchronized(statlock){
311 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
313 indexcachefirst=node.getNext();
314 if(indexcachefirst==null){
317 indexcachefirst.setPrevious(null);
320 TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
321 if(indexcachelast!=null){
322 indexcachelast.setNext(node);
324 if(indexcachefirst==null){
325 indexcachefirst=node;
328 indexcache.put(entry.getId(),node);
329 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
333 private void addRowA(RowTransactional row) throws IOException{
334 //synchronized(filealock){
335 final Vector args = new Vector();
337 Thread.doIt(new Callable<Boolean>() {
338 public Boolean call() throws Exception{
340 //int pre=fileastream.size();
341 int pre= (int)fileastream.getFilePointer();
342 //row.writeToStream(fileastream);
343 ((RowTransactional)args.get(0)).writeToFile(fileastream);
344 //int post= fileastream.size();
345 int post= (int)fileastream.getFilePointer();
346 //fileastream.flush();
348 //synchronized(statlock){
349 atomicfields.setstat_add(atomicfields.getstat_add()+1);
350 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
353 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
354 if(INDEXCACHESIZE>0){
355 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
356 addCacheEntry(entry);
358 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
363 private void addRowB(RowTransactional row) throws IOException{
364 // synchronized(fileblock){
365 final Vector args = new Vector();
367 Thread.doIt(new Callable<Boolean>() {
368 public Boolean call() throws Exception{
370 //int pre=filebstream.size();
371 int pre= (int)filebstream.getFilePointer();
372 //row.writeToStream(filebstream);
373 ((RowTransactional)args.get(0)).writeToFile(filebstream);
374 int post=(int)filebstream.getFilePointer();
375 //int post=filebstream.size();
376 //filebstream.flush();
377 //synchronized(statlock){
378 atomicfields.setstat_add(atomicfields.getstat_add()+1);
379 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
381 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
382 if(INDEXCACHESIZE>0){
383 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
384 addCacheEntry(entry);
386 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
393 private void updateCacheEntry(TableIndexEntryTransactional entry){
394 final Vector args = new Vector();
396 Thread.doIt(new Callable<Boolean>() {
397 public Boolean call() throws Exception{
398 //synchronized(indexcache){
399 if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
400 TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
401 node.setData(((TableIndexEntryTransactional)(args.get(0))));
402 if(node!=indexcachelast){
403 if(node==indexcachefirst){
404 indexcachefirst=node.getNext();
407 indexcachelast.setNext(node);
408 node.setPrevious(indexcachelast);
413 addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
420 private void removeCacheEntry(String id){
421 //synchronized(indexcache){
422 final Vector args = new Vector();
424 Thread.doIt(new Callable<Boolean>() {
425 public Boolean call() throws Exception{
427 if(indexcache.containsKey((String)(args.get(0)))){
428 TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
429 indexcache.remove((String)(args.get(0)));
430 if(atomicfields.getIndexcacheusage()==1){
431 indexcachefirst=null;
433 atomicfields.setIndexcacheusage(0);
436 if(node==indexcachefirst){
437 indexcachefirst=node.getNext();
438 indexcachefirst.setPrevious(null);
439 }else if(node==indexcachelast){
440 indexcachelast=node.getPrevious();
441 indexcachelast.setNext(null);
445 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
446 // synchronized(statlock){
447 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
456 private TableIndexEntryTransactional getCacheEntry(String id){
457 final Vector args = new Vector();
460 TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
461 public TableIndexEntryTransactional call() throws Exception{
462 //synchronized(indexcache){
463 if(indexcache.containsKey((String)(args.get(0)))){
464 TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
465 if(node!=indexcachelast){
466 if(node==indexcachefirst){
467 indexcachefirst=node.getNext();
470 indexcachelast.setNext(node);
471 node.setPrevious(indexcachelast);
475 // synchronized(statlock){
476 atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
478 return node.getData();
486 Thread.doIt(new Callable<Boolean>() {
487 public Boolean call() throws Exception{
488 //synchronized(statlock){
489 atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
498 * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
499 * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
501 public void addOrUpdateRow(RowTransactional row) throws IOException{
502 RowTransactional tmprow=getRow(row.getId());
511 * Updates a row stored in this table.
513 public void updateRow(RowTransactional row) throws IOException{
514 TableIndexEntryTransactional entry=null;
515 final Vector args = new Vector();
518 // Handle index entry caching
519 if(INDEXCACHESIZE>0){
520 Thread.doIt(new Callable<Boolean>() {
521 public Boolean call() throws Exception {
522 TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
523 RowTransactional row = (RowTransactional) (args.get(0));
524 entry = getCacheEntry(row.getId());
526 entry=index.scanIndex(row.getId());
527 addCacheEntry(entry);
532 /* synchronized(indexcache){
533 entry=getCacheEntry(row.getId());
535 entry=index.scanIndex(row.getId());
536 addCacheEntry(entry);
540 entry=index.scanIndexTransactional(row.getId());
542 if(entry.getRowSize()>=row.getSize()){
543 // Add to the existing location
544 switch(entry.getLocation()){
546 // synchronized(filealock){
547 Thread.doIt(new Callable<Boolean>() {
548 public Boolean call() throws Exception {
549 if(filearandom==null){
550 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
551 // fca=filearandom.getChannel();
553 filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
554 ((RowTransactional) (args.get(0))).writeToFile(filearandom);
562 // synchronized(fileblock){
563 Thread.doIt(new Callable<Boolean>() {
564 public Boolean call() throws Exception {
565 if(filebrandom==null){
566 filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
567 // fcb=filebrandom.getChannel();
569 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
570 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
583 rowswitch=!rowswitch;
585 //synchronized(statlock){
586 Thread.doIt(new Callable<Boolean>() {
587 public Boolean call() throws Exception{
588 atomicfields.setstat_update(atomicfields.getstat_update()+1);
589 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
595 private void updateRowA(RowTransactional row) throws IOException{
596 final Vector args = new Vector();
598 Thread.doIt(new Callable<Boolean>() {
599 public Boolean call() throws Exception{
600 //synchronized(filealock){
602 //int pre=filebstream.size();
603 int pre=(int)fileastream.getFilePointer();
604 //row.writeToStream(filebstream);
605 ((RowTransactional)(args.get(0))).writeToFile(fileastream);
606 //int post=filebstream.size();
607 int post=(int)fileastream.getFilePointer();
608 //fileastream.flush();
609 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
611 // Handle index entry caching
612 if(INDEXCACHESIZE>0){
613 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
615 atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
621 private void updateRowB(RowTransactional row) throws IOException{
622 final Vector args = new Vector();
624 Thread.doIt(new Callable<Boolean>() {
625 public Boolean call() throws Exception{
627 //synchronized(fileblock){
629 //int pre=filebstream.size();
630 int pre=(int)filebstream.getFilePointer();
631 //row.writeToStream(filebstream);
632 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
633 //int post=filebstream.size();
634 int post=(int)filebstream.getFilePointer();
635 //filebstream.flush();
636 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
637 // Handle index entry caching
638 // Handle index entry caching
639 if(INDEXCACHESIZE>0){
640 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
642 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
649 * Marks a row as deleted in the index.
650 * Be aware that the space consumed by the row is not actually reclaimed.
652 public void deleteRow(RowTransactional row) throws IOException{
653 // Handle index entry caching
654 if(INDEXCACHESIZE>0){
655 removeCacheEntry(row.getId());
657 index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
658 Thread.doIt(new Callable<Boolean>() {
659 public Boolean call() throws Exception{
660 //synchronized(statlock){
661 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
669 * Returns a tuplestream containing the given list of rows
671 public TupleStreamTransactional getRows(List<String> rows) throws IOException{
672 return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
676 * Returns a tuplestream containing the rows matching the given rowmatcher
678 public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
679 return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
683 * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
685 public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
686 return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
690 * Returns a tuplestream of all rows in this table.
692 public TupleStreamTransactional getRows() throws IOException{
693 // return new TableReader(this);
694 return new IndexedTableReaderTransactional(this,index.scanIndex());
698 * Returns a single row stored in this table.
699 * If the row does not exist in the table, null will be returned.
701 public RowTransactional getRow(String id) throws IOException{
702 TableIndexEntryTransactional entry=null;
703 // Handle index entry caching
704 if(INDEXCACHESIZE>0){
705 final Vector args = new Vector();
708 //synchronized(indexcache){
709 Thread.doIt(new Callable<Boolean>() {
710 public Boolean call() throws Exception{
711 TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
712 String id = (String) (args.get(0));
713 entry=getCacheEntry(id);
715 entry=index.scanIndex(id);
717 addCacheEntry(entry);
724 entry=index.scanIndexTransactional(id);
728 DataInputStream data=null;
729 if(entry.getLocation()==Table.FILEA){
730 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
731 }else if(entry.getLocation()==Table.FILEB){
732 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
735 while(dataoffset!=entry.getPosition()){
736 dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
738 RowTransactional row=RowTransactional.readFromStream(data);
740 final Vector args = new Vector();
742 Thread.doIt(new Callable<Boolean>() {
743 public Boolean call() throws Exception{
744 //synchronized(statlock){
745 atomicfields.setstat_read(atomicfields.getstat_read()+1);
746 atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());