2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
35 import TransactionalIO.core.TransactionalFile;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
42 import dstm2.util.StringKeyHashMap;
43 import java.util.concurrent.Callable;
46 * The table stores a group of rows.
47 * Every row must have a unique id within a table.
49 public class DualFileTableTransactional implements TableTransactional{
52 private int INDEXCACHESIZE=8192;
54 private String filealock="filea-dummy";
55 private String fileblock="fileb-dummy";
57 // private DataOutputStream fileastream=null;
58 // private DataOutputStream filebstream=null;
59 private TransactionalFile fileastream=null;
60 private TransactionalFile filebstream=null;
61 // private RandomAccessFile filearandom=null;
62 private TransactionalFile filearandom=null;
63 private TransactionalFile filebrandom=null;
64 // FileChannel fca=null;
65 // FileChannel fcb=null;
66 private TableIndexTransactional index=null;
68 // private long fileaposition=0;
69 // private long filebposition=0;
71 private boolean rowswitch=true;
74 private String location;
76 private TableIndexNodeTransactional indexcachefirst;
77 private TableIndexNodeTransactional indexcachelast;
78 //private int indexcacheusage;
80 private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
81 //private Hashtable<String,TableIndexNode> indexcache;
84 DualFileTableTSInf atomicfields;
86 public @atomic interface DualFileTableTSInf{
88 long getstat_update();
89 long getstat_delete();
90 long getstat_add_size();
91 long getstat_update_size();
92 long getstat_read_size();
94 long getstat_cache_hit();
95 long getstat_cache_miss();
96 long getstat_cache_drop();
97 long getFileaposition();
98 long getFilebposition();
99 int getIndexcacheusage();
101 void setstat_add(long val);
102 void setstat_update(long val);
103 void setstat_delete(long val);
104 void setstat_add_size(long val);
105 void setstat_update_size(long val);
106 void setstat_read_size(long val);
107 void setstat_read(long val);
108 void setstat_cache_hit(long val);
109 void setstat_cache_miss(long val);
110 void setstat_cache_drop(long val);
111 void setIndexcacheusage(int val);
112 void setFileaposition(long val);
113 void setFilebposition(long val);
119 long stat_add_size=0;
120 long stat_update_size=0;
121 long stat_read_size=0;
123 long stat_cache_hit=0;
124 long stat_cache_miss=0;
125 long stat_cache_drop=0;*/
127 protected String statlock="stat-dummy";
130 * Return the current values of the statistic counters and reset them.
131 * The current counters are:
134 * <li>stat_table_update
135 * <li>stat_table_delete
136 * <li>stat_table_add_size
137 * <li>stat_table_update_size
138 * <li>stat_table_read_size
139 * <li>stat_table_read
140 * <li>stat_table_cache_hit
141 * <li>stat_table_cache_miss
142 * <li>stat_table_cache_drop
144 * Furthermore, the index will be asked to deliver separate index specific counters
146 public Hashtable<String,Long> readStatistics(){
147 Hashtable<String,Long> hash=new Hashtable<String,Long>();
148 synchronized(statlock){
149 hash.put("stat_table_add",atomicfields.getstat_add());
150 hash.put("stat_table_update",atomicfields.getstat_update());
151 hash.put("stat_table_delete",atomicfields.getstat_delete());
152 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
153 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
154 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
155 hash.put("stat_table_read",atomicfields.getstat_read());
156 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
157 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
158 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
159 atomicfields.setstat_add(0);
160 atomicfields.setstat_update(0);
161 atomicfields.setstat_delete(0);
162 atomicfields.setstat_add_size(0);
163 atomicfields.setstat_update_size(0);
164 atomicfields.setstat_read_size(0);
165 atomicfields.setstat_read(0);
166 atomicfields.setstat_cache_hit(0);
167 atomicfields.setstat_cache_miss(0);
168 atomicfields.setstat_cache_drop(0);
169 Hashtable<String,Long> ihash=index.readStatistics();
176 * Create a new table object with the default flat index model
181 * Create a new table object with a specific index model
183 public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
185 this.location=location;
186 if(!this.location.endsWith(File.separator))this.location+=File.separator;
188 case PAGED : index=new PagedIndexTransactional(getFileName(INDEX));
192 indexcachefirst=null;
194 atomicfields.setFileaposition(0);
195 atomicfields.setFilebposition(0);
196 atomicfields.setIndexcacheusage(0);
197 indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
201 * Set the maximal allowable size of the index cache.
203 public void setIndexCacheSize(int newsize){
204 INDEXCACHESIZE=newsize;
208 * Close all open file streams
212 if(fileastream!=null)fileastream.close();
213 if(filebstream!=null)filebstream.close();
214 if(filearandom!=null)filearandom.close();
215 if(filebrandom!=null)filebrandom.close();
217 }catch(Exception e){}
221 * Returns the name of this table
223 public String getTitle(){
228 * Returns the location of this tables datafiles
230 public String getLocation(){
234 protected String getFileName(int type){
236 case FILEB : return location+title+".a";
237 case FILEA : return location+title+".b";
238 case INDEX : return location+title+".index";
244 * Delete the files created by this table object.
245 * Be aware that this will delete any data stored in this table!
247 public void deleteFiles(){
249 File ftest=new File(getFileName(FILEA));
251 }catch(Exception e){}
253 File ftest=new File(getFileName(FILEB));
255 }catch(Exception e){}
257 File ftest=new File(getFileName(INDEX));
259 }catch(Exception e){}
262 private synchronized void openFile(int type) throws IOException{
264 case FILEA : if(fileastream==null){
265 // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
266 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
268 //File ftest=new File(getFileName(FILEA));
269 //atomicfields.setFileaposition(ftest.length());
270 atomicfields.setFileaposition(fileastream.length());
271 fileastream.seek(fileastream.length());
274 case FILEB : if(filebstream==null){
275 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
276 //File ftest=new File(getFileName(FILEB));
277 //atomicfields.setFilebposition(ftest.length());
278 atomicfields.setFilebposition(filebstream.length());
279 filebstream.seek(filebstream.length());
286 * Adds a row of data to this table.
288 public void addRow(RowTransactional row) throws IOException{
289 // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
295 rowswitch=!rowswitch;
298 private void addCacheEntry(TableIndexEntryTransactional entry){
299 synchronized(indexcache){
300 if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
301 // remove first entry
302 TableIndexNodeTransactional node=indexcachefirst;
303 indexcache.remove(node.getData().getId());
304 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
305 synchronized(statlock){
306 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
308 indexcachefirst=node.getNext();
309 if(indexcachefirst==null){
312 indexcachefirst.setPrevious(null);
315 TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
316 if(indexcachelast!=null){
317 indexcachelast.setNext(node);
319 if(indexcachefirst==null){
320 indexcachefirst=node;
323 indexcache.put(entry.getId(),node);
324 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
328 private void addRowA(RowTransactional row) throws IOException{
329 //synchronized(filealock){
330 final Vector args = new Vector();
332 Thread.doIt(new Callable<Boolean>() {
333 public Boolean call() throws Exception{
335 //int pre=fileastream.size();
336 int pre= (int)fileastream.getFilePointer();
337 //row.writeToStream(fileastream);
338 ((RowTransactional)args.get(0)).writeToFile(fileastream);
339 //int post= fileastream.size();
340 int post= (int)fileastream.getFilePointer();
341 //fileastream.flush();
343 //synchronized(statlock){
344 atomicfields.setstat_add(atomicfields.getstat_add()+1);
345 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
348 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
349 if(INDEXCACHESIZE>0){
350 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
351 addCacheEntry(entry);
353 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
357 private void addRowB(RowTransactional row) throws IOException{
358 // synchronized(fileblock){
359 final Vector args = new Vector();
361 Thread.doIt(new Callable<Boolean>() {
362 public Boolean call() throws Exception{
364 //int pre=filebstream.size();
365 int pre= (int)filebstream.getFilePointer();
366 //row.writeToStream(filebstream);
367 ((RowTransactional)args.get(0)).writeToFile(filebstream);
368 int post=(int)filebstream.getFilePointer();
369 //int post=filebstream.size();
370 //filebstream.flush();
371 //synchronized(statlock){
372 atomicfields.setstat_add(atomicfields.getstat_add()+1);
373 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
375 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
376 if(INDEXCACHESIZE>0){
377 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
378 addCacheEntry(entry);
380 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
386 private void updateCacheEntry(TableIndexEntryTransactional entry){
387 final Vector args = new Vector();
389 Thread.doIt(new Callable<Boolean>() {
390 public Boolean call() throws Exception{
391 //synchronized(indexcache){
392 if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
393 TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
394 node.setData(((TableIndexEntryTransactional)(args.get(0))));
395 if(node!=indexcachelast){
396 if(node==indexcachefirst){
397 indexcachefirst=node.getNext();
400 indexcachelast.setNext(node);
401 node.setPrevious(indexcachelast);
406 addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
412 private void removeCacheEntry(String id){
413 //synchronized(indexcache){
414 final Vector args = new Vector();
416 Thread.doIt(new Callable<Boolean>() {
417 public Boolean call() throws Exception{
419 if(indexcache.containsKey((String)(args.get(0)))){
420 TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
421 indexcache.remove((String)(args.get(0)));
422 if(atomicfields.getIndexcacheusage()==1){
423 indexcachefirst=null;
425 atomicfields.setIndexcacheusage(0);
428 if(node==indexcachefirst){
429 indexcachefirst=node.getNext();
430 indexcachefirst.setPrevious(null);
431 }else if(node==indexcachelast){
432 indexcachelast=node.getPrevious();
433 indexcachelast.setNext(null);
437 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
438 // synchronized(statlock){
439 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
448 private TableIndexEntryTransactional getCacheEntry(String id){
449 final Vector args = new Vector();
452 TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
453 public TableIndexEntryTransactional call() throws Exception{
454 //synchronized(indexcache){
455 if(indexcache.containsKey((String)(args.get(0)))){
456 TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
457 if(node!=indexcachelast){
458 if(node==indexcachefirst){
459 indexcachefirst=node.getNext();
462 indexcachelast.setNext(node);
463 node.setPrevious(indexcachelast);
467 // synchronized(statlock){
468 atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
470 return node.getData();
478 Thread.doIt(new Callable<Boolean>() {
479 public Boolean call() throws Exception{
480 //synchronized(statlock){
481 atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
490 * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
491 * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
493 public void addOrUpdateRow(RowTransactional row) throws IOException{
494 RowTransactional tmprow=getRow(row.getId());
503 * Updates a row stored in this table.
505 public void updateRow(RowTransactional row) throws IOException{
506 TableIndexEntryTransactional entry=null;
507 final Vector args = new Vector();
510 // Handle index entry caching
511 if(INDEXCACHESIZE>0){
512 Thread.doIt(new Callable<Boolean>() {
513 public Boolean call() throws Exception {
514 TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
515 RowTransactional row = (RowTransactional) (args.get(0));
516 entry = getCacheEntry(row.getId());
518 entry=index.scanIndex(row.getId());
519 addCacheEntry(entry);
524 /* synchronized(indexcache){
525 entry=getCacheEntry(row.getId());
527 entry=index.scanIndex(row.getId());
528 addCacheEntry(entry);
532 entry=index.scanIndex(row.getId());
534 if(entry.getRowSize()>=row.getSize()){
535 // Add to the existing location
536 switch(entry.getLocation()){
538 // synchronized(filealock){
539 Thread.doIt(new Callable<Boolean>() {
540 public Boolean call() throws Exception {
541 if(filearandom==null){
542 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
543 // fca=filearandom.getChannel();
545 filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
546 ((RowTransactional) (args.get(0))).writeToFile(filearandom);
554 // synchronized(fileblock){
555 Thread.doIt(new Callable<Boolean>() {
556 public Boolean call() throws Exception {
557 if(filebrandom==null){
558 filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
559 // fcb=filebrandom.getChannel();
561 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
562 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
575 rowswitch=!rowswitch;
577 //synchronized(statlock){
578 Thread.doIt(new Callable<Boolean>() {
579 public Boolean call() throws Exception{
580 atomicfields.setstat_update(atomicfields.getstat_update()+1);
581 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
587 private void updateRowA(RowTransactional row) throws IOException{
588 final Vector args = new Vector();
590 Thread.doIt(new Callable<Boolean>() {
591 public Boolean call() throws Exception{
592 //synchronized(filealock){
594 //int pre=filebstream.size();
595 int pre=(int)fileastream.getFilePointer();
596 //row.writeToStream(filebstream);
597 ((RowTransactional)(args.get(0))).writeToFile(fileastream);
598 //int post=filebstream.size();
599 int post=(int)fileastream.getFilePointer();
600 //fileastream.flush();
601 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
603 // Handle index entry caching
604 if(INDEXCACHESIZE>0){
605 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
607 atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
612 private void updateRowB(RowTransactional row) throws IOException{
613 final Vector args = new Vector();
615 Thread.doIt(new Callable<Boolean>() {
616 public Boolean call() throws Exception{
618 //synchronized(fileblock){
620 //int pre=filebstream.size();
621 int pre=(int)filebstream.getFilePointer();
622 //row.writeToStream(filebstream);
623 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
624 //int post=filebstream.size();
625 int post=(int)filebstream.getFilePointer();
626 //filebstream.flush();
627 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
628 // Handle index entry caching
629 // Handle index entry caching
630 if(INDEXCACHESIZE>0){
631 updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
633 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
640 * Marks a row as deleted in the index.
641 * Be aware that the space consumed by the row is not actually reclaimed.
643 public void deleteRow(RowTransactional row) throws IOException{
644 // Handle index entry caching
645 if(INDEXCACHESIZE>0){
646 removeCacheEntry(row.getId());
648 index.updateEntry(row.getId(),row.getSize(),DELETE,0);
649 Thread.doIt(new Callable<Boolean>() {
650 public Boolean call() throws Exception{
651 //synchronized(statlock){
652 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
660 * Returns a tuplestream containing the given list of rows
662 public TupleStreamTransactional getRows(List<String> rows) throws IOException{
663 return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
667 * Returns a tuplestream containing the rows matching the given rowmatcher
669 public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
670 return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
674 * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
676 public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
677 return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
681 * Returns a tuplestream of all rows in this table.
683 public TupleStreamTransactional getRows() throws IOException{
684 // return new TableReader(this);
685 return new IndexedTableReaderTransactional(this,index.scanIndex());
689 * Returns a single row stored in this table.
690 * If the row does not exist in the table, null will be returned.
692 public RowTransactional getRow(String id) throws IOException{
693 TableIndexEntryTransactional entry=null;
694 // Handle index entry caching
695 if(INDEXCACHESIZE>0){
696 synchronized(indexcache){
697 entry=getCacheEntry(id);
699 entry=index.scanIndex(id);
701 addCacheEntry(entry);
706 entry=index.scanIndex(id);
710 DataInputStream data=null;
711 if(entry.getLocation()==Table.FILEA){
712 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
713 }else if(entry.getLocation()==Table.FILEB){
714 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
717 while(dataoffset!=entry.getPosition()){
718 dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
720 RowTransactional row=RowTransactional.readFromStream(data);
722 final Vector args = new Vector();
724 Thread.doIt(new Callable<Boolean>() {
725 public Boolean call() throws Exception{
726 //synchronized(statlock){
727 atomicfields.setstat_read(atomicfields.getstat_read()+1);
728 atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());