2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
35 import TransactionalIO.core.TransactionalFile;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
41 import dstm2.util.StringKeyHashMap;
44 * The table stores a group of rows.
45 * Every row must have a unique id within a table.
47 public class DualFileTableTransactional implements TableTransactional{
50 private int INDEXCACHESIZE=8192;
52 private String filealock="filea-dummy";
53 private String fileblock="fileb-dummy";
55 // private DataOutputStream fileastream=null;
56 // private DataOutputStream filebstream=null;
57 private TransactionalFile fileastream=null;
58 private TransactionalFile filebstream=null;
59 // private RandomAccessFile filearandom=null;
60 private TransactionalFile filearandom=null;
61 private TransactionalFile filebrandom=null;
64 private TableIndexTransactional index=null;
66 // private long fileaposition=0;
67 // private long filebposition=0;
69 private boolean rowswitch=true;
72 private String location;
74 private TableIndexNodeTransactional indexcachefirst;
75 private TableIndexNodeTransactional indexcachelast;
76 //private int indexcacheusage;
78 private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
79 //private Hashtable<String,TableIndexNode> indexcache;
82 DualFileTableTSInf atomicfields;
84 public @atomic interface DualFileTableTSInf{
86 long getstat_update();
87 long getstat_delete();
88 long getstat_add_size();
89 long getstat_update_size();
90 long getstat_read_size();
92 long getstat_cache_hit();
93 long getstat_cache_miss();
94 long getstat_cache_drop();
95 long getFileaposition();
96 long getFilebposition();
97 int getIndexcacheusage();
99 void setstat_add(long val);
100 void setstat_update(long val);
101 void setstat_delete(long val);
102 void setstat_add_size(long val);
103 void setstat_update_size(long val);
104 void setstat_read_size(long val);
105 void setstat_read(long val);
106 void setstat_cache_hit(long val);
107 void setstat_cache_miss(long val);
108 void setstat_cache_drop(long val);
109 void setIndexcacheusage(int val);
110 void setFileaposition(long val);
111 void setFilebposition(long val);
117 long stat_add_size=0;
118 long stat_update_size=0;
119 long stat_read_size=0;
121 long stat_cache_hit=0;
122 long stat_cache_miss=0;
123 long stat_cache_drop=0;*/
125 protected String statlock="stat-dummy";
128 * Return the current values of the statistic counters and reset them.
129 * The current counters are:
132 * <li>stat_table_update
133 * <li>stat_table_delete
134 * <li>stat_table_add_size
135 * <li>stat_table_update_size
136 * <li>stat_table_read_size
137 * <li>stat_table_read
138 * <li>stat_table_cache_hit
139 * <li>stat_table_cache_miss
140 * <li>stat_table_cache_drop
142 * Furthermore, the index will be asked to deliver separate index specific counters
144 public Hashtable<String,Long> readStatistics(){
145 Hashtable<String,Long> hash=new Hashtable<String,Long>();
146 synchronized(statlock){
147 hash.put("stat_table_add",atomicfields.getstat_add());
148 hash.put("stat_table_update",atomicfields.getstat_update());
149 hash.put("stat_table_delete",atomicfields.getstat_delete());
150 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
151 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
152 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
153 hash.put("stat_table_read",atomicfields.getstat_read());
154 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
155 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
156 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
157 atomicfields.setstat_add(0);
158 atomicfields.setstat_update(0);
159 atomicfields.setstat_delete(0);
160 atomicfields.setstat_add_size(0);
161 atomicfields.setstat_update_size(0);
162 atomicfields.setstat_read_size(0);
163 atomicfields.setstat_read(0);
164 atomicfields.setstat_cache_hit(0);
165 atomicfields.setstat_cache_miss(0);
166 atomicfields.setstat_cache_drop(0);
167 Hashtable<String,Long> ihash=index.readStatistics();
174 * Create a new table object with the default flat index model
179 * Create a new table object with a specific index model
181 public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
183 this.location=location;
184 if(!this.location.endsWith(File.separator))this.location+=File.separator;
186 case PAGED : index=new PagedIndexTransactional(getFileName(INDEX));
190 indexcachefirst=null;
192 atomicfields.setFileaposition(0);
193 atomicfields.setFilebposition(0);
194 atomicfields.setIndexcacheusage(0);
195 indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
199 * Set the maximal allowable size of the index cache.
201 public void setIndexCacheSize(int newsize){
202 INDEXCACHESIZE=newsize;
206 * Close all open file streams
210 if(fileastream!=null)fileastream.close();
211 if(filebstream!=null)filebstream.close();
212 if(filearandom!=null)filearandom.close();
213 if(filebrandom!=null)filebrandom.close();
215 }catch(Exception e){}
219 * Returns the name of this table
221 public String getTitle(){
226 * Returns the location of this tables datafiles
228 public String getLocation(){
232 protected String getFileName(int type){
234 case FILEB : return location+title+".a";
235 case FILEA : return location+title+".b";
236 case INDEX : return location+title+".index";
242 * Delete the files created by this table object.
243 * Be aware that this will delete any data stored in this table!
245 public void deleteFiles(){
247 File ftest=new File(getFileName(FILEA));
249 }catch(Exception e){}
251 File ftest=new File(getFileName(FILEB));
253 }catch(Exception e){}
255 File ftest=new File(getFileName(INDEX));
257 }catch(Exception e){}
260 private synchronized void openFile(int type) throws IOException{
262 case FILEA : if(fileastream==null){
263 // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
264 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
266 //File ftest=new File(getFileName(FILEA));
267 //atomicfields.setFileaposition(ftest.length());
268 atomicfields.setFileaposition(fileastream.length());
269 fileastream.seek(fileastream.length());
272 case FILEB : if(filebstream==null){
273 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
274 //File ftest=new File(getFileName(FILEB));
275 //atomicfields.setFilebposition(ftest.length());
276 atomicfields.setFilebposition(filebstream.length());
277 filebstream.seek(filebstream.length());
284 * Adds a row of data to this table.
286 public void addRow(RowTransactional row) throws IOException{
287 // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
293 rowswitch=!rowswitch;
296 private void addCacheEntry(TableIndexEntryTransactional entry){
297 synchronized(indexcache){
298 if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
299 // remove first entry
300 TableIndexNodeTransactional node=indexcachefirst;
301 indexcache.remove(node.getData().getId());
302 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
303 synchronized(statlock){
304 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
306 indexcachefirst=node.getNext();
307 if(indexcachefirst==null){
310 indexcachefirst.setPrevious(null);
313 TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
314 if(indexcachelast!=null){
315 indexcachelast.setNext(node);
317 if(indexcachefirst==null){
318 indexcachefirst=node;
321 indexcache.put(entry.getId(),node);
322 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
326 private void addRowA(RowTransactional row) throws IOException{
327 synchronized(filealock){
329 //int pre=fileastream.size();
330 int pre= (int)fileastream.getFilePointer();
331 //row.writeToStream(fileastream);
332 row.writeToFile(fileastream);
333 //int post= fileastream.size();
334 int post= (int)fileastream.getFilePointer();
335 //fileastream.flush();
337 synchronized(statlock){
338 atomicfields.setstat_add(atomicfields.getstat_add()+1);
339 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
342 index.addEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFilebposition());
343 if(INDEXCACHESIZE>0){
344 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
345 addCacheEntry(entry);
347 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
350 private void addRowB(RowTransactional row) throws IOException{
351 synchronized(fileblock){
353 //int pre=filebstream.size();
354 int pre= (int)filebstream.getFilePointer();
355 //row.writeToStream(filebstream);
356 row.writeToFile(filebstream);
357 int post=(int)filebstream.getFilePointer();
358 //int post=filebstream.size();
359 //filebstream.flush();
360 synchronized(statlock){
361 atomicfields.setstat_add(atomicfields.getstat_add()+1);
362 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
364 index.addEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
365 if(INDEXCACHESIZE>0){
366 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
367 addCacheEntry(entry);
369 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
374 private void updateCacheEntry(TableIndexEntryTransactional entry){
375 synchronized(indexcache){
376 if(indexcache.containsKey(entry.getId())){
377 TableIndexNodeTransactional node=indexcache.get(entry.getId());
379 if(node!=indexcachelast){
380 if(node==indexcachefirst){
381 indexcachefirst=node.getNext();
384 indexcachelast.setNext(node);
385 node.setPrevious(indexcachelast);
390 addCacheEntry(entry);
395 private void removeCacheEntry(String id){
396 synchronized(indexcache){
397 if(indexcache.containsKey(id)){
398 TableIndexNodeTransactional node=indexcache.get(id);
399 indexcache.remove(id);
400 if(atomicfields.getIndexcacheusage()==1){
401 indexcachefirst=null;
403 atomicfields.setIndexcacheusage(0);
406 if(node==indexcachefirst){
407 indexcachefirst=node.getNext();
408 indexcachefirst.setPrevious(null);
409 }else if(node==indexcachelast){
410 indexcachelast=node.getPrevious();
411 indexcachelast.setNext(null);
415 atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
416 synchronized(statlock){
417 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
423 private TableIndexEntryTransactional getCacheEntry(String id){
424 synchronized(indexcache){
425 if(indexcache.containsKey(id)){
426 TableIndexNodeTransactional node=indexcache.get(id);
427 if(node!=indexcachelast){
428 if(node==indexcachefirst){
429 indexcachefirst=node.getNext();
432 indexcachelast.setNext(node);
433 node.setPrevious(indexcachelast);
437 synchronized(statlock){
438 atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
440 return node.getData();
443 synchronized(statlock){
444 atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
450 * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
451 * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
453 public void addOrUpdateRow(RowTransactional row) throws IOException{
454 RowTransactional tmprow=getRow(row.getId());
463 * Updates a row stored in this table.
465 public void updateRow(RowTransactional row) throws IOException{
466 TableIndexEntryTransactional entry=null;
467 // Handle index entry caching
468 if(INDEXCACHESIZE>0){
469 synchronized(indexcache){
470 entry=getCacheEntry(row.getId());
472 entry=index.scanIndex(row.getId());
473 addCacheEntry(entry);
477 entry=index.scanIndex(row.getId());
479 if(entry.getRowSize()>=row.getSize()){
480 // Add to the existing location
481 switch(entry.getLocation()){
482 case FILEA :synchronized(filealock){
483 if(filearandom==null){
484 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
485 // fca=filearandom.getChannel();
487 filearandom.seek(entry.getPosition());
488 row.writeToFile(filearandom);
493 case FILEB :synchronized(fileblock){
494 if(filebrandom==null){
495 filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
496 // fcb=filebrandom.getChannel();
498 filebrandom.seek(entry.getPosition());
499 row.writeToFile(filebrandom);
511 rowswitch=!rowswitch;
513 synchronized(statlock){
514 atomicfields.setstat_update(atomicfields.getstat_update()+1);
515 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
519 private void updateRowA(RowTransactional row) throws IOException{
520 synchronized(filealock){
522 //int pre=filebstream.size();
523 int pre=(int)fileastream.getFilePointer();
524 //row.writeToStream(filebstream);
525 row.writeToFile(fileastream);
526 //int post=filebstream.size();
527 int post=(int)fileastream.getFilePointer();
528 //fileastream.flush();
529 index.updateEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
531 // Handle index entry caching
532 if(INDEXCACHESIZE>0){
533 updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()));
535 atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
539 private void updateRowB(RowTransactional row) throws IOException{
540 synchronized(fileblock){
542 //int pre=filebstream.size();
543 int pre=(int)filebstream.getFilePointer();
544 //row.writeToStream(filebstream);
545 row.writeToFile(filebstream);
546 //int post=filebstream.size();
547 int post=(int)filebstream.getFilePointer();
548 //filebstream.flush();
549 index.updateEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
550 // Handle index entry caching
551 // Handle index entry caching
552 if(INDEXCACHESIZE>0){
553 updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()));
555 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
560 * Marks a row as deleted in the index.
561 * Be aware that the space consumed by the row is not actually reclaimed.
563 public void deleteRow(RowTransactional row) throws IOException{
564 // Handle index entry caching
565 if(INDEXCACHESIZE>0){
566 removeCacheEntry(row.getId());
568 index.updateEntry(row.getId(),row.getSize(),DELETE,0);
569 synchronized(statlock){
570 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
575 * Returns a tuplestream containing the given list of rows
577 public TupleStreamTransactional getRows(List<String> rows) throws IOException{
578 return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
582 * Returns a tuplestream containing the rows matching the given rowmatcher
584 public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
585 return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
589 * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
591 public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
592 return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
596 * Returns a tuplestream of all rows in this table.
598 public TupleStreamTransactional getRows() throws IOException{
599 // return new TableReader(this);
600 return new IndexedTableReaderTransactional(this,index.scanIndex());
604 * Returns a single row stored in this table.
605 * If the row does not exist in the table, null will be returned.
607 public RowTransactional getRow(String id) throws IOException{
608 TableIndexEntryTransactional entry=null;
609 // Handle index entry caching
610 if(INDEXCACHESIZE>0){
611 synchronized(indexcache){
612 entry=getCacheEntry(id);
614 entry=index.scanIndex(id);
616 addCacheEntry(entry);
621 entry=index.scanIndex(id);
625 DataInputStream data=null;
626 if(entry.getLocation()==Table.FILEA){
627 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
628 }else if(entry.getLocation()==Table.FILEB){
629 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
632 while(dataoffset!=entry.getPosition()){
633 dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
635 RowTransactional row=RowTransactional.readFromStream(data);
637 synchronized(statlock){
638 atomicfields.setstat_read(atomicfields.getstat_read()+1);
639 atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());