b55c46c6310495812e57310dd09d272d448007f0
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34
35 import TransactionalIO.core.TransactionalFile;
36 import java.io.*;
37 import java.util.*;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
40 import dstm2.atomic;
41 import dstm2.Thread;
42 import dstm2.util.StringKeyHashMap;
43 import java.util.concurrent.Callable;
44
45 /**
46  * The table stores a group of rows.
47  * Every row must have a unique id within a table.
48  */
49 public class DualFileTableTransactional implements TableTransactional{
50
51     
52     private int INDEXCACHESIZE=8192;
53      
54     private String filealock="filea-dummy";
55     private String fileblock="fileb-dummy";
56
57 //    private DataOutputStream fileastream=null;
58 //    private DataOutputStream filebstream=null;
59     private TransactionalFile fileastream=null;
60     private TransactionalFile filebstream=null;
61 //    private RandomAccessFile filearandom=null;
62     private TransactionalFile filearandom=null;
63     private TransactionalFile filebrandom=null;
64 //   FileChannel fca=null;
65 //   FileChannel fcb=null;
66     private TableIndexTransactional index=null;
67      
68 //    private long fileaposition=0;
69  //   private long filebposition=0;
70      
71     private boolean rowswitch=true;
72      
73     private String title;
74     private String location;
75      
76     private TableIndexNodeTransactional indexcachefirst;
77     private TableIndexNodeTransactional indexcachelast;
78     //private int indexcacheusage;
79     
80     private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
81     //private Hashtable<String,TableIndexNode> indexcache;
82     
83     
84     DualFileTableTSInf atomicfields;
85     // Statistic counters
86     public @atomic interface DualFileTableTSInf{
87         long getstat_add();
88         long getstat_update();
89         long getstat_delete();
90         long getstat_add_size();
91         long getstat_update_size();
92         long getstat_read_size();
93         long getstat_read();
94         long getstat_cache_hit();
95         long getstat_cache_miss();
96         long getstat_cache_drop();
97         long getFileaposition();
98         long getFilebposition();
99         int getIndexcacheusage();
100         
101         void setstat_add(long val);
102         void setstat_update(long val);
103         void setstat_delete(long val);
104         void setstat_add_size(long val);
105         void setstat_update_size(long val);
106         void setstat_read_size(long val);
107         void setstat_read(long val);
108         void setstat_cache_hit(long val);
109         void setstat_cache_miss(long val);
110         void setstat_cache_drop(long val);
111         void setIndexcacheusage(int val);
112         void setFileaposition(long val);
113         void setFilebposition(long val);
114     }
115   
116     /*long stat_add=0;
117     long stat_update=0;
118     long stat_delete=0;
119     long stat_add_size=0;
120     long stat_update_size=0;
121     long stat_read_size=0;
122     long stat_read=0;
123     long stat_cache_hit=0;
124     long stat_cache_miss=0;
125     long stat_cache_drop=0;*/
126     
127     protected String statlock="stat-dummy";
128     
129     /**
130      * Return the current values of the statistic counters and reset them.
131      * The current counters are:
132      * <ul>
133      *   <li>stat_table_add
134      *   <li>stat_table_update
135      *   <li>stat_table_delete
136      *   <li>stat_table_add_size
137      *   <li>stat_table_update_size
138      *   <li>stat_table_read_size
139      *   <li>stat_table_read
140      *   <li>stat_table_cache_hit
141      *   <li>stat_table_cache_miss
142      *   <li>stat_table_cache_drop
143      * </ul>
144      * Furthermore, the index will be asked to deliver separate index specific counters
145      */
146     public Hashtable<String,Long> readStatistics(){
147         
148     //    synchronized(statlock){
149         return Thread.doIt(new Callable<Hashtable<String,Long>>() {
150             public Hashtable<String,Long> call() throws Exception{
151                 Hashtable<String,Long> hash=new Hashtable<String,Long>();
152                 hash.put("stat_table_add",atomicfields.getstat_add());
153                 hash.put("stat_table_update",atomicfields.getstat_update());
154                 hash.put("stat_table_delete",atomicfields.getstat_delete());
155                 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
156                 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
157                 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
158                 hash.put("stat_table_read",atomicfields.getstat_read());
159                 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
160                 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
161                 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
162                 atomicfields.setstat_add(0);
163                 atomicfields.setstat_update(0);
164                 atomicfields.setstat_delete(0);
165                 atomicfields.setstat_add_size(0);
166                 atomicfields.setstat_update_size(0);
167                 atomicfields.setstat_read_size(0);
168                 atomicfields.setstat_read(0);
169                 atomicfields.setstat_cache_hit(0);
170                 atomicfields.setstat_cache_miss(0);
171                 atomicfields.setstat_cache_drop(0);
172                 Hashtable<String,Long> ihash=index.readStatistics();
173                 hash.putAll(ihash);
174                 return hash;
175             }
176         });
177         
178     }
179     
180     /**
181      * Create a new table object with the default flat index model
182      */
183
184     
185     /**
186      * Create a new table object with a specific index model
187      */
188     public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
189         this.title=title;
190         this.location=location;
191         if(!this.location.endsWith(File.separator))this.location+=File.separator;
192         switch(indextype){
193              case PAGED  : index=new PagedIndexTransactional(getFileName(INDEX));
194                 break;
195            
196         }
197         indexcachefirst=null;
198         indexcachelast=null;
199         atomicfields.setFileaposition(0);
200         atomicfields.setFilebposition(0);
201         atomicfields.setIndexcacheusage(0);
202         indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
203     }
204      
205     /**
206      * Set the maximal allowable size of the index cache.
207      */ 
208     public void setIndexCacheSize(int newsize){
209         INDEXCACHESIZE=newsize;
210     }
211     
212     /**
213      * Close all open file streams
214      */
215     public void close(){
216         try{
217             if(fileastream!=null)fileastream.close();
218             if(filebstream!=null)filebstream.close();
219             if(filearandom!=null)filearandom.close();
220             if(filebrandom!=null)filebrandom.close();
221             index.close();
222         }catch(Exception e){}
223     }
224     
225     /** 
226      * Returns the name of this table
227      */ 
228     public String getTitle(){
229          return title;
230     }
231     
232     /**
233      * Returns the location of this tables datafiles
234      */ 
235     public String getLocation(){
236          return location;
237     }
238      
239     protected String getFileName(int type){
240          switch(type){
241              case FILEB  :   return location+title+".a";
242              case FILEA  :   return location+title+".b";
243              case INDEX  :   return location+title+".index";
244          }
245          return null;
246     }
247     
248     /**
249      * Delete the files created by this table object.
250      * Be aware that this will delete any data stored in this table!
251      */ 
252     public void deleteFiles(){
253          try{
254              File ftest=new File(getFileName(FILEA));
255              ftest.delete();
256          }catch(Exception e){}
257          try{
258              File ftest=new File(getFileName(FILEB));
259              ftest.delete();
260          }catch(Exception e){}
261          try{
262              File ftest=new File(getFileName(INDEX));
263              ftest.delete();
264          }catch(Exception e){}
265     }
266      
267      private /*synchronized*/ void openFile(int type) throws IOException{
268          switch(type){
269              case FILEA  : if(fileastream==null){
270                                // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
271                                 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
272                                  
273                                 //File ftest=new File(getFileName(FILEA));
274                                 //atomicfields.setFileaposition(ftest.length());
275                                 atomicfields.setFileaposition(fileastream.length());
276                                 fileastream.seek(fileastream.length());
277                            }
278                           break;
279              case FILEB  : if(filebstream==null){
280                                 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
281                                 //File ftest=new File(getFileName(FILEB));
282                                 //atomicfields.setFilebposition(ftest.length());
283                                 atomicfields.setFilebposition(filebstream.length());
284                                 filebstream.seek(filebstream.length());
285                            }
286                           break;
287          }
288      }
289      
290     /**
291      * Adds a row of data to this table.
292      */
293     public void addRow(RowTransactional row) throws IOException{
294         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
295         if(rowswitch){
296             addRowA(row);
297         }else{
298             addRowB(row);
299         }
300         rowswitch=!rowswitch;
301     }
302      
303      private void addCacheEntry(TableIndexEntryTransactional entry){
304       //   synchronized(indexcache){
305              if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
306                  // remove first entry
307                  TableIndexNodeTransactional node=indexcachefirst;
308                  indexcache.remove(node.getData().getId());
309                  atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
310               //   synchronized(statlock){
311                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
312              //    }
313                  indexcachefirst=node.getNext();
314                  if(indexcachefirst==null){
315                     indexcachelast=null;
316                  }else{
317                     indexcachefirst.setPrevious(null);
318                  }
319              }
320              TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
321              if(indexcachelast!=null){
322                  indexcachelast.setNext(node);
323              }
324              if(indexcachefirst==null){
325                  indexcachefirst=node;
326              }
327              indexcachelast=node;
328              indexcache.put(entry.getId(),node);
329              atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
330      //   }
331      }
332      
333      private void addRowA(RowTransactional row) throws IOException{
334          //synchronized(filealock){
335          final Vector args = new Vector();
336          args.add(row);
337          Thread.doIt(new Callable<Boolean>() {
338              public Boolean call() throws Exception{
339                  openFile(FILEA);             
340              //int pre=fileastream.size();
341                  int pre= (int)fileastream.getFilePointer();
342              //row.writeToStream(fileastream);
343                  ((RowTransactional)args.get(0)).writeToFile(fileastream);
344              //int post= fileastream.size();
345                  int post= (int)fileastream.getFilePointer();
346              //fileastream.flush();
347              
348              //synchronized(statlock){
349                  atomicfields.setstat_add(atomicfields.getstat_add()+1);
350                  atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
351              //}
352              
353                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
354                 if(INDEXCACHESIZE>0){
355                     TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
356                     addCacheEntry(entry);
357                 }
358                 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
359                 return true;
360             }
361          });
362      }
363      private void addRowB(RowTransactional row) throws IOException{
364         // synchronized(fileblock){
365          final Vector args = new Vector();
366          args.add(row);
367          Thread.doIt(new Callable<Boolean>() {
368              public Boolean call() throws Exception{
369                 openFile(FILEB);
370              //int pre=filebstream.size();
371                 int pre= (int)filebstream.getFilePointer();
372              //row.writeToStream(filebstream);
373                 ((RowTransactional)args.get(0)).writeToFile(filebstream);
374                 int post=(int)filebstream.getFilePointer();
375              //int post=filebstream.size();
376              //filebstream.flush();
377              //synchronized(statlock){
378                 atomicfields.setstat_add(atomicfields.getstat_add()+1);
379                 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
380              // }
381                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
382                 if(INDEXCACHESIZE>0){
383                   TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
384                   addCacheEntry(entry);
385                 }
386                 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
387                 return true;
388             }
389          });
390      }
391      
392
393      private void updateCacheEntry(TableIndexEntryTransactional entry){
394           final Vector args = new Vector();
395           args.add(entry);
396           Thread.doIt(new Callable<Boolean>() {
397              public Boolean call() throws Exception{
398           //synchronized(indexcache){
399               if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
400                   TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
401                   node.setData(((TableIndexEntryTransactional)(args.get(0))));
402                   if(node!=indexcachelast){
403                       if(node==indexcachefirst){
404                           indexcachefirst=node.getNext();
405                       }
406                       node.remove();
407                       indexcachelast.setNext(node);
408                       node.setPrevious(indexcachelast);
409                       node.setNext(null);
410                       indexcachelast=node;
411                   }
412               }else{
413                   addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
414               }
415               return true;
416             }
417         });
418       }
419
420       private void removeCacheEntry(String id){
421           //synchronized(indexcache){
422           final Vector args = new Vector();
423           args.add(id);
424           Thread.doIt(new Callable<Boolean>() {
425              public Boolean call() throws Exception{
426           
427                 if(indexcache.containsKey((String)(args.get(0)))){
428                     TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
429                     indexcache.remove((String)(args.get(0)));
430                     if(atomicfields.getIndexcacheusage()==1){
431                         indexcachefirst=null;
432                         indexcachelast=null;
433                         atomicfields.setIndexcacheusage(0);
434                         return true;
435                     }
436                     if(node==indexcachefirst){
437                         indexcachefirst=node.getNext();
438                         indexcachefirst.setPrevious(null);
439                     }else if(node==indexcachelast){
440                         indexcachelast=node.getPrevious();
441                         indexcachelast.setNext(null);
442                     }else{
443                         node.remove();
444                     }
445                     atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
446               //      synchronized(statlock){
447                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
448                      
449               //      }
450                 }
451                 return true;
452             }
453           });
454       }
455
456       private TableIndexEntryTransactional getCacheEntry(String id){
457           final Vector args = new Vector();
458           args.add(id);
459           
460           TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
461              public TableIndexEntryTransactional call() throws Exception{
462           //synchronized(indexcache){
463               if(indexcache.containsKey((String)(args.get(0)))){
464                   TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
465                   if(node!=indexcachelast){
466                       if(node==indexcachefirst){
467                             indexcachefirst=node.getNext();
468                         }
469                         node.remove();
470                         indexcachelast.setNext(node);
471                         node.setPrevious(indexcachelast);
472                         node.setNext(null);
473                         indexcachelast=node;
474                   }
475                 //  synchronized(statlock){
476                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
477                 //   }
478                   return node.getData();
479               }
480               return null;
481              }
482           });
483           if (res != null)
484               return res;
485           
486           Thread.doIt(new Callable<Boolean>() {
487              public Boolean call() throws Exception{
488           //synchronized(statlock){
489                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
490                return true;
491              }
492           });
493            //}
494           return null;
495       }
496
497      /**
498       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
499       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
500       */
501      public void addOrUpdateRow(RowTransactional row) throws IOException{
502          RowTransactional tmprow=getRow(row.getId());
503          if(tmprow==null){
504              addRow(row);
505          }else{
506              updateRow(row);
507          }
508      }
509
510      /**
511       * Updates a row stored in this table.
512       */
513      public void updateRow(RowTransactional row) throws IOException{
514          TableIndexEntryTransactional entry=null;
515          final Vector args = new Vector();
516          args.add(row);
517          args.add(entry);
518          // Handle index entry caching
519          if(INDEXCACHESIZE>0){
520              Thread.doIt(new Callable<Boolean>() {
521                 public Boolean call() throws Exception {
522                     TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
523                     RowTransactional row = (RowTransactional) (args.get(0));
524                     entry = getCacheEntry(row.getId());
525                     if(entry==null){
526                        entry=index.scanIndex(row.getId());
527                        addCacheEntry(entry);
528                     }
529                     return true;
530                 }
531              });
532           /*   synchronized(indexcache){
533                  entry=getCacheEntry(row.getId());
534                  if(entry==null){
535                      entry=index.scanIndex(row.getId());
536                      addCacheEntry(entry);
537                  }
538              }*/
539          }else{
540              entry=index.scanIndexTransactional(row.getId());
541          }
542          if(entry.getRowSize()>=row.getSize()){
543             // Add to the existing location
544             switch(entry.getLocation()){
545                 case FILEA  :
546                //     synchronized(filealock){
547                      Thread.doIt(new Callable<Boolean>() {
548                         public Boolean call() throws Exception {
549                             if(filearandom==null){
550                                 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
551                                // fca=filearandom.getChannel();
552                             }
553                             filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
554                             ((RowTransactional) (args.get(0))).writeToFile(filearandom);
555                             return true;
556                             //fca.force(false);
557                         }
558                      });    
559                //   }
560                     break;
561                 case FILEB :
562               //      synchronized(fileblock){
563                     Thread.doIt(new Callable<Boolean>() {
564                         public Boolean call() throws Exception {
565                                 if(filebrandom==null){
566                                     filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
567                                   //  fcb=filebrandom.getChannel();
568                                 }
569                                 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
570                                 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
571                                 return true;
572                                 //fcb.force(false);
573                         }
574                     });
575                     break;
576             }
577          }else{
578              if(rowswitch){
579                   updateRowA(row);
580               }else{
581                   updateRowB(row);
582               }
583               rowswitch=!rowswitch;
584          }
585          //synchronized(statlock){
586          Thread.doIt(new Callable<Boolean>() {
587               public Boolean call() throws Exception{ 
588                 atomicfields.setstat_update(atomicfields.getstat_update()+1);
589                 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
590                 return true;
591               }
592          });
593      }
594      
595      private void updateRowA(RowTransactional row) throws IOException{
596          final Vector args = new Vector();
597          args.add(row);
598          Thread.doIt(new Callable<Boolean>() {
599               public Boolean call() throws Exception{ 
600          //synchronized(filealock){
601                   openFile(FILEA);
602                   //int pre=filebstream.size();
603                   int pre=(int)fileastream.getFilePointer();
604                   //row.writeToStream(filebstream);
605                   ((RowTransactional)(args.get(0))).writeToFile(fileastream);
606                   //int post=filebstream.size();
607                   int post=(int)fileastream.getFilePointer();
608                   //fileastream.flush();
609                   index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
610
611                   // Handle index entry caching
612                   if(INDEXCACHESIZE>0){
613                       updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
614                   }
615                   atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
616                   return true;
617               }
618           });
619      }
620
621      private void updateRowB(RowTransactional row) throws IOException{
622          final Vector args = new Vector();
623          args.add(row);
624          Thread.doIt(new Callable<Boolean>() {
625              public Boolean call() throws Exception{ 
626          
627          //synchronized(fileblock){
628                 openFile(FILEB);
629               //int pre=filebstream.size();
630                 int pre=(int)filebstream.getFilePointer();
631               //row.writeToStream(filebstream);
632                 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
633               //int post=filebstream.size();
634                 int post=(int)filebstream.getFilePointer();
635               //filebstream.flush();
636                 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
637               // Handle index entry caching
638               // Handle index entry caching
639                 if(INDEXCACHESIZE>0){
640                     updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
641                 }
642                 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
643                 return true;
644             }
645         });
646      }
647
648      /**
649       * Marks a row as deleted in the index.
650       * Be aware that the space consumed by the row is not actually reclaimed.
651       */
652      public void deleteRow(RowTransactional row) throws IOException{
653           // Handle index entry caching
654           if(INDEXCACHESIZE>0){
655               removeCacheEntry(row.getId());
656           }
657           index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
658           Thread.doIt(new Callable<Boolean>() {
659              public Boolean call() throws Exception{
660           //synchronized(statlock){
661                 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
662                 return true;
663              }
664           });
665          // }
666      }
667      
668      /**
669       * Returns a tuplestream containing the given list of rows
670       */
671      public TupleStreamTransactional getRows(List<String> rows) throws IOException{
672          return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
673      }
674
675      /**
676       * Returns a tuplestream containing the rows matching the given rowmatcher
677       */
678      public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
679          return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
680      }
681      
682      /**
683       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
684       */
685      public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
686           return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
687       }
688
689      /**
690       * Returns a tuplestream of all rows in this table.
691       */
692      public TupleStreamTransactional getRows() throws IOException{
693          // return new TableReader(this);
694          return new IndexedTableReaderTransactional(this,index.scanIndex());
695      }
696      
697      /**
698       * Returns a single row stored in this table.
699       * If the row does not exist in the table, null will be returned.
700       */
701      public RowTransactional getRow(String id) throws IOException{
702          TableIndexEntryTransactional entry=null;
703           // Handle index entry caching
704           if(INDEXCACHESIZE>0){
705               final Vector args = new Vector();
706               args.add(id);
707               args.add(entry);
708               //synchronized(indexcache){
709               Thread.doIt(new Callable<Boolean>() {
710                 public Boolean call() throws Exception{
711                   TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
712                   String id = (String) (args.get(0));  
713                   entry=getCacheEntry(id);
714                    if(entry==null){
715                        entry=index.scanIndex(id);
716                        if(entry!=null){
717                            addCacheEntry(entry);
718                        }
719                    }
720                   return true;
721                 }
722               });
723           }else{
724               entry=index.scanIndexTransactional(id);
725           }
726           if(entry!=null){
727               long dataoffset=0;
728               DataInputStream data=null;
729               if(entry.getLocation()==Table.FILEA){
730                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
731               }else if(entry.getLocation()==Table.FILEB){
732                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
733               }
734               if(data!=null){
735                   while(dataoffset!=entry.getPosition()){
736                       dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
737                   }
738                   RowTransactional row=RowTransactional.readFromStream(data);
739                   data.close();
740                   final Vector args = new Vector();
741                   args.add(row);
742                   Thread.doIt(new Callable<Boolean>() {
743                       public Boolean call() throws Exception{
744                   //synchronized(statlock){
745                           atomicfields.setstat_read(atomicfields.getstat_read()+1);
746                           atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
747                           return true;
748                       }
749                   });
750                   return row;
751               }
752               
753           }
754           return null;
755      }
756  }