2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
37 import java.nio.channels.*;
38 import com.solidosystems.tuplesoup.filter.*;
40 import dstm2.util.StringKeyHashMap;
43 * The table stores a group of rows.
44 * Every row must have a unique id within a table.
46 public class DualFileTableTransactional implements TableTransactional{
// ---- Tuning and lock objects ----
// Maximum number of index entries held in the in-memory cache (see addCacheEntry).
49 private int INDEXCACHESIZE=8192;
// Dedicated monitor objects for the two data files (String instances used purely as locks).
51 private String filealock="filea-dummy";
52 private String fileblock="fileb-dummy";
// ---- File handles ----
// Append streams for sequential writes (addRowA / addRowB).
54 private DataOutputStream fileastream=null;
55 private DataOutputStream filebstream=null;
// Random-access handles for in-place updates (updateRow).
56 private RandomAccessFile filearandom=null;
57 private RandomAccessFile filebrandom=null;
// Index mapping row ids to (file, position, size) entries.
60 private TableIndexTransactional index=null;
// Current append positions inside data file A and B.
62 private long fileaposition=0;
63 private long filebposition=0;
// Toggled on every add so rows alternate between file A and file B.
65 private boolean rowswitch=true;
// Directory holding this table's data files; normalised to end with File.separator.
68 private String location;
// Doubly linked list of cached index entries: first = oldest, last = most recently used.
70 private TableIndexNodeTransactional indexcachefirst;
71 private TableIndexNodeTransactional indexcachelast;
// Number of entries currently cached — NOTE(review): the updates to this counter
// are not visible in this view; confirm against the full source.
72 private int indexcacheusage;
// Map from row id to its cache node for O(1) lookup.
74 private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
75 //private Hashtable<String,TableIndexNode> indexcache;
// Transactional holder for the statistics counters (see DualFileTableTSInf below).
78 DualFileTableTSInf atomicfields;
// Transactional (@atomic) accessor interface for the per-table statistics counters.
// NOTE(review): getstat_add() and getstat_read() are called by readStatistics but
// their declarations are not visible in this view — confirm they exist on the
// complete interface.
80 public @atomic interface DualFileTableTSInf{
// Getters for the statistics counters.
82 long getstat_update();
83 long getstat_delete();
84 long getstat_add_size();
85 long getstat_update_size();
86 long getstat_read_size();
88 long getstat_cache_hit();
89 long getstat_cache_miss();
90 long getstat_cache_drop();
// Setters, used both for incrementing (get...()+1) and for resetting to zero.
92 void setstat_add(long val);
93 void setstat_update(long val);
94 void setstat_delete(long val);
95 void setstat_add_size(long val);
96 void setstat_update_size(long val);
97 void setstat_read_size(long val);
98 void setstat_read(long val);
99 void setstat_cache_hit(long val);
100 void setstat_cache_miss(long val);
101 void setstat_cache_drop(long val);
107 long stat_add_size=0;
108 long stat_update_size=0;
109 long stat_read_size=0;
111 long stat_cache_hit=0;
112 long stat_cache_miss=0;
113 long stat_cache_drop=0;*/
// Monitor object guarding every read and reset of the statistics counters.
115 protected String statlock="stat-dummy";
118 * Return the current values of the statistic counters and reset them.
119 * The current counters are:
122 * <li>stat_table_update
123 * <li>stat_table_delete
124 * <li>stat_table_add_size
125 * <li>stat_table_update_size
126 * <li>stat_table_read_size
127 * <li>stat_table_read
128 * <li>stat_table_cache_hit
129 * <li>stat_table_cache_miss
130 * <li>stat_table_cache_drop
132 * Furthermore, the index will be asked to deliver separate index specific counters
134 public Hashtable<String,Long> readStatistics(){
135 Hashtable<String,Long> hash=new Hashtable<String,Long>();
136 synchronized(statlock){
137 hash.put("stat_table_add",atomicfields.getstat_add());
138 hash.put("stat_table_update",atomicfields.getstat_update());
139 hash.put("stat_table_delete",atomicfields.getstat_delete());
140 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
141 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
142 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
143 hash.put("stat_table_read",atomicfields.getstat_read());
144 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
145 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
146 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
147 atomicfields.setstat_add(0);
148 atomicfields.setstat_update(0);
149 atomicfields.setstat_delete(0);
150 atomicfields.setstat_add_size(0);
151 atomicfields.setstat_update_size(0);
152 atomicfields.setstat_read_size(0);
153 atomicfields.setstat_read(0);
154 atomicfields.setstat_cache_hit(0);
155 atomicfields.setstat_cache_miss(0);
156 atomicfields.setstat_cache_drop(0);
157 Hashtable<String,Long> ihash=index.readStatistics();
164 * Create a new table object with the default flat index model
169 * Create a new table object with a specific index model
// Create a new table object with a specific index model.
// NOTE(review): this view is missing several constructor lines — e.g. the
// assignment of the title field and the full switch over indextype (only the
// PAGED case is visible). Confirm against the complete source.
171 public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
173 this.location=location;
// Normalise the location so data file names can simply be appended.
174 if(!this.location.endsWith(File.separator))this.location+=File.separator;
// PAGED index model: entries are kept in a paged index file on disk.
176 case PAGED : index=new PagedIndexTransactional(getFileName(INDEX));
// Start with an empty index entry cache.
180 indexcachefirst=null;
183 indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
187 * Set the maximal allowable size of the index cache.
189 public void setIndexCacheSize(int newsize){
190 INDEXCACHESIZE=newsize;
194 * Close all open file streams
198 if(fileastream!=null)fileastream.close();
199 if(filebstream!=null)filebstream.close();
200 if(filearandom!=null)filearandom.close();
201 if(filebrandom!=null)filebrandom.close();
203 }catch(Exception e){}
207 * Returns the name of this table
209 public String getTitle(){
214 * Returns the location of this tables datafiles
216 public String getLocation(){
220 protected String getFileName(int type){
222 case FILEB : return location+title+".a";
223 case FILEA : return location+title+".b";
224 case INDEX : return location+title+".index";
230 * Delete the files created by this table object.
231 * Be aware that this will delete any data stored in this table!
233 public void deleteFiles(){
235 File ftest=new File(getFileName(FILEA));
237 }catch(Exception e){}
239 File ftest=new File(getFileName(FILEB));
241 }catch(Exception e){}
243 File ftest=new File(getFileName(INDEX));
245 }catch(Exception e){}
248 private synchronized void openFile(int type) throws IOException{
250 case FILEA : if(fileastream==null){
251 fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
252 File ftest=new File(getFileName(FILEA));
253 fileaposition=ftest.length();
256 case FILEB : if(filebstream==null){
257 filebstream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEB),true)));
258 File ftest=new File(getFileName(FILEB));
259 filebposition=ftest.length();
266 * Adds a row of data to this table.
268 public void addRow(RowTransactional row) throws IOException{
269 // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
275 rowswitch=!rowswitch;
// Appends the given index entry at the back of the cache's linked list,
// evicting the oldest (front) entry first when the cache is over capacity.
// NOTE(review): several closing braces and else-branches of this method are
// not visible in this view; comments below describe only the visible lines.
278 private void addCacheEntry(TableIndexEntryTransactional entry){
279 synchronized(indexcache){
280 if(indexcacheusage>INDEXCACHESIZE){
281 // remove first entry
282 TableIndexNodeTransactional node=indexcachefirst;
283 indexcache.remove(node.getData().getId());
// Count the eviction in the cache-drop statistic.
285 synchronized(statlock){
286 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
// Unlink the evicted node from the front of the list.
288 indexcachefirst=node.getNext();
289 if(indexcachefirst==null){
292 indexcachefirst.setPrevious(null);
// Append the new entry at the back of the list.
295 TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
296 if(indexcachelast!=null){
297 indexcachelast.setNext(node);
299 if(indexcachefirst==null){
300 indexcachefirst=node;
// Register the node in the map for O(1) id lookup.
303 indexcache.put(entry.getId(),node);
308 private void addRowA(RowTransactional row) throws IOException{
309 synchronized(filealock){
311 int pre=fileastream.size();
312 row.writeToStream(fileastream);
313 int post=fileastream.size();
316 synchronized(statlock){
317 atomicfields.setstat_add(atomicfields.getstat_add()+1);
318 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
321 index.addEntry(row.getId(),row.getSize(),FILEA,fileaposition);
322 if(INDEXCACHESIZE>0){
323 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,fileaposition);
324 addCacheEntry(entry);
326 fileaposition+=Row.calcSize(pre,post);
329 private void addRowB(RowTransactional row) throws IOException{
330 synchronized(fileblock){
332 int pre=filebstream.size();
333 row.writeToStream(filebstream);
334 int post=filebstream.size();
336 synchronized(statlock){
337 atomicfields.setstat_add(atomicfields.getstat_add()+1);
338 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
340 index.addEntry(row.getId(),row.getSize(),FILEB,filebposition);
341 if(INDEXCACHESIZE>0){
342 TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,filebposition);
343 addCacheEntry(entry);
345 filebposition+=RowTransactional.calcSize(pre,post);
// Refreshes a cached index entry: if the id is already cached its node is
// moved to the back (most recently used) of the list; otherwise the entry is
// added via addCacheEntry.
// NOTE(review): the else-branch unlinking the node and the list-tail update
// are not fully visible in this view; comments describe only the visible lines.
350 private void updateCacheEntry(TableIndexEntryTransactional entry){
351 synchronized(indexcache){
352 if(indexcache.containsKey(entry.getId())){
353 TableIndexNodeTransactional node=indexcache.get(entry.getId());
// Only nodes that are not already at the back need to be moved.
355 if(node!=indexcachelast){
356 if(node==indexcachefirst){
357 indexcachefirst=node.getNext();
// Re-attach the node behind the current tail.
360 indexcachelast.setNext(node);
361 node.setPrevious(indexcachelast);
// Cache miss: fall back to inserting a fresh entry.
366 addCacheEntry(entry);
// Removes the cache node for the given row id (if present), unlinking it from
// the doubly linked list and counting the removal as a cache drop.
// NOTE(review): the general middle-of-list unlink branch and several closing
// braces are not visible in this view.
371 private void removeCacheEntry(String id){
372 synchronized(indexcache){
373 if(indexcache.containsKey(id)){
374 TableIndexNodeTransactional node=indexcache.get(id);
375 indexcache.remove(id);
// Single-element list: clear both ends.
376 if(indexcacheusage==1){
377 indexcachefirst=null;
// Node at the front: advance the head pointer.
382 if(node==indexcachefirst){
383 indexcachefirst=node.getNext();
384 indexcachefirst.setPrevious(null);
// Node at the back: retreat the tail pointer.
385 }else if(node==indexcachelast){
386 indexcachelast=node.getPrevious();
387 indexcachelast.setNext(null);
// Record the removal in the cache-drop statistic.
392 synchronized(statlock){
393 atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
// Looks up an index entry in the cache by row id. On a hit the node is moved
// to the back of the list (most recently used) and the entry is returned; on
// a miss the cache-miss counter is bumped (the final return is outside this
// view).
// NOTE(review): the else-branch of the move and the trailing return statement
// are not visible in this view.
399 private TableIndexEntryTransactional getCacheEntry(String id){
400 synchronized(indexcache){
401 if(indexcache.containsKey(id)){
402 TableIndexNodeTransactional node=indexcache.get(id);
// Only nodes not already at the back need to be moved.
403 if(node!=indexcachelast){
404 if(node==indexcachefirst){
405 indexcachefirst=node.getNext();
// Re-attach the node behind the current tail.
408 indexcachelast.setNext(node);
409 node.setPrevious(indexcachelast);
// Record the hit before returning the cached entry.
413 synchronized(statlock){
414 atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
416 return node.getData();
// Cache miss: record it (null is presumably returned after this — confirm).
419 synchronized(statlock){
420 atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
426 * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
427 * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
429 public void addOrUpdateRow(RowTransactional row) throws IOException{
430 RowTransactional tmprow=getRow(row.getId());
439 * Updates a row stored in this table.
// Updates a row: locates its index entry (via the cache when enabled, else by
// scanning the index), overwrites in place when the new row fits in the old
// slot, and otherwise appends to one of the data files (relocation path not
// visible in this view).
// NOTE(review): many branches (cache-miss else, stream positioning cleanup,
// the too-large relocation path, and closing braces) are missing from this
// view; comments describe only the visible lines.
441 public void updateRow(RowTransactional row) throws IOException{
442 TableIndexEntryTransactional entry=null;
443 // Handle index entry caching
444 if(INDEXCACHESIZE>0){
445 synchronized(indexcache){
446 entry=getCacheEntry(row.getId());
// Cache miss: scan the index and cache the result for next time.
448 entry=index.scanIndex(row.getId());
449 addCacheEntry(entry);
// Caching disabled: always scan the index directly.
453 entry=index.scanIndex(row.getId());
// The new row fits in the old slot: overwrite it in place.
455 if(entry.getRowSize()>=row.getSize()){
456 // Add to the existing location
457 switch(entry.getLocation()){
458 case FILEA :synchronized(filealock){
// Lazily open the random access handle for in-place writes.
459 if(filearandom==null){
460 filearandom=new RandomAccessFile(getFileName(FILEA),"rw");
461 fca=filearandom.getChannel();
463 filearandom.seek(entry.getPosition());
464 row.writeToFile(filearandom);
469 case FILEB :synchronized(fileblock){
470 if(filebrandom==null){
471 filebrandom=new RandomAccessFile(getFileName(FILEB),"rw");
472 fcb=filebrandom.getChannel();
474 filebrandom.seek(entry.getPosition());
475 row.writeToFile(filebrandom);
// Relocation path (row grew): alternate target file, mirroring addRow.
487 rowswitch=!rowswitch;
// Record the update in the statistics counters.
489 synchronized(statlock){
490 atomicfields.setstat_update(atomicfields.getstat_update()+1);
491 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
495 private void updateRowA(RowTransactional row) throws IOException{
496 synchronized(filealock){
498 int pre=fileastream.size();
499 row.writeToStream(fileastream);
500 int post=fileastream.size();
502 index.updateEntry(row.getId(),row.getSize(),FILEA,fileaposition);
504 // Handle index entry caching
505 if(INDEXCACHESIZE>0){
506 updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,fileaposition));
508 fileaposition+=Row.calcSize(pre,post);
512 private void updateRowB(RowTransactional row) throws IOException{
513 synchronized(fileblock){
515 int pre=filebstream.size();
516 row.writeToStream(filebstream);
517 int post=filebstream.size();
519 index.updateEntry(row.getId(),row.getSize(),FILEB,filebposition);
520 // Handle index entry caching
521 // Handle index entry caching
522 if(INDEXCACHESIZE>0){
523 updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,filebposition));
525 filebposition+=Row.calcSize(pre,post);
530 * Marks a row as deleted in the index.
531 * Be aware that the space consumed by the row is not actually reclaimed.
533 public void deleteRow(RowTransactional row) throws IOException{
534 // Handle index entry caching
535 if(INDEXCACHESIZE>0){
536 removeCacheEntry(row.getId());
538 index.updateEntry(row.getId(),row.getSize(),DELETE,0);
539 synchronized(statlock){
540 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
545 * Returns a tuplestream containing the given list of rows
547 public TupleStreamTransactional getRows(List<String> rows) throws IOException{
548 return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
552 * Returns a tuplestream containing the rows matching the given rowmatcher
554 public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
555 return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
559 * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
561 public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
562 return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
566 * Returns a tuplestream of all rows in this table.
568 public TupleStreamTransactional getRows() throws IOException{
569 // return new TableReader(this);
570 return new IndexedTableReaderTransactional(this,index.scanIndex());
574 * Returns a single row stored in this table.
575 * If the row does not exist in the table, null will be returned.
577 public RowTransactional getRow(String id) throws IOException{
578 TableIndexEntryTransactional entry=null;
579 // Handle index entry caching
580 if(INDEXCACHESIZE>0){
581 synchronized(indexcache){
582 entry=getCacheEntry(id);
584 entry=index.scanIndex(id);
586 addCacheEntry(entry);
591 entry=index.scanIndex(id);
595 DataInputStream data=null;
596 if(entry.getLocation()==Table.FILEA){
597 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
598 }else if(entry.getLocation()==Table.FILEB){
599 data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
602 while(dataoffset!=entry.getPosition()){
603 dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
605 RowTransactional row=RowTransactional.readFromStream(data);
607 synchronized(statlock){
608 atomicfields.setstat_read(atomicfields.getstat_read()+1);
609 atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());