*** empty log message ***
[IRC.git] / Robust / Transactions / mytuplesoup / src / com / solidosystems / tuplesoup / core / DualFileTableTransactional.java
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34
35 import TransactionalIO.core.TransactionalFile;
36 import java.io.*;
37 import java.util.*;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
40 import dstm2.atomic;
41 import dstm2.Thread;
42 import dstm2.util.StringKeyHashMap;
43 import java.util.concurrent.Callable;
44
45 /**
46  * The table stores a group of rows.
47  * Every row must have a unique id within a table.
48  */
49 public class DualFileTableTransactional implements TableTransactional{
50
51     
52     private int INDEXCACHESIZE=8192;
53      
54     private String filealock="filea-dummy";
55     private String fileblock="fileb-dummy";
56
57 //    private DataOutputStream fileastream=null;
58 //    private DataOutputStream filebstream=null;
59     private TransactionalFile fileastream=null;
60     private TransactionalFile filebstream=null;
61 //    private RandomAccessFile filearandom=null;
62     private TransactionalFile filearandom=null;
63     private TransactionalFile filebrandom=null;
64 //   FileChannel fca=null;
65 //   FileChannel fcb=null;
66     private TableIndexTransactional index=null;
67      
68 //    private long fileaposition=0;
69  //   private long filebposition=0;
70      
71     private boolean rowswitch=true;
72      
73     private String title;
74     private String location;
75      
76     private TableIndexNodeTransactional indexcachefirst;
77     private TableIndexNodeTransactional indexcachelast;
78     //private int indexcacheusage;
79     
80     private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
81     //private Hashtable<String,TableIndexNode> indexcache;
82     
83     
84     DualFileTableTSInf atomicfields;
85     // Statistic counters
86     public @atomic interface DualFileTableTSInf{
87         long getstat_add();
88         long getstat_update();
89         long getstat_delete();
90         long getstat_add_size();
91         long getstat_update_size();
92         long getstat_read_size();
93         long getstat_read();
94         long getstat_cache_hit();
95         long getstat_cache_miss();
96         long getstat_cache_drop();
97         long getFileaposition();
98         long getFilebposition();
99         int getIndexcacheusage();
100         
101         void setstat_add(long val);
102         void setstat_update(long val);
103         void setstat_delete(long val);
104         void setstat_add_size(long val);
105         void setstat_update_size(long val);
106         void setstat_read_size(long val);
107         void setstat_read(long val);
108         void setstat_cache_hit(long val);
109         void setstat_cache_miss(long val);
110         void setstat_cache_drop(long val);
111         void setIndexcacheusage(int val);
112         void setFileaposition(long val);
113         void setFilebposition(long val);
114     }
115   
116     /*long stat_add=0;
117     long stat_update=0;
118     long stat_delete=0;
119     long stat_add_size=0;
120     long stat_update_size=0;
121     long stat_read_size=0;
122     long stat_read=0;
123     long stat_cache_hit=0;
124     long stat_cache_miss=0;
125     long stat_cache_drop=0;*/
126     
127     protected String statlock="stat-dummy";
128     
129     /**
130      * Return the current values of the statistic counters and reset them.
131      * The current counters are:
132      * <ul>
133      *   <li>stat_table_add
134      *   <li>stat_table_update
135      *   <li>stat_table_delete
136      *   <li>stat_table_add_size
137      *   <li>stat_table_update_size
138      *   <li>stat_table_read_size
139      *   <li>stat_table_read
140      *   <li>stat_table_cache_hit
141      *   <li>stat_table_cache_miss
142      *   <li>stat_table_cache_drop
143      * </ul>
144      * Furthermore, the index will be asked to deliver separate index specific counters
145      */
146     public Hashtable<String,Long> readStatistics(){
147         Hashtable<String,Long> hash=new Hashtable<String,Long>();
148         synchronized(statlock){
149             hash.put("stat_table_add",atomicfields.getstat_add());
150             hash.put("stat_table_update",atomicfields.getstat_update());
151             hash.put("stat_table_delete",atomicfields.getstat_delete());
152             hash.put("stat_table_add_size",atomicfields.getstat_add_size());
153             hash.put("stat_table_update_size",atomicfields.getstat_update_size());
154             hash.put("stat_table_read_size",atomicfields.getstat_read_size());
155             hash.put("stat_table_read",atomicfields.getstat_read());
156             hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
157             hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
158             hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
159             atomicfields.setstat_add(0);
160             atomicfields.setstat_update(0);
161             atomicfields.setstat_delete(0);
162             atomicfields.setstat_add_size(0);
163             atomicfields.setstat_update_size(0);
164             atomicfields.setstat_read_size(0);
165             atomicfields.setstat_read(0);
166             atomicfields.setstat_cache_hit(0);
167             atomicfields.setstat_cache_miss(0);
168             atomicfields.setstat_cache_drop(0);
169             Hashtable<String,Long> ihash=index.readStatistics();
170             hash.putAll(ihash);
171         }
172         return hash;
173     }
174     
175     /**
176      * Create a new table object with the default flat index model
177      */
178
179     
180     /**
181      * Create a new table object with a specific index model
182      */
183     public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
184         this.title=title;
185         this.location=location;
186         if(!this.location.endsWith(File.separator))this.location+=File.separator;
187         switch(indextype){
188              case PAGED  : index=new PagedIndexTransactional(getFileName(INDEX));
189                 break;
190            
191         }
192         indexcachefirst=null;
193         indexcachelast=null;
194         atomicfields.setFileaposition(0);
195         atomicfields.setFilebposition(0);
196         atomicfields.setIndexcacheusage(0);
197         indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
198     }
199      
200     /**
201      * Set the maximal allowable size of the index cache.
202      */ 
203     public void setIndexCacheSize(int newsize){
204         INDEXCACHESIZE=newsize;
205     }
206     
207     /**
208      * Close all open file streams
209      */
210     public void close(){
211         try{
212             if(fileastream!=null)fileastream.close();
213             if(filebstream!=null)filebstream.close();
214             if(filearandom!=null)filearandom.close();
215             if(filebrandom!=null)filebrandom.close();
216             index.close();
217         }catch(Exception e){}
218     }
219     
220     /** 
221      * Returns the name of this table
222      */ 
223     public String getTitle(){
224          return title;
225     }
226     
227     /**
228      * Returns the location of this tables datafiles
229      */ 
230     public String getLocation(){
231          return location;
232     }
233      
234     protected String getFileName(int type){
235          switch(type){
236              case FILEB  :   return location+title+".a";
237              case FILEA  :   return location+title+".b";
238              case INDEX  :   return location+title+".index";
239          }
240          return null;
241     }
242     
243     /**
244      * Delete the files created by this table object.
245      * Be aware that this will delete any data stored in this table!
246      */ 
247     public void deleteFiles(){
248          try{
249              File ftest=new File(getFileName(FILEA));
250              ftest.delete();
251          }catch(Exception e){}
252          try{
253              File ftest=new File(getFileName(FILEB));
254              ftest.delete();
255          }catch(Exception e){}
256          try{
257              File ftest=new File(getFileName(INDEX));
258              ftest.delete();
259          }catch(Exception e){}
260     }
261      
262      private synchronized void openFile(int type) throws IOException{
263          switch(type){
264              case FILEA  : if(fileastream==null){
265                                // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
266                                 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
267                                  
268                                 //File ftest=new File(getFileName(FILEA));
269                                 //atomicfields.setFileaposition(ftest.length());
270                                 atomicfields.setFileaposition(fileastream.length());
271                                 fileastream.seek(fileastream.length());
272                            }
273                           break;
274              case FILEB  : if(filebstream==null){
275                                 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
276                                 //File ftest=new File(getFileName(FILEB));
277                                 //atomicfields.setFilebposition(ftest.length());
278                                 atomicfields.setFilebposition(filebstream.length());
279                                 filebstream.seek(filebstream.length());
280                            }
281                           break;
282          }
283      }
284      
285     /**
286      * Adds a row of data to this table.
287      */
288     public void addRow(RowTransactional row) throws IOException{
289         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
290         if(rowswitch){
291             addRowA(row);
292         }else{
293             addRowB(row);
294         }
295         rowswitch=!rowswitch;
296     }
297      
298      private void addCacheEntry(TableIndexEntryTransactional entry){
299          synchronized(indexcache){
300              if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
301                  // remove first entry
302                  TableIndexNodeTransactional node=indexcachefirst;
303                  indexcache.remove(node.getData().getId());
304                  atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
305                  synchronized(statlock){
306                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
307                  }
308                  indexcachefirst=node.getNext();
309                  if(indexcachefirst==null){
310                     indexcachelast=null;
311                  }else{
312                     indexcachefirst.setPrevious(null);
313                  }
314              }
315              TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
316              if(indexcachelast!=null){
317                  indexcachelast.setNext(node);
318              }
319              if(indexcachefirst==null){
320                  indexcachefirst=node;
321              }
322              indexcachelast=node;
323              indexcache.put(entry.getId(),node);
324              atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
325         }
326      }
327      
328      private void addRowA(RowTransactional row) throws IOException{
329          //synchronized(filealock){
330          final Vector args = new Vector();
331          args.add(row);
332          Thread.doIt(new Callable<Boolean>() {
333              public Boolean call() throws Exception{
334                  openFile(FILEA);             
335              //int pre=fileastream.size();
336                  int pre= (int)fileastream.getFilePointer();
337              //row.writeToStream(fileastream);
338                  ((RowTransactional)args.get(0)).writeToFile(fileastream);
339              //int post= fileastream.size();
340                  int post= (int)fileastream.getFilePointer();
341              //fileastream.flush();
342              
343              //synchronized(statlock){
344                  atomicfields.setstat_add(atomicfields.getstat_add()+1);
345                  atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
346              //}
347              
348                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
349                 if(INDEXCACHESIZE>0){
350                     TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
351                     addCacheEntry(entry);
352                 }
353                 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
354             }
355          });
356      }
357      private void addRowB(RowTransactional row) throws IOException{
358         // synchronized(fileblock){
359          final Vector args = new Vector();
360          args.add(row);
361          Thread.doIt(new Callable<Boolean>() {
362              public Boolean call() throws Exception{
363                 openFile(FILEB);
364              //int pre=filebstream.size();
365                 int pre= (int)filebstream.getFilePointer();
366              //row.writeToStream(filebstream);
367                 ((RowTransactional)args.get(0)).writeToFile(filebstream);
368                 int post=(int)filebstream.getFilePointer();
369              //int post=filebstream.size();
370              //filebstream.flush();
371              //synchronized(statlock){
372                 atomicfields.setstat_add(atomicfields.getstat_add()+1);
373                 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
374              // }
375                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
376                 if(INDEXCACHESIZE>0){
377                   TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
378                   addCacheEntry(entry);
379                 }
380                 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
381             }
382          });
383      }
384      
385
386      private void updateCacheEntry(TableIndexEntryTransactional entry){
387           final Vector args = new Vector();
388           args.add(entry);
389           Thread.doIt(new Callable<Boolean>() {
390              public Boolean call() throws Exception{
391           //synchronized(indexcache){
392               if(indexcache.containsKey(((TableIndexEntryTransactional)(args.get(0))).getId())){
393                   TableIndexNodeTransactional node=indexcache.get(((TableIndexEntryTransactional)(args.get(0))).getId());
394                   node.setData(((TableIndexEntryTransactional)(args.get(0))));
395                   if(node!=indexcachelast){
396                       if(node==indexcachefirst){
397                           indexcachefirst=node.getNext();
398                       }
399                       node.remove();
400                       indexcachelast.setNext(node);
401                       node.setPrevious(indexcachelast);
402                       node.setNext(null);
403                       indexcachelast=node;
404                   }
405               }else{
406                   addCacheEntry(((TableIndexEntryTransactional)(args.get(0))));
407               }
408             }
409         });
410       }
411
412       private void removeCacheEntry(String id){
413           //synchronized(indexcache){
414           final Vector args = new Vector();
415           args.add(id);
416           Thread.doIt(new Callable<Boolean>() {
417              public Boolean call() throws Exception{
418           
419                 if(indexcache.containsKey((String)(args.get(0)))){
420                     TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
421                     indexcache.remove((String)(args.get(0)));
422                     if(atomicfields.getIndexcacheusage()==1){
423                         indexcachefirst=null;
424                         indexcachelast=null;
425                         atomicfields.setIndexcacheusage(0);
426                         return true;
427                     }
428                     if(node==indexcachefirst){
429                         indexcachefirst=node.getNext();
430                         indexcachefirst.setPrevious(null);
431                     }else if(node==indexcachelast){
432                         indexcachelast=node.getPrevious();
433                         indexcachelast.setNext(null);
434                     }else{
435                         node.remove();
436                     }
437                     atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
438               //      synchronized(statlock){
439                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
440                      
441               //      }
442                 }
443                 return true;
444             }
445           });
446       }
447
448       private TableIndexEntryTransactional getCacheEntry(String id){
449           final Vector args = new Vector();
450           args.add(id);
451           
452           TableIndexEntryTransactional res = Thread.doIt(new Callable<TableIndexEntryTransactional>() {
453              public TableIndexEntryTransactional call() throws Exception{
454           //synchronized(indexcache){
455               if(indexcache.containsKey((String)(args.get(0)))){
456                   TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
457                   if(node!=indexcachelast){
458                       if(node==indexcachefirst){
459                             indexcachefirst=node.getNext();
460                         }
461                         node.remove();
462                         indexcachelast.setNext(node);
463                         node.setPrevious(indexcachelast);
464                         node.setNext(null);
465                         indexcachelast=node;
466                   }
467                 //  synchronized(statlock){
468                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
469                 //   }
470                   return node.getData();
471               }
472               return null;
473              }
474           });
475           if (res != null)
476               return res;
477           
478           Thread.doIt(new Callable<Boolean>() {
479              public Boolean call() throws Exception{
480           //synchronized(statlock){
481                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
482                return true;
483              }
484           });
485            //}
486           return null;
487       }
488
489      /**
490       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
491       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
492       */
493      public void addOrUpdateRow(RowTransactional row) throws IOException{
494          RowTransactional tmprow=getRow(row.getId());
495          if(tmprow==null){
496              addRow(row);
497          }else{
498              updateRow(row);
499          }
500      }
501
502      /**
503       * Updates a row stored in this table.
504       */
505      public void updateRow(RowTransactional row) throws IOException{
506          TableIndexEntryTransactional entry=null;
507          final Vector args = new Vector();
508          args.add(row);
509          args.add(entry);
510          // Handle index entry caching
511          if(INDEXCACHESIZE>0){
512              Thread.doIt(new Callable<Boolean>() {
513                 public Boolean call() throws Exception {
514                     TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
515                     RowTransactional row = (RowTransactional) (args.get(0));
516                     entry = getCacheEntry(row.getId());
517                     if(entry==null){
518                        entry=index.scanIndex(row.getId());
519                        addCacheEntry(entry);
520                     }
521                     return true;
522                 }
523              });
524           /*   synchronized(indexcache){
525                  entry=getCacheEntry(row.getId());
526                  if(entry==null){
527                      entry=index.scanIndex(row.getId());
528                      addCacheEntry(entry);
529                  }
530              }*/
531          }else{
532              entry=index.scanIndex(row.getId());
533          }
534          if(entry.getRowSize()>=row.getSize()){
535             // Add to the existing location
536             switch(entry.getLocation()){
537                 case FILEA  :
538                //     synchronized(filealock){
539                      Thread.doIt(new Callable<Boolean>() {
540                         public Boolean call() throws Exception {
541                             if(filearandom==null){
542                                 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
543                                // fca=filearandom.getChannel();
544                             }
545                             filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
546                             ((RowTransactional) (args.get(0))).writeToFile(filearandom);
547                             return true;
548                             //fca.force(false);
549                         }
550                      });    
551                //   }
552                     break;
553                 case FILEB :
554               //      synchronized(fileblock){
555                     Thread.doIt(new Callable<Boolean>() {
556                         public Boolean call() throws Exception {
557                                 if(filebrandom==null){
558                                     filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
559                                   //  fcb=filebrandom.getChannel();
560                                 }
561                                 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
562                                 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
563                                 return true;
564                                 //fcb.force(false);
565                         }
566                     });
567                     break;
568             }
569          }else{
570              if(rowswitch){
571                   updateRowA(row);
572               }else{
573                   updateRowB(row);
574               }
575               rowswitch=!rowswitch;
576          }
577          //synchronized(statlock){
578          Thread.doIt(new Callable<Boolean>() {
579               public Boolean call() throws Exception{ 
580                 atomicfields.setstat_update(atomicfields.getstat_update()+1);
581                 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
582                 return true;
583               }
584          });
585      }
586      
587      private void updateRowA(RowTransactional row) throws IOException{
588          final Vector args = new Vector();
589          args.add(row);
590          Thread.doIt(new Callable<Boolean>() {
591               public Boolean call() throws Exception{ 
592          //synchronized(filealock){
593                   openFile(FILEA);
594                   //int pre=filebstream.size();
595                   int pre=(int)fileastream.getFilePointer();
596                   //row.writeToStream(filebstream);
597                   ((RowTransactional)(args.get(0))).writeToFile(fileastream);
598                   //int post=filebstream.size();
599                   int post=(int)fileastream.getFilePointer();
600                   //fileastream.flush();
601                   index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
602
603                   // Handle index entry caching
604                   if(INDEXCACHESIZE>0){
605                       updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
606                   }
607                   atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
608               }
609           });
610      }
611
612      private void updateRowB(RowTransactional row) throws IOException{
613          final Vector args = new Vector();
614          args.add(row);
615          Thread.doIt(new Callable<Boolean>() {
616              public Boolean call() throws Exception{ 
617          
618          //synchronized(fileblock){
619                 openFile(FILEB);
620               //int pre=filebstream.size();
621                 int pre=(int)filebstream.getFilePointer();
622               //row.writeToStream(filebstream);
623                 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
624               //int post=filebstream.size();
625                 int post=(int)filebstream.getFilePointer();
626               //filebstream.flush();
627                 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
628               // Handle index entry caching
629               // Handle index entry caching
630                 if(INDEXCACHESIZE>0){
631                     updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
632                 }
633                 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
634                 return true;
635             }
636         });
637      }
638
639      /**
640       * Marks a row as deleted in the index.
641       * Be aware that the space consumed by the row is not actually reclaimed.
642       */
643      public void deleteRow(RowTransactional row) throws IOException{
644           // Handle index entry caching
645           if(INDEXCACHESIZE>0){
646               removeCacheEntry(row.getId());
647           }
648           index.updateEntry(row.getId(),row.getSize(),DELETE,0);
649           Thread.doIt(new Callable<Boolean>() {
650              public Boolean call() throws Exception{
651           //synchronized(statlock){
652                 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
653                 return true;
654              }
655           });
656          // }
657      }
658      
659      /**
660       * Returns a tuplestream containing the given list of rows
661       */
662      public TupleStreamTransactional getRows(List<String> rows) throws IOException{
663          return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
664      }
665
666      /**
667       * Returns a tuplestream containing the rows matching the given rowmatcher
668       */
669      public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
670          return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
671      }
672      
673      /**
674       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
675       */
676      public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
677           return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
678       }
679
680      /**
681       * Returns a tuplestream of all rows in this table.
682       */
683      public TupleStreamTransactional getRows() throws IOException{
684          // return new TableReader(this);
685          return new IndexedTableReaderTransactional(this,index.scanIndex());
686      }
687      
688      /**
689       * Returns a single row stored in this table.
690       * If the row does not exist in the table, null will be returned.
691       */
692      public RowTransactional getRow(String id) throws IOException{
693          TableIndexEntryTransactional entry=null;
694           // Handle index entry caching
695           if(INDEXCACHESIZE>0){
696               synchronized(indexcache){
697                   entry=getCacheEntry(id);
698                    if(entry==null){
699                        entry=index.scanIndex(id);
700                        if(entry!=null){
701                            addCacheEntry(entry);
702                        }
703                    }
704               }
705           }else{
706               entry=index.scanIndex(id);
707           }
708           if(entry!=null){
709               long dataoffset=0;
710               DataInputStream data=null;
711               if(entry.getLocation()==Table.FILEA){
712                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
713               }else if(entry.getLocation()==Table.FILEB){
714                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
715               }
716               if(data!=null){
717                   while(dataoffset!=entry.getPosition()){
718                       dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
719                   }
720                   RowTransactional row=RowTransactional.readFromStream(data);
721                   data.close();
722                   final Vector args = new Vector();
723                   args.add(row);
724                   Thread.doIt(new Callable<Boolean>() {
725                       public Boolean call() throws Exception{
726                   //synchronized(statlock){
727                           atomicfields.setstat_read(atomicfields.getstat_read()+1);
728                           atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
729                       }
730                   });
731                   return row;
732               }
733               
734           }
735           return null;
736      }
737  }