483b0e9dfa5f6d1d5f952639236ed0f8f6df1b7d
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34
35 import TransactionalIO.core.TransactionalFile;
36 import java.io.*;
37 import java.util.*;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
40 import dstm2.atomic;
41 import dstm2.Thread;
42 import dstm2.benchmark.FinancialTransaction.FinancialTransactionDS;
43 import dstm2.factory.Factory;
44 import dstm2.util.StringKeyHashMap;
45 import java.util.concurrent.Callable;
46
47 /**
48  * The table stores a group of rows.
49  * Every row must have a unique id within a table.
50  */
51 public class DualFileTableTransactional implements TableTransactional{
52
53     
54     private int INDEXCACHESIZE=8192;
55      
56     private String filealock="filea-dummy";
57     private String fileblock="fileb-dummy";
58
59 //    private DataOutputStream fileastream=null;
60 //    private DataOutputStream filebstream=null;
61     private TransactionalFile fileastream=null;
62     private TransactionalFile filebstream=null;
63 //    private RandomAccessFile filearandom=null;
64     private TransactionalFile filearandom=null;
65     private TransactionalFile filebrandom=null;
66 //   FileChannel fca=null;
67 //   FileChannel fcb=null;
68     private TableIndexTransactional index=null;
69      
70 //    private long fileaposition=0;
71  //   private long filebposition=0;
72      
73     private boolean rowswitch=true;
74      
75     private String title;
76     private String location;
77      
78     private TableIndexNodeTransactional indexcachefirst;
79     private TableIndexNodeTransactional indexcachelast;
80     //private int indexcacheusage;
81     
82     private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
83     //private Hashtable<String,TableIndexNode> indexcache;
84     
85     static Factory<DualFileTableTSInf> factory = Thread.makeFactory(DualFileTableTSInf.class);
86     // static Factory<FinancialTransactionDS> factory = Thread.makeFactory(FinancialTransactionDS.class);
87     DualFileTableTSInf atomicfields;
88     // Statistic counters
89
90     public @atomic interface DualFileTableTSInf{
91         Long getstat_add();
92         Long getstat_update();
93         Long getstat_delete();
94         Long getstat_add_size();
95         Long getstat_update_size();
96         Long getstat_read_size();
97         Long getstat_read();
98         Long getstat_cache_hit();
99         Long getstat_cache_miss();
100         Long getstat_cache_drop();
101         Long getFileaposition();
102         Long getFilebposition();
103         Integer getIndexcacheusage();
104         
105         void setstat_add(Long val);
106         void setstat_update(Long val);
107         void setstat_delete(Long val);
108         void setstat_add_size(Long val);
109         void setstat_update_size(Long val);
110         void setstat_read_size(Long val);
111         void setstat_read(Long val);
112         void setstat_cache_hit(Long val);
113         void setstat_cache_miss(Long val);
114         void setstat_cache_drop(Long val);
115         void setIndexcacheusage(Integer val);
116         void setFileaposition(Long val);
117         void setFilebposition(Long val);
118     }
119   
120     /*long stat_add=0;
121     long stat_update=0;
122     long stat_delete=0;
123     long stat_add_size=0;
124     long stat_update_size=0;
125     long stat_read_size=0;
126     long stat_read=0;
127     long stat_cache_hit=0;
128     long stat_cache_miss=0;
129     long stat_cache_drop=0;*/
130     
131     protected String statlock="stat-dummy";
132     
133     /**
134      * Return the current values of the statistic counters and reset them.
135      * The current counters are:
136      * <ul>
137      *   <li>stat_table_add
138      *   <li>stat_table_update
139      *   <li>stat_table_delete
140      *   <li>stat_table_add_size
141      *   <li>stat_table_update_size
142      *   <li>stat_table_read_size
143      *   <li>stat_table_read
144      *   <li>stat_table_cache_hit
145      *   <li>stat_table_cache_miss
146      *   <li>stat_table_cache_drop
147      * </ul>
148      * Furthermore, the index will be asked to deliver separate index specific counters
149      */
150     public Hashtable<String,Long> readStatistics(){
151         
152     //    synchronized(statlock){
153         return Thread.doIt(new Callable<Hashtable<String,Long>>() {
154             public Hashtable<String,Long> call() throws Exception{
155                 Hashtable<String,Long> hash=new Hashtable<String,Long>();
156                 hash.put("stat_table_add",atomicfields.getstat_add());
157                 hash.put("stat_table_update",atomicfields.getstat_update());
158                 hash.put("stat_table_delete",atomicfields.getstat_delete());
159                 hash.put("stat_table_add_size",atomicfields.getstat_add_size());
160                 hash.put("stat_table_update_size",atomicfields.getstat_update_size());
161                 hash.put("stat_table_read_size",atomicfields.getstat_read_size());
162                 hash.put("stat_table_read",atomicfields.getstat_read());
163                 hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
164                 hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
165                 hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
166                 atomicfields.setstat_add(Long.valueOf(0));
167                 atomicfields.setstat_update(Long.valueOf(0));
168                 atomicfields.setstat_delete(Long.valueOf(0));
169                 atomicfields.setstat_add_size(Long.valueOf(0));
170                 atomicfields.setstat_update_size(Long.valueOf(0));
171                 atomicfields.setstat_read_size(Long.valueOf(0));
172                 atomicfields.getstat_read_size();
173                 atomicfields.setstat_read(Long.valueOf(0));
174                 atomicfields.setstat_cache_hit(Long.valueOf(0));
175                 atomicfields.setstat_cache_miss(Long.valueOf(0));
176                 atomicfields.setstat_cache_drop(Long.valueOf(0));
177                 Hashtable<String,Long> ihash=index.readStatistics();
178                 hash.putAll(ihash);
179                 return hash;
180             }
181         });
182         
183     }
184     
185     /**
186      * Create a new table object with the default flat index model
187      */
188
189     
190     /**
191      * Create a new table object with a specific index model
192      */
193     public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
194         atomicfields = factory.create();
195         
196         this.title=title;
197         this.location=location;
198         if(!this.location.endsWith(File.separator))this.location+=File.separator;
199         switch(indextype){
200              case PAGED  : index=new PagedIndexTransactional(getFileName(INDEX));
201                 break;
202            
203         }
204         indexcachefirst=null;
205         indexcachelast=null;
206         atomicfields.setFileaposition(Long.valueOf(0));
207         atomicfields.setFilebposition(Long.valueOf(0));
208         atomicfields.setstat_update_size(Long.valueOf(0));
209         atomicfields.setstat_update(Long.valueOf(0));
210         atomicfields.setstat_read_size(Long.valueOf(0));
211         atomicfields.setstat_read(Long.valueOf(0));
212         atomicfields.setstat_delete(Long.valueOf(0));
213         atomicfields.setstat_cache_miss(Long.valueOf(0));
214         atomicfields.setstat_cache_hit(Long.valueOf(0));
215         atomicfields.setstat_cache_drop(Long.valueOf(0));
216         atomicfields.setstat_add_size(Long.valueOf(0));
217         atomicfields.setstat_add(Long.valueOf(0));
218         atomicfields.setIndexcacheusage(Integer.valueOf(0));
219         indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
220     }
221      
222     /**
223      * Set the maximal allowable size of the index cache.
224      */ 
225     public void setIndexCacheSize(int newsize){
226         INDEXCACHESIZE=newsize;
227     }
228     
229     /**
230      * Close all open file streams
231      */
232     public void close(){
233         try{
234             if(fileastream!=null)fileastream.close();
235             if(filebstream!=null)filebstream.close();
236             if(filearandom!=null)filearandom.close();
237             if(filebrandom!=null)filebrandom.close();
238             index.close();
239         }catch(Exception e){}
240     }
241     
242     /** 
243      * Returns the name of this table
244      */ 
245     public String getTitle(){
246          return title;
247     }
248     
249     /**
250      * Returns the location of this tables datafiles
251      */ 
252     public String getLocation(){
253          return location;
254     }
255      
256     protected String getFileName(int type){
257          switch(type){
258              case FILEB  :   return location+title+".a";
259              case FILEA  :   return location+title+".b";
260              case INDEX  :   return location+title+".index";
261          }
262          return null;
263     }
264     
265     /**
266      * Delete the files created by this table object.
267      * Be aware that this will delete any data stored in this table!
268      */ 
269     public void deleteFiles(){
270          try{
271              File ftest=new File(getFileName(FILEA));
272              ftest.delete();
273          }catch(Exception e){}
274          try{
275              File ftest=new File(getFileName(FILEB));
276              ftest.delete();
277          }catch(Exception e){}
278          try{
279              File ftest=new File(getFileName(INDEX));
280              ftest.delete();
281          }catch(Exception e){}
282     }
283      
284      private /*synchronized*/ void openFile(int type) throws IOException{
285          switch(type){
286              case FILEA  : if(fileastream==null){
287                                // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
288                                 System.out.println("file a " + getFileName(FILEA));
289                                 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
290                                  
291                                 //File ftest=new File(getFileName(FILEA));
292                                 //atomicfields.setFileaposition(ftest.length());
293                                 atomicfields.setFileaposition(fileastream.length());
294                                 fileastream.seek(fileastream.length());
295                            }
296                           break;
297              case FILEB  : if(filebstream==null){
298                                 System.out.println("file a " + getFileName(FILEB));
299                                 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
300                                 //File ftest=new File(getFileName(FILEB));
301                                 //atomicfields.setFilebposition(ftest.length());
302                                 atomicfields.setFilebposition(filebstream.length());
303                                 filebstream.seek(filebstream.length());
304                            }
305                           break;
306          }
307      }
308      
309     /**
310      * Adds a row of data to this table.
311      */
312     public void addRow(RowTransactional row) throws IOException{
313         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
314         if(rowswitch){
315             addRowA(row);
316         }else{
317             addRowB(row);
318         }
319         rowswitch=!rowswitch;
320     }
321      
322      private void addCacheEntry(TableIndexEntryTransactional entry){
323       //   synchronized(indexcache){
324              if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
325                  // remove first entry
326                  TableIndexNodeTransactional node=indexcachefirst;
327                  indexcache.remove(node.getData().getId());
328                  atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
329               //   synchronized(statlock){
330                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
331              //    }
332                  indexcachefirst=node.getNext();
333                  if(indexcachefirst==null){
334                     indexcachelast=null;
335                  }else{
336                     indexcachefirst.setPrevious(null);
337                  }
338              }
339              TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
340              if(indexcachelast!=null){
341                  indexcachelast.setNext(node);
342              }
343              if(indexcachefirst==null){
344                  indexcachefirst=node;
345              }
346              indexcachelast=node;
347              indexcache.put(entry.getId(),node);
348              atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
349      //   }
350      }
351      
352      private void addRowA(RowTransactional row) throws IOException{
353          //synchronized(filealock){
354          final Vector args = new Vector();
355          args.add(row);
356          Thread.doIt(new Callable<Boolean>() {
357              public Boolean call() throws Exception{
358                  openFile(FILEA);             
359              //int pre=fileastream.size();
360                  int pre= (int)fileastream.getFilePointer();
361              //row.writeToStream(fileastream);
362                  ((RowTransactional)args.get(0)).writeToFile(fileastream);
363              //int post= fileastream.size();
364                  int post= (int)fileastream.getFilePointer();
365              //fileastream.flush();
366              
367              //synchronized(statlock){
368                  atomicfields.setstat_add(atomicfields.getstat_add()+1);
369                  atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
370              //}
371              
372                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFilebposition());
373                 if(INDEXCACHESIZE>0){
374                     TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEA,atomicfields.getFileaposition());
375                     addCacheEntry(entry);
376                 }
377                 atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
378                 return true;
379             }
380          });
381      }
382      private void addRowB(RowTransactional row) throws IOException{
383         // synchronized(fileblock){
384          final Vector args = new Vector();
385          args.add(row);
386          Thread.doIt(new Callable<Boolean>() {
387              public Boolean call() throws Exception{
388                 openFile(FILEB);
389              //int pre=filebstream.size();
390                 int pre= (int)filebstream.getFilePointer();
391              //row.writeToStream(filebstream);
392                 ((RowTransactional)args.get(0)).writeToFile(filebstream);
393                 int post=(int)filebstream.getFilePointer();
394              //int post=filebstream.size();
395              //filebstream.flush();
396              //synchronized(statlock){
397                 atomicfields.setstat_add(atomicfields.getstat_add()+1);
398                 atomicfields.setstat_add_size(atomicfields.getstat_add_size()+((RowTransactional)args.get(0)).getSize());
399              // }
400                 index.addEntry(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
401                 if(INDEXCACHESIZE>0){
402                   TableIndexEntryTransactional entry=new TableIndexEntryTransactional(((RowTransactional)args.get(0)).getId(),((RowTransactional)args.get(0)).getSize(),FILEB,atomicfields.getFilebposition());
403                   addCacheEntry(entry);
404                 }
405                 atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
406                 return true;
407             }
408          });
409      }
410      
411
412      private void updateCacheEntry(TableIndexEntryTransactional entry){
413         
414           //synchronized(indexcache){
415               if(indexcache.containsKey(entry.getId())){
416                   TableIndexNodeTransactional node=indexcache.get(entry.getId());
417                   node.setData(entry);
418                   if(node!=indexcachelast){
419                       if(node==indexcachefirst){
420                           indexcachefirst=node.getNext();
421                       }
422                       node.remove();
423                       indexcachelast.setNext(node);
424                       node.setPrevious(indexcachelast);
425                       node.setNext(null);
426                       indexcachelast=node;
427                   }
428               }else{
429                   addCacheEntry(entry);
430               }
431            //   return true;
432          //   }
433        // });
434       }
435
436       private void removeCacheEntry(String id){
437           //synchronized(indexcache){
438           final Vector args = new Vector();
439           args.add(id);
440           Thread.doIt(new Callable<Boolean>() {
441              public Boolean call() throws Exception{
442           
443                 if(indexcache.containsKey((String)(args.get(0)))){
444                     TableIndexNodeTransactional node=indexcache.get((String)(args.get(0)));
445                     indexcache.remove((String)(args.get(0)));
446                     if(atomicfields.getIndexcacheusage()==1){
447                         indexcachefirst=null;
448                         indexcachelast=null;
449                         atomicfields.setIndexcacheusage(0);
450                         return true;
451                     }
452                     if(node==indexcachefirst){
453                         indexcachefirst=node.getNext();
454                         indexcachefirst.setPrevious(null);
455                     }else if(node==indexcachelast){
456                         indexcachelast=node.getPrevious();
457                         indexcachelast.setNext(null);
458                     }else{
459                         node.remove();
460                     }
461                     atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
462               //      synchronized(statlock){
463                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
464                      
465               //      }
466                 }
467                 return true;
468             }
469           });
470       }
471
472       private TableIndexEntryTransactional getCacheEntry(String id){
473          
474           //synchronized(indexcache){
475               if(indexcache.containsKey(id)){
476                   TableIndexNodeTransactional node=indexcache.get(id);
477                   if(node!=indexcachelast){
478                       if(node==indexcachefirst){
479                             indexcachefirst=node.getNext();
480                         }
481                         node.remove();
482                         indexcachelast.setNext(node);
483                         node.setPrevious(indexcachelast);
484                         node.setNext(null);
485                         indexcachelast=node;
486                   }
487                 //  synchronized(statlock){
488                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
489                 //   }
490                   return node.getData();
491               }
492
493           //synchronized(statlock){
494                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
495
496            //}
497           return null;
498       }
499
500      /**
501       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
502       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
503       */
504      public void addOrUpdateRow(RowTransactional row) throws IOException{
505          RowTransactional tmprow=getRow(row.getId());
506          if(tmprow==null){
507              addRow(row);
508          }else{
509              updateRow(row);
510          }
511      }
512
513      /**
514       * Updates a row stored in this table.
515       */
516      public void updateRow(RowTransactional row) throws IOException{
517          TableIndexEntryTransactional entry=null;
518          final Vector args = new Vector();
519          args.add(row);
520          args.add(entry);
521          // Handle index entry caching
522          if(INDEXCACHESIZE>0){
523              Thread.doIt(new Callable<Boolean>() {
524                 public Boolean call() throws Exception {
525                     TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
526                     RowTransactional row = (RowTransactional) (args.get(0));
527                     entry = getCacheEntry(row.getId());
528                     if(entry==null){
529                        entry=index.scanIndex(row.getId());
530                        addCacheEntry(entry);
531                     }
532                     return true;
533                 }
534              });
535           /*   synchronized(indexcache){
536                  entry=getCacheEntry(row.getId());
537                  if(entry==null){
538                      entry=index.scanIndex(row.getId());
539                      addCacheEntry(entry);
540                  }
541              }*/
542          }else{
543              entry=index.scanIndexTransactional(row.getId());
544          }
545          if(entry.getRowSize()>=row.getSize()){
546             // Add to the existing location
547             switch(entry.getLocation()){
548                 case FILEA  :
549                //     synchronized(filealock){
550                      Thread.doIt(new Callable<Boolean>() {
551                         public Boolean call() throws Exception {
552                             if(filearandom==null){
553                                 filearandom=new TransactionalFile(getFileName(FILEA),"rw");
554                                // fca=filearandom.getChannel();
555                             }
556                             filearandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
557                             ((RowTransactional) (args.get(0))).writeToFile(filearandom);
558                             return true;
559                             //fca.force(false);
560                         }
561                      });    
562                //   }
563                     break;
564                 case FILEB :
565               //      synchronized(fileblock){
566                     Thread.doIt(new Callable<Boolean>() {
567                         public Boolean call() throws Exception {
568                                 if(filebrandom==null){
569                                     filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
570                                   //  fcb=filebrandom.getChannel();
571                                 }
572                                 filebrandom.seek(((TableIndexEntryTransactional) (args.get(1))).getPosition());
573                                 ((RowTransactional) (args.get(0))).writeToFile(filebrandom);
574                                 return true;
575                                 //fcb.force(false);
576                         }
577                     });
578                     break;
579             }
580          }else{
581              if(rowswitch){
582                   updateRowA(row);
583               }else{
584                   updateRowB(row);
585               }
586               rowswitch=!rowswitch;
587          }
588          //synchronized(statlock){
589          Thread.doIt(new Callable<Boolean>() {
590               public Boolean call() throws Exception{ 
591                 atomicfields.setstat_update(atomicfields.getstat_update()+1);
592                 atomicfields.setstat_update_size(atomicfields.getstat_update_size()+((RowTransactional) (args.get(0))).getSize());
593                 return true;
594               }
595          });
596      }
597      
598      private void updateRowA(RowTransactional row) throws IOException{
599          final Vector args = new Vector();
600          args.add(row);
601          Thread.doIt(new Callable<Boolean>() {
602               public Boolean call() throws Exception{ 
603          //synchronized(filealock){
604                   openFile(FILEA);
605                   //int pre=filebstream.size();
606                   int pre=(int)fileastream.getFilePointer();
607                   //row.writeToStream(filebstream);
608                   ((RowTransactional)(args.get(0))).writeToFile(fileastream);
609                   //int post=filebstream.size();
610                   int post=(int)fileastream.getFilePointer();
611                   //fileastream.flush();
612                   index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition());
613
614                   // Handle index entry caching
615                   if(INDEXCACHESIZE>0){
616                       updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEA,atomicfields.getFileaposition()));
617                   }
618                   atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
619                   return true;
620               }
621           });
622      }
623
624      private void updateRowB(RowTransactional row) throws IOException{
625          final Vector args = new Vector();
626          args.add(row);
627          Thread.doIt(new Callable<Boolean>() {
628              public Boolean call() throws Exception{ 
629          
630          //synchronized(fileblock){
631                 openFile(FILEB);
632               //int pre=filebstream.size();
633                 int pre=(int)filebstream.getFilePointer();
634               //row.writeToStream(filebstream);
635                 ((RowTransactional)(args.get(0))).writeToFile(filebstream);
636               //int post=filebstream.size();
637                 int post=(int)filebstream.getFilePointer();
638               //filebstream.flush();
639                 index.updateEntry(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition());
640               // Handle index entry caching
641               // Handle index entry caching
642                 if(INDEXCACHESIZE>0){
643                     updateCacheEntry(new TableIndexEntryTransactional(((RowTransactional)(args.get(0))).getId(),((RowTransactional)(args.get(0))).getSize(),FILEB,atomicfields.getFilebposition()));
644                 }
645                 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
646                 return true;
647             }
648         });
649      }
650
651      /**
652       * Marks a row as deleted in the index.
653       * Be aware that the space consumed by the row is not actually reclaimed.
654       */
655      public void deleteRow(RowTransactional row) throws IOException{
656           // Handle index entry caching
657           if(INDEXCACHESIZE>0){
658               removeCacheEntry(row.getId());
659           }
660           index.updateEntryTransactional(row.getId(),row.getSize(),DELETE,0);
661           Thread.doIt(new Callable<Boolean>() {
662              public Boolean call() throws Exception{
663           //synchronized(statlock){
664                 atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
665                 return true;
666              }
667           });
668          // }
669      }
670      
671      /**
672       * Returns a tuplestream containing the given list of rows
673       */
674      public TupleStreamTransactional getRows(List<String> rows) throws IOException{
675          return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
676      }
677
678      /**
679       * Returns a tuplestream containing the rows matching the given rowmatcher
680       */
681      public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
682          return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
683      }
684      
685      /**
686       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
687       */
688      public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
689           return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
690       }
691
692      /**
693       * Returns a tuplestream of all rows in this table.
694       */
695      public TupleStreamTransactional getRows() throws IOException{
696          // return new TableReader(this);
697          return new IndexedTableReaderTransactional(this,index.scanIndex());
698      }
699      
700      /**
701       * Returns a single row stored in this table.
702       * If the row does not exist in the table, null will be returned.
703       */
704      public RowTransactional getRow(String id) throws IOException{
705          TableIndexEntryTransactional entry=null;
706           // Handle index entry caching
707           if(INDEXCACHESIZE>0){
708               final Vector args = new Vector();
709               args.add(id);
710               args.add(entry);
711               //synchronized(indexcache){
712               Thread.doIt(new Callable<Boolean>() {
713                 public Boolean call() throws Exception{
714                   TableIndexEntryTransactional entry = (TableIndexEntryTransactional) (args.get(1));
715                   String id = (String) (args.get(0));  
716                   entry=getCacheEntry(id);
717                    if(entry==null){
718                        entry=index.scanIndex(id);
719                        if(entry!=null){
720                            addCacheEntry(entry);
721                        }
722                    }
723                   return true;
724                 }
725               });
726           }else{
727               entry=index.scanIndexTransactional(id);
728           }
729           if(entry!=null){
730               long dataoffset=0;
731               DataInputStream data=null;
732               if(entry.getLocation()==Table.FILEA){
733                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
734               }else if(entry.getLocation()==Table.FILEB){
735                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
736               }
737               if(data!=null){
738                   while(dataoffset!=entry.getPosition()){
739                       dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
740                   }
741                   RowTransactional row=RowTransactional.readFromStream(data);
742                   data.close();
743                   final Vector args = new Vector();
744                   args.add(row);
745                   Thread.doIt(new Callable<Boolean>() {
746                       public Boolean call() throws Exception{
747                   //synchronized(statlock){
748                           atomicfields.setstat_read(atomicfields.getstat_read()+1);
749                           atomicfields.setstat_read_size(atomicfields.getstat_read_size()+((RowTransactional)args.get(0)).getSize());
750                           return true;
751                       }
752                   });
753                   return row;
754               }
755               
756           }
757           return null;
758      }
759  }