2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
36 import java.nio.channels.*;
37 import com.solidosystems.tuplesoup.filter.*;
/**
 * The table stores a group of rows.
 * Every row must have a unique id within a table.
 */
public class DualFileTable implements Table{
    // Maximum number of index entries held in the in-memory LRU cache.
    private int INDEXCACHESIZE=8192;

    // Dedicated lock objects guarding writes to data file A and B respectively.
    private String filealock="filea-dummy";
    private String fileblock="fileb-dummy";

    // Append streams for the two data files (opened lazily by openFile()).
    private DataOutputStream fileastream=null;
    private DataOutputStream filebstream=null;
    // Random-access handles used for in-place row updates.
    private RandomAccessFile filearandom=null;
    private RandomAccessFile filebrandom=null;

    // Index mapping row ids to (file, position, size) entries.
    private TableIndex index=null;

    // Current append offsets (end-of-file positions) for file A and B.
    private long fileaposition=0;
    private long filebposition=0;

    // Alternates new rows between file A and B; deliberately unsynchronized
    // (see the comment in addRow()).
    private boolean rowswitch=true;

    // Directory holding this table's data files (normalized in the
    // constructor to end with a file separator).
    private String location;

    // Doubly linked LRU list of cached index entries:
    // first = least recently used, last = most recently used.
    private TableIndexNode indexcachefirst;
    private TableIndexNode indexcachelast;
    private int indexcacheusage; // current number of cached entries
    private Hashtable<String,TableIndexNode> indexcache; // row id -> LRU node
    // Statistics counters, accessed through the @atomic accessor interface below.
    DualFileTableTSInf atomicfields;

    // Accessor interface for the per-table statistics counters.
    // NOTE(review): @atomic presumably makes implementations transactional /
    // thread-safe -- confirm against the annotation's declaration (not visible
    // in this chunk). Several getters (e.g. getstat_add, getstat_read) used
    // elsewhere in this class appear to be missing from this chunk.
    public @atomic interface DualFileTableTSInf{
        long getstat_update();
        long getstat_delete();
        long getstat_add_size();
        long getstat_update_size();
        long getstat_read_size();
        long getstat_cache_hit();
        long getstat_cache_miss();
        long getstat_cache_drop();
        void setstat_add(long val);
        void setstat_update(long val);
        void setstat_delete(long val);
        void setstat_add_size(long val);
        void setstat_update_size(long val);
        void setstat_read_size(long val);
        void setstat_read(long val);
        void setstat_cache_hit(long val);
        void setstat_cache_miss(long val);
        void setstat_cache_drop(long val);

    /* Remnant of the pre-@atomic field-based counters, kept disabled:
    long stat_add_size=0;
    long stat_update_size=0;
    long stat_read_size=0;
    long stat_cache_hit=0;
    long stat_cache_miss=0;
    long stat_cache_drop=0;*/

    // Lock object guarding snapshot/reset of the statistics counters.
    protected String statlock="stat-dummy";
    /**
     * Return the current values of the statistic counters and reset them.
     * The current counters are:
     * <ul>
     * <li>stat_table_add
     * <li>stat_table_update
     * <li>stat_table_delete
     * <li>stat_table_add_size
     * <li>stat_table_update_size
     * <li>stat_table_read_size
     * <li>stat_table_read
     * <li>stat_table_cache_hit
     * <li>stat_table_cache_miss
     * <li>stat_table_cache_drop
     * </ul>
     * Furthermore, the index will be asked to deliver separate index specific counters
     *
     * @return a hashtable mapping counter names to their values; all counters
     *         are reset to 0 as a side effect
     */
    public Hashtable<String,Long> readStatistics(){
        Hashtable<String,Long> hash=new Hashtable<String,Long>();
        // Snapshot and reset atomically with respect to other stat updates.
        synchronized(statlock){
            hash.put("stat_table_add",atomicfields.getstat_add());
            hash.put("stat_table_update",atomicfields.getstat_update());
            hash.put("stat_table_delete",atomicfields.getstat_delete());
            hash.put("stat_table_add_size",atomicfields.getstat_add_size());
            hash.put("stat_table_update_size",atomicfields.getstat_update_size());
            hash.put("stat_table_read_size",atomicfields.getstat_read_size());
            hash.put("stat_table_read",atomicfields.getstat_read());
            hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
            hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
            hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
            atomicfields.setstat_add(0);
            atomicfields.setstat_update(0);
            atomicfields.setstat_delete(0);
            atomicfields.setstat_add_size(0);
            atomicfields.setstat_update_size(0);
            atomicfields.setstat_read_size(0);
            atomicfields.setstat_read(0);
            atomicfields.setstat_cache_hit(0);
            atomicfields.setstat_cache_miss(0);
            // Merge in the index's own counters.
            // NOTE(review): the lines merging ihash into hash and the return
            // statement appear to be missing from this chunk.
            Hashtable<String,Long> ihash=index.readStatistics();
    /**
     * Create a new table object with a specific index model.
     * (The full source also has a convenience constructor using the default
     * flat index model; it is not visible in this chunk.)
     *
     * @param title     name of the table, used to build its data file names
     * @param location  directory in which the table's files are stored
     * @param indextype index model to use (e.g. PAGED)
     * @throws IOException if the index file cannot be opened
     */
    public DualFileTable(String title,String location, int indextype) throws IOException{
        this.location=location;
        // Normalize location so file names can be appended directly.
        if(!this.location.endsWith(File.separator))this.location+=File.separator;
        // NOTE(review): the enclosing switch(indextype) opener and other cases
        // appear to be missing from this chunk.
        case PAGED : index=new PagedIndex(getFileName(INDEX));
        // Start with an empty index cache.
        indexcachefirst=null;
        indexcache=new Hashtable<String,TableIndexNode>();
    /**
     * Set the maximal allowable size of the index cache.
     *
     * @param newsize maximum number of index entries to keep cached;
     *                0 disables index entry caching
     */
    public void setIndexCacheSize(int newsize){
        INDEXCACHESIZE=newsize;
    /**
     * Close all open file streams.
     * NOTE(review): the method signature and try-block opener appear to be
     * missing from this chunk.
     */
        if(fileastream!=null)fileastream.close();
        if(filebstream!=null)filebstream.close();
        if(filearandom!=null)filearandom.close();
        if(filebrandom!=null)filebrandom.close();
        }catch(Exception e){} // best-effort close: failures are deliberately ignored
    /**
     * Returns the name of this table.
     */
    public String getTitle(){
    /**
     * Returns the location of this tables datafiles.
     */
    public String getLocation(){
216 protected String getFileName(int type){
218 case FILEB : return location+title+".a";
219 case FILEA : return location+title+".b";
220 case INDEX : return location+title+".index";
    /**
     * Delete the files created by this table object.
     * Be aware that this will delete any data stored in this table!
     * NOTE(review): the ftest.delete() calls and try-block openers appear to
     * be missing from this chunk; each deletion is best effort.
     */
    public void deleteFiles(){
        // Data file A
        File ftest=new File(getFileName(FILEA));
        }catch(Exception e){} // ignore failures
        // Data file B
        File ftest=new File(getFileName(FILEB));
        }catch(Exception e){} // ignore failures
        // Index file
        File ftest=new File(getFileName(INDEX));
        }catch(Exception e){} // ignore failures
    /**
     * Lazily open the append stream for the given data file and record the
     * current end-of-file position as the next write offset.
     * NOTE(review): the switch(type) opener and closing braces appear to be
     * missing from this chunk.
     *
     * @param type FILEA or FILEB
     * @throws IOException if the data file cannot be opened
     */
    private synchronized void openFile(int type) throws IOException{
        case FILEA : if(fileastream==null){
            // Append mode; initial write position is the current file length.
            fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
            File ftest=new File(getFileName(FILEA));
            fileaposition=ftest.length();
        case FILEB : if(filebstream==null){
            filebstream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEB),true)));
            File ftest=new File(getFileName(FILEB));
            filebposition=ftest.length();
    /**
     * Adds a row of data to this table.
     * NOTE(review): the dispatch to addRowA()/addRowB() based on rowswitch
     * appears to be missing from this chunk.
     *
     * @param row the row to append
     * @throws IOException if the write fails
     */
    public void addRow(Row row) throws IOException{
        // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be accurate!
        rowswitch=!rowswitch;
    /**
     * Append an index entry at the tail (most recently used end) of the LRU
     * cache, evicting the head (least recently used) entry when the cache is
     * over capacity. NOTE(review): several else-branches and closing braces
     * appear to be missing from this chunk.
     */
    private void addCacheEntry(TableIndexEntry entry){
        synchronized(indexcache){
            if(indexcacheusage>INDEXCACHESIZE){
                // remove first entry (least recently used)
                TableIndexNode node=indexcachefirst;
                indexcache.remove(node.getData().getId());
                synchronized(statlock){
                    atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
                // Advance the head past the evicted node.
                indexcachefirst=node.getNext();
                if(indexcachefirst==null){
                indexcachefirst.setPrevious(null);
            // Link the new node in at the tail of the LRU list.
            TableIndexNode node=new TableIndexNode(indexcachelast,entry);
            if(indexcachelast!=null){
                indexcachelast.setNext(node);
            if(indexcachefirst==null){
                // Cache was empty: the new node is also the head.
                indexcachefirst=node;
            indexcache.put(entry.getId(),node);
    /**
     * Append a row to data file A, record it in the index and (when caching is
     * enabled) in the index cache, and update statistics counters.
     * NOTE(review): the openFile(FILEA) call and closing braces appear to be
     * missing from this chunk.
     */
    private void addRowA(Row row) throws IOException{
        synchronized(filealock){
            // Stream size before/after gives the on-disk size of the row.
            int pre=fileastream.size();
            row.writeToStream(fileastream);
            int post=fileastream.size();
            synchronized(statlock){
                atomicfields.setstat_add(atomicfields.getstat_add()+1);
                atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
            index.addEntry(row.getId(),row.getSize(),FILEA,fileaposition);
            if(INDEXCACHESIZE>0){
                // Cache the fresh entry so an immediate read avoids an index scan.
                TableIndexEntry entry=new TableIndexEntry(row.getId(),row.getSize(),FILEA,fileaposition);
                addCacheEntry(entry);
            // Advance the append offset by the bytes just written.
            fileaposition+=Row.calcSize(pre,post);
    /**
     * Append a row to data file B; mirrors addRowA() for the second data file.
     * NOTE(review): the openFile(FILEB) call and closing braces appear to be
     * missing from this chunk.
     */
    private void addRowB(Row row) throws IOException{
        synchronized(fileblock){
            // Stream size before/after gives the on-disk size of the row.
            int pre=filebstream.size();
            row.writeToStream(filebstream);
            int post=filebstream.size();
            synchronized(statlock){
                atomicfields.setstat_add(atomicfields.getstat_add()+1);
                atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
            index.addEntry(row.getId(),row.getSize(),FILEB,filebposition);
            if(INDEXCACHESIZE>0){
                // Cache the fresh entry so an immediate read avoids an index scan.
                TableIndexEntry entry=new TableIndexEntry(row.getId(),row.getSize(),FILEB,filebposition);
                addCacheEntry(entry);
            // Advance the append offset by the bytes just written.
            filebposition+=Row.calcSize(pre,post);
    /**
     * Refresh the cache entry for an id: if present, move its node to the tail
     * (most recently used) of the LRU list; otherwise fall through to
     * addCacheEntry(). NOTE(review): several branches and closing braces
     * appear to be missing from this chunk.
     */
    private void updateCacheEntry(TableIndexEntry entry){
        synchronized(indexcache){
            if(indexcache.containsKey(entry.getId())){
                TableIndexNode node=indexcache.get(entry.getId());
                // Already at the tail? Nothing to relink.
                if(node!=indexcachelast){
                    if(node==indexcachefirst){
                        indexcachefirst=node.getNext();
                    // Relink the node at the tail.
                    indexcachelast.setNext(node);
                    node.setPrevious(indexcachelast);
                addCacheEntry(entry);
    /**
     * Remove the cache entry for the given row id (if present) and unlink its
     * node from the LRU list, maintaining head/tail pointers.
     * NOTE(review): the general unlink branch and closing braces appear to be
     * missing from this chunk.
     */
    private void removeCacheEntry(String id){
        synchronized(indexcache){
            if(indexcache.containsKey(id)){
                TableIndexNode node=indexcache.get(id);
                indexcache.remove(id);
                if(indexcacheusage==1){
                    // Removing the only entry empties the list.
                    indexcachefirst=null;
                if(node==indexcachefirst){
                    indexcachefirst=node.getNext();
                    indexcachefirst.setPrevious(null);
                }else if(node==indexcachelast){
                    indexcachelast=node.getPrevious();
                    indexcachelast.setNext(null);
                synchronized(statlock){
                    atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
    /**
     * Look up a row id in the index cache. On a hit the node is promoted to
     * the tail (most recently used) of the LRU list and its entry returned;
     * hits and misses update the cache statistics. NOTE(review): the final
     * miss return and several closing braces appear to be missing from this
     * chunk.
     */
    private TableIndexEntry getCacheEntry(String id){
        synchronized(indexcache){
            if(indexcache.containsKey(id)){
                TableIndexNode node=indexcache.get(id);
                // Promote to tail unless it is already the most recently used.
                if(node!=indexcachelast){
                    if(node==indexcachefirst){
                        indexcachefirst=node.getNext();
                    indexcachelast.setNext(node);
                    node.setPrevious(indexcachelast);
                synchronized(statlock){
                    atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
                return node.getData();
            synchronized(statlock){
                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
    /**
     * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
     * This method is much slower than directly using add or update, so only use it if you don't know whether or not the row already exists.
     * NOTE(review): the branch calling addRow()/updateRow() appears to be
     * missing from this chunk.
     */
    public void addOrUpdateRow(Row row) throws IOException{
        Row tmprow=getRow(row.getId());
    /**
     * Updates a row stored in this table. When the new row fits in the space
     * occupied by the old one it is overwritten in place through a
     * RandomAccessFile; the append path for larger rows appears to be missing
     * from this chunk, along with several braces and else-branches.
     *
     * @param row the row to update (matched by id)
     * @throws IOException if the write fails
     */
    public void updateRow(Row row) throws IOException{
        TableIndexEntry entry=null;
        // Handle index entry caching
        if(INDEXCACHESIZE>0){
            synchronized(indexcache){
                entry=getCacheEntry(row.getId());
                // Cache miss: fall back to scanning the index, then cache the result.
                entry=index.scanIndex(row.getId());
                addCacheEntry(entry);
            entry=index.scanIndex(row.getId());
        if(entry.getRowSize()>=row.getSize()){
            // New row fits: overwrite at the existing location.
            switch(entry.getLocation()){
                case FILEA :synchronized(filealock){
                    if(filearandom==null){
                        // Lazily open the random-access handle for in-place writes.
                        filearandom=new RandomAccessFile(getFileName(FILEA),"rw");
                        fca=filearandom.getChannel();
                    filearandom.seek(entry.getPosition());
                    row.writeToFile(filearandom);
                case FILEB :synchronized(fileblock){
                    if(filebrandom==null){
                        filebrandom=new RandomAccessFile(getFileName(FILEB),"rw");
                        fcb=filebrandom.getChannel();
                    filebrandom.seek(entry.getPosition());
                    row.writeToFile(filebrandom);
        rowswitch=!rowswitch;
        synchronized(statlock){
            atomicfields.setstat_update(atomicfields.getstat_update()+1);
            atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
    /**
     * Append an updated row to data file A and repoint the index (and, when
     * enabled, the index cache) at the new location. NOTE(review): the
     * openFile(FILEA) call and closing braces appear to be missing from this
     * chunk.
     */
    private void updateRowA(Row row) throws IOException{
        synchronized(filealock){
            // Stream size before/after gives the on-disk size of the row.
            int pre=fileastream.size();
            row.writeToStream(fileastream);
            int post=fileastream.size();
            index.updateEntry(row.getId(),row.getSize(),FILEA,fileaposition);
            // Handle index entry caching
            if(INDEXCACHESIZE>0){
                updateCacheEntry(new TableIndexEntry(row.getId(),row.getSize(),FILEA,fileaposition));
            // Advance the append offset by the bytes just written.
            fileaposition+=Row.calcSize(pre,post);
    /**
     * Append an updated row to data file B; mirrors updateRowA() for the
     * second data file. NOTE(review): the openFile(FILEB) call and closing
     * braces appear to be missing from this chunk.
     */
    private void updateRowB(Row row) throws IOException{
        synchronized(fileblock){
            // Stream size before/after gives the on-disk size of the row.
            int pre=filebstream.size();
            row.writeToStream(filebstream);
            int post=filebstream.size();
            index.updateEntry(row.getId(),row.getSize(),FILEB,filebposition);
            // Handle index entry caching
            if(INDEXCACHESIZE>0){
                updateCacheEntry(new TableIndexEntry(row.getId(),row.getSize(),FILEB,filebposition));
            // Advance the append offset by the bytes just written.
            filebposition+=Row.calcSize(pre,post);
    /**
     * Marks a row as deleted in the index.
     * Be aware that the space consumed by the row is not actually reclaimed.
     *
     * @param row the row to delete (matched by id)
     * @throws IOException if the index update fails
     */
    public void deleteRow(Row row) throws IOException{
        // Handle index entry caching
        if(INDEXCACHESIZE>0){
            removeCacheEntry(row.getId());
        // Tombstone the entry; position 0 with location DELETE marks removal.
        index.updateEntry(row.getId(),row.getSize(),DELETE,0);
        synchronized(statlock){
            atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
    /**
     * Returns a tuplestream containing the given list of rows.
     *
     * @param rows ids of the rows to read
     * @throws IOException if the index cannot be scanned
     */
    public TupleStream getRows(List<String> rows) throws IOException{
        return new IndexedTableReader(this,index.scanIndex(rows));
    /**
     * Returns a tuplestream containing the rows matching the given rowmatcher.
     *
     * @param matcher filter applied to every row in the table
     * @throws IOException if the index cannot be scanned
     */
    public TupleStream getRows(RowMatcher matcher) throws IOException{
        return new IndexedTableReader(this,index.scanIndex(),matcher);
    /**
     * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher.
     *
     * @param rows    ids of the candidate rows
     * @param matcher filter applied to the candidate rows
     * @throws IOException if the index cannot be scanned
     */
    public TupleStream getRows(List<String> rows,RowMatcher matcher) throws IOException{
        return new IndexedTableReader(this,index.scanIndex(rows),matcher);
    /**
     * Returns a tuplestream of all rows in this table.
     *
     * @throws IOException if the index cannot be scanned
     */
    public TupleStream getRows() throws IOException{
        // return new TableReader(this);
        return new IndexedTableReader(this,index.scanIndex());
    /**
     * Returns a single row stored in this table.
     * If the row does not exist in the table, null will be returned.
     * NOTE(review): the null-entry check, dataoffset initialization, stream
     * close and final return appear to be missing from this chunk.
     *
     * @param id the id of the row to fetch
     * @throws IOException if the data file cannot be read
     */
    public Row getRow(String id) throws IOException{
        TableIndexEntry entry=null;
        // Handle index entry caching
        if(INDEXCACHESIZE>0){
            synchronized(indexcache){
                entry=getCacheEntry(id);
                // Cache miss: fall back to scanning the index, then cache the result.
                entry=index.scanIndex(id);
                addCacheEntry(entry);
            entry=index.scanIndex(id);
        // Open the data file that holds the row.
        DataInputStream data=null;
        if(entry.location==Table.FILEA){
            data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
        }else if(entry.location==Table.FILEB){
            data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
        // Skip forward to the row's position; skipBytes may skip fewer bytes
        // than requested, hence the loop.
        while(dataoffset!=entry.position){
            dataoffset+=data.skipBytes((int)(entry.position-dataoffset));
        Row row=Row.readFromStream(data);
        synchronized(statlock){
            atomicfields.setstat_read(atomicfields.getstat_read()+1);
            atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());