61dbecf04a88a04056669f534c4d20e2a84cc99d
[IRC.git] / Robust / Transactions / mytuplesoup / src / com / solidosystems / tuplesoup / core / DualFileTable.java
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34 import java.io.*;
35 import java.util.*;
36 import java.nio.channels.*;
37 import com.solidosystems.tuplesoup.filter.*;
38 import dstm2.atomic;
39
40 /**
41  * The table stores a group of rows.
42  * Every row must have a unique id within a table.
43  */
44 public class DualFileTable implements Table{
45
46     
47     private int INDEXCACHESIZE=8192;
48      
49     private String filealock="filea-dummy";
50     private String fileblock="fileb-dummy";
51
52     private DataOutputStream fileastream=null;
53     private DataOutputStream filebstream=null;
54     private RandomAccessFile filearandom=null;
55     private RandomAccessFile filebrandom=null;
56     FileChannel fca=null;
57     FileChannel fcb=null;
58     private TableIndex index=null;
59      
60     private long fileaposition=0;
61     private long filebposition=0;
62      
63     private boolean rowswitch=true;
64      
65     private String title;
66     private String location;
67      
68     private TableIndexNode indexcachefirst;
69     private TableIndexNode indexcachelast;
70     private int indexcacheusage;
71     private Hashtable<String,TableIndexNode> indexcache;
72     
73     
74     DualFileTableTSInf atomicfields;
75     // Statistic counters
76     public @atomic interface DualFileTableTSInf{
77         long getstat_add();
78         long getstat_update();
79         long getstat_delete();
80         long getstat_add_size();
81         long getstat_update_size();
82         long getstat_read_size();
83         long getstat_read();
84         long getstat_cache_hit();
85         long getstat_cache_miss();
86         long getstat_cache_drop();
87         
88         void setstat_add(long val);
89         void setstat_update(long val);
90         void setstat_delete(long val);
91         void setstat_add_size(long val);
92         void setstat_update_size(long val);
93         void setstat_read_size(long val);
94         void setstat_read(long val);
95         void setstat_cache_hit(long val);
96         void setstat_cache_miss(long val);
97         void setstat_cache_drop(long val);
98     }
99   
100     /*long stat_add=0;
101     long stat_update=0;
102     long stat_delete=0;
103     long stat_add_size=0;
104     long stat_update_size=0;
105     long stat_read_size=0;
106     long stat_read=0;
107     long stat_cache_hit=0;
108     long stat_cache_miss=0;
109     long stat_cache_drop=0;*/
110     
111     protected String statlock="stat-dummy";
112     
113     /**
114      * Return the current values of the statistic counters and reset them.
115      * The current counters are:
116      * <ul>
117      *   <li>stat_table_add
118      *   <li>stat_table_update
119      *   <li>stat_table_delete
120      *   <li>stat_table_add_size
121      *   <li>stat_table_update_size
122      *   <li>stat_table_read_size
123      *   <li>stat_table_read
124      *   <li>stat_table_cache_hit
125      *   <li>stat_table_cache_miss
126      *   <li>stat_table_cache_drop
127      * </ul>
128      * Furthermore, the index will be asked to deliver separate index specific counters
129      */
130     public Hashtable<String,Long> readStatistics(){
131         Hashtable<String,Long> hash=new Hashtable<String,Long>();
132         synchronized(statlock){
133             hash.put("stat_table_add",atomicfields.getstat_add());
134             hash.put("stat_table_update",atomicfields.getstat_update());
135             hash.put("stat_table_delete",atomicfields.getstat_delete());
136             hash.put("stat_table_add_size",atomicfields.getstat_add_size());
137             hash.put("stat_table_update_size",atomicfields.getstat_update_size());
138             hash.put("stat_table_read_size",atomicfields.getstat_read_size());
139             hash.put("stat_table_read",atomicfields.getstat_read());
140             hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
141             hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
142             hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
143             atomicfields.setstat_add(0);
144             atomicfields.setstat_update(0);
145             atomicfields.setstat_delete(0);
146             atomicfields.setstat_add_size(0);
147             atomicfields.setstat_update_size(0);
148             atomicfields.setstat_read_size(0);
149             atomicfields.setstat_read(0);
150             atomicfields.setstat_cache_hit(0);
151             atomicfields.setstat_cache_miss(0);
152             atomicfields.setstat_cache_drop(0);
153             Hashtable<String,Long> ihash=index.readStatistics();
154             hash.putAll(ihash);
155         }
156         return hash;
157     }
158     
159     /**
160      * Create a new table object with the default flat index model
161      */
162
163     
164     /**
165      * Create a new table object with a specific index model
166      */
167     public DualFileTable(String title,String location, int indextype) throws IOException{
168         this.title=title;
169         this.location=location;
170         if(!this.location.endsWith(File.separator))this.location+=File.separator;
171         switch(indextype){
172              case PAGED  : index=new PagedIndex(getFileName(INDEX));
173                 break;
174            
175         }
176         indexcachefirst=null;
177         indexcachelast=null;
178         indexcacheusage=0;
179         indexcache=new Hashtable<String,TableIndexNode>();
180     }
181      
182     /**
183      * Set the maximal allowable size of the index cache.
184      */ 
185     public void setIndexCacheSize(int newsize){
186         INDEXCACHESIZE=newsize;
187     }
188     
189     /**
190      * Close all open file streams
191      */
192     public void close(){
193         try{
194             if(fileastream!=null)fileastream.close();
195             if(filebstream!=null)filebstream.close();
196             if(filearandom!=null)filearandom.close();
197             if(filebrandom!=null)filebrandom.close();
198             index.close();
199         }catch(Exception e){}
200     }
201     
202     /** 
203      * Returns the name of this table
204      */ 
205     public String getTitle(){
206          return title;
207     }
208     
209     /**
210      * Returns the location of this tables datafiles
211      */ 
212     public String getLocation(){
213          return location;
214     }
215      
216     protected String getFileName(int type){
217          switch(type){
218              case FILEB  :   return location+title+".a";
219              case FILEA  :   return location+title+".b";
220              case INDEX  :   return location+title+".index";
221          }
222          return null;
223     }
224     
225     /**
226      * Delete the files created by this table object.
227      * Be aware that this will delete any data stored in this table!
228      */ 
229     public void deleteFiles(){
230          try{
231              File ftest=new File(getFileName(FILEA));
232              ftest.delete();
233          }catch(Exception e){}
234          try{
235              File ftest=new File(getFileName(FILEB));
236              ftest.delete();
237          }catch(Exception e){}
238          try{
239              File ftest=new File(getFileName(INDEX));
240              ftest.delete();
241          }catch(Exception e){}
242     }
243      
244      private synchronized void openFile(int type) throws IOException{
245          switch(type){
246              case FILEA  : if(fileastream==null){
247                                 fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
248                                 File ftest=new File(getFileName(FILEA));
249                                 fileaposition=ftest.length();
250                            }
251                           break;
252              case FILEB  : if(filebstream==null){
253                                 filebstream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEB),true)));
254                                 File ftest=new File(getFileName(FILEB));
255                                 filebposition=ftest.length();
256                            }
257                           break;
258          }
259      }
260      
261     /**
262      * Adds a row of data to this table.
263      */
264     public void addRow(Row row) throws IOException{
265         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
266         if(rowswitch){
267             addRowA(row);
268         }else{
269             addRowB(row);
270         }
271         rowswitch=!rowswitch;
272     }
273      
274      private void addCacheEntry(TableIndexEntry entry){
275          synchronized(indexcache){
276              if(indexcacheusage>INDEXCACHESIZE){
277                  // remove first entry
278                  TableIndexNode node=indexcachefirst;
279                  indexcache.remove(node.getData().getId());
280                  indexcacheusage--;
281                  synchronized(statlock){
282                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
283                  }
284                  indexcachefirst=node.getNext();
285                  if(indexcachefirst==null){
286                     indexcachelast=null;
287                  }else{
288                     indexcachefirst.setPrevious(null);
289                  }
290              }
291              TableIndexNode node=new TableIndexNode(indexcachelast,entry);
292              if(indexcachelast!=null){
293                  indexcachelast.setNext(node);
294              }
295              if(indexcachefirst==null){
296                  indexcachefirst=node;
297              }
298              indexcachelast=node;
299              indexcache.put(entry.getId(),node);
300              indexcacheusage++;
301         }
302      }
303      
304      private void addRowA(Row row) throws IOException{
305          synchronized(filealock){
306              openFile(FILEA);
307              int pre=fileastream.size();
308              row.writeToStream(fileastream);
309              int post=fileastream.size();
310              fileastream.flush();
311              
312              synchronized(statlock){
313                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
314                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
315              }
316              
317              index.addEntry(row.getId(),row.getSize(),FILEA,fileaposition);
318              if(INDEXCACHESIZE>0){
319                  TableIndexEntry entry=new TableIndexEntry(row.getId(),row.getSize(),FILEA,fileaposition);
320                  addCacheEntry(entry);
321              }
322              fileaposition+=Row.calcSize(pre,post);
323          }
324      }
325      private void addRowB(Row row) throws IOException{
326          synchronized(fileblock){
327              openFile(FILEB);
328              int pre=filebstream.size();
329              row.writeToStream(filebstream);
330              int post=filebstream.size();
331              filebstream.flush();
332              synchronized(statlock){
333                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
334                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
335               }
336              index.addEntry(row.getId(),row.getSize(),FILEB,filebposition);
337              if(INDEXCACHESIZE>0){
338                  TableIndexEntry entry=new TableIndexEntry(row.getId(),row.getSize(),FILEB,filebposition);
339                  addCacheEntry(entry);
340              }
341              filebposition+=Row.calcSize(pre,post);
342          }
343      }
344      
345
346      private void updateCacheEntry(TableIndexEntry entry){
347           synchronized(indexcache){
348               if(indexcache.containsKey(entry.getId())){
349                   TableIndexNode node=indexcache.get(entry.getId());
350                   node.setData(entry);
351                   if(node!=indexcachelast){
352                       if(node==indexcachefirst){
353                           indexcachefirst=node.getNext();
354                       }
355                       node.remove();
356                       indexcachelast.setNext(node);
357                       node.setPrevious(indexcachelast);
358                       node.setNext(null);
359                       indexcachelast=node;
360                   }
361               }else{
362                   addCacheEntry(entry);
363               }
364          }
365       }
366
367       private void removeCacheEntry(String id){
368           synchronized(indexcache){
369                 if(indexcache.containsKey(id)){
370                     TableIndexNode node=indexcache.get(id);
371                     indexcache.remove(id);
372                     if(indexcacheusage==1){
373                         indexcachefirst=null;
374                         indexcachelast=null;
375                         indexcacheusage=0;
376                         return;
377                     }
378                     if(node==indexcachefirst){
379                         indexcachefirst=node.getNext();
380                         indexcachefirst.setPrevious(null);
381                     }else if(node==indexcachelast){
382                         indexcachelast=node.getPrevious();
383                         indexcachelast.setNext(null);
384                     }else{
385                         node.remove();
386                     }
387                     indexcacheusage--;
388                     synchronized(statlock){
389                          atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
390                     }
391                 }
392           }
393       }
394
395       private TableIndexEntry getCacheEntry(String id){
396           synchronized(indexcache){
397               if(indexcache.containsKey(id)){
398                   TableIndexNode node=indexcache.get(id);
399                   if(node!=indexcachelast){
400                       if(node==indexcachefirst){
401                             indexcachefirst=node.getNext();
402                         }
403                         node.remove();
404                         indexcachelast.setNext(node);
405                         node.setPrevious(indexcachelast);
406                         node.setNext(null);
407                         indexcachelast=node;
408                   }
409                   synchronized(statlock){
410                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
411                    }
412                   return node.getData();
413               }
414           }
415           synchronized(statlock){
416                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
417            }
418           return null;
419       }
420
421      /**
422       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
423       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
424       */
425      public void addOrUpdateRow(Row row) throws IOException{
426          Row tmprow=getRow(row.getId());
427          if(tmprow==null){
428              addRow(row);
429          }else{
430              updateRow(row);
431          }
432      }
433
434      /**
435       * Updates a row stored in this table.
436       */
437      public void updateRow(Row row) throws IOException{
438          TableIndexEntry entry=null;
439          // Handle index entry caching
440          if(INDEXCACHESIZE>0){
441              synchronized(indexcache){
442                  entry=getCacheEntry(row.getId());
443                  if(entry==null){
444                      entry=index.scanIndex(row.getId());
445                      addCacheEntry(entry);
446                  }
447              }
448          }else{
449              entry=index.scanIndex(row.getId());
450          }
451          if(entry.getRowSize()>=row.getSize()){
452             // Add to the existing location
453             switch(entry.getLocation()){
454                 case FILEA  :synchronized(filealock){
455                                 if(filearandom==null){
456                                     filearandom=new RandomAccessFile(getFileName(FILEA),"rw");
457                                     fca=filearandom.getChannel();
458                                 }
459                                 filearandom.seek(entry.getPosition());
460                                 row.writeToFile(filearandom);
461                                 
462                                 fca.force(false);
463                             }
464                             break;
465                 case FILEB :synchronized(fileblock){
466                                 if(filebrandom==null){
467                                     filebrandom=new RandomAccessFile(getFileName(FILEB),"rw");
468                                     fcb=filebrandom.getChannel();
469                                 }
470                                 filebrandom.seek(entry.getPosition());
471                                 row.writeToFile(filebrandom);
472                                 
473                                 fcb.force(false);
474                             }
475                     break;
476             }
477          }else{
478              if(rowswitch){
479                   updateRowA(row);
480               }else{
481                   updateRowB(row);
482               }
483               rowswitch=!rowswitch;
484          }
485          synchronized(statlock){
486               atomicfields.setstat_update(atomicfields.getstat_update()+1);
487               atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
488          }
489      }
490      
491      private void updateRowA(Row row) throws IOException{
492          synchronized(filealock){
493               openFile(FILEA);
494               int pre=fileastream.size();
495               row.writeToStream(fileastream);
496               int post=fileastream.size();
497               fileastream.flush();
498               index.updateEntry(row.getId(),row.getSize(),FILEA,fileaposition);
499               
500               // Handle index entry caching
501               if(INDEXCACHESIZE>0){
502                   updateCacheEntry(new TableIndexEntry(row.getId(),row.getSize(),FILEA,fileaposition));
503               }
504               fileaposition+=Row.calcSize(pre,post);
505           }
506      }
507
508      private void updateRowB(Row row) throws IOException{
509          synchronized(fileblock){
510               openFile(FILEB);
511               int pre=filebstream.size();
512               row.writeToStream(filebstream);
513               int post=filebstream.size();
514               filebstream.flush();
515               index.updateEntry(row.getId(),row.getSize(),FILEB,filebposition);
516               // Handle index entry caching
517               // Handle index entry caching
518                 if(INDEXCACHESIZE>0){
519                     updateCacheEntry(new TableIndexEntry(row.getId(),row.getSize(),FILEB,filebposition));
520                 }
521                 filebposition+=Row.calcSize(pre,post);
522           }
523      }
524
525      /**
526       * Marks a row as deleted in the index.
527       * Be aware that the space consumed by the row is not actually reclaimed.
528       */
529      public void deleteRow(Row row) throws IOException{
530           // Handle index entry caching
531           if(INDEXCACHESIZE>0){
532               removeCacheEntry(row.getId());
533           }
534           index.updateEntry(row.getId(),row.getSize(),DELETE,0);
535           synchronized(statlock){
536               atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
537           }
538      }
539      
540      /**
541       * Returns a tuplestream containing the given list of rows
542       */
543      public TupleStream getRows(List<String> rows) throws IOException{
544          return new IndexedTableReader(this,index.scanIndex(rows));
545      }
546
547      /**
548       * Returns a tuplestream containing the rows matching the given rowmatcher
549       */
550      public TupleStream getRows(RowMatcher matcher) throws IOException{
551          return new IndexedTableReader(this,index.scanIndex(),matcher);
552      }
553      
554      /**
555       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
556       */
557      public TupleStream getRows(List<String> rows,RowMatcher matcher) throws IOException{
558           return new IndexedTableReader(this,index.scanIndex(rows),matcher);
559       }
560
561      /**
562       * Returns a tuplestream of all rows in this table.
563       */
564      public TupleStream getRows() throws IOException{
565          // return new TableReader(this);
566          return new IndexedTableReader(this,index.scanIndex());
567      }
568      
569      /**
570       * Returns a single row stored in this table.
571       * If the row does not exist in the table, null will be returned.
572       */
573      public Row getRow(String id) throws IOException{
574          TableIndexEntry entry=null;
575           // Handle index entry caching
576           if(INDEXCACHESIZE>0){
577               synchronized(indexcache){
578                   entry=getCacheEntry(id);
579                    if(entry==null){
580                        entry=index.scanIndex(id);
581                        if(entry!=null){
582                            addCacheEntry(entry);
583                        }
584                    }
585               }
586           }else{
587               entry=index.scanIndex(id);
588           }
589           if(entry!=null){
590               long dataoffset=0;
591               DataInputStream data=null;
592               if(entry.location==Table.FILEA){
593                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
594               }else if(entry.location==Table.FILEB){
595                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
596               }
597               if(data!=null){
598                   while(dataoffset!=entry.position){
599                       dataoffset+=data.skipBytes((int)(entry.position-dataoffset));
600                   }
601                   Row row=Row.readFromStream(data);
602                   data.close();
603                   synchronized(statlock){
604                        atomicfields.setstat_read(atomicfields.getstat_read()+1);
605                        atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());
606                   }
607                   return row;
608               }
609               
610           }
611           return null;
612      }
613  }