af4be46f593ed2e15be8aefbecfee239c1fde310
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34
35 import TransactionalIO.core.TransactionalFile;
36 import java.io.*;
37 import java.util.*;
38 import java.nio.channels.*;
39 import com.solidosystems.tuplesoup.filter.*;
40 import dstm2.atomic;
41 import dstm2.util.StringKeyHashMap;
42
43 /**
44  * The table stores a group of rows.
45  * Every row must have a unique id within a table.
46  */
47 public class DualFileTableTransactional implements TableTransactional{
48
49     
50     private int INDEXCACHESIZE=8192;
51      
52     private String filealock="filea-dummy";
53     private String fileblock="fileb-dummy";
54
55 //    private DataOutputStream fileastream=null;
56 //    private DataOutputStream filebstream=null;
57     private TransactionalFile fileastream=null;
58     private TransactionalFile filebstream=null;
59 //    private RandomAccessFile filearandom=null;
60     private TransactionalFile filearandom=null;
61     private TransactionalFile filebrandom=null;
62     FileChannel fca=null;
63     FileChannel fcb=null;
64     private TableIndexTransactional index=null;
65      
66 //    private long fileaposition=0;
67  //   private long filebposition=0;
68      
69     private boolean rowswitch=true;
70      
71     private String title;
72     private String location;
73      
74     private TableIndexNodeTransactional indexcachefirst;
75     private TableIndexNodeTransactional indexcachelast;
76     //private int indexcacheusage;
77     
78     private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
79     //private Hashtable<String,TableIndexNode> indexcache;
80     
81     
82     DualFileTableTSInf atomicfields;
83     // Statistic counters
84     public @atomic interface DualFileTableTSInf{
85         long getstat_add();
86         long getstat_update();
87         long getstat_delete();
88         long getstat_add_size();
89         long getstat_update_size();
90         long getstat_read_size();
91         long getstat_read();
92         long getstat_cache_hit();
93         long getstat_cache_miss();
94         long getstat_cache_drop();
95         long getFileaposition();
96         long getFilebposition();
97         int getIndexcacheusage();
98         
99         void setstat_add(long val);
100         void setstat_update(long val);
101         void setstat_delete(long val);
102         void setstat_add_size(long val);
103         void setstat_update_size(long val);
104         void setstat_read_size(long val);
105         void setstat_read(long val);
106         void setstat_cache_hit(long val);
107         void setstat_cache_miss(long val);
108         void setstat_cache_drop(long val);
109         void setIndexcacheusage(int val);
110         void setFileaposition(long val);
111         void setFilebposition(long val);
112     }
113   
114     /*long stat_add=0;
115     long stat_update=0;
116     long stat_delete=0;
117     long stat_add_size=0;
118     long stat_update_size=0;
119     long stat_read_size=0;
120     long stat_read=0;
121     long stat_cache_hit=0;
122     long stat_cache_miss=0;
123     long stat_cache_drop=0;*/
124     
125     protected String statlock="stat-dummy";
126     
127     /**
128      * Return the current values of the statistic counters and reset them.
129      * The current counters are:
130      * <ul>
131      *   <li>stat_table_add
132      *   <li>stat_table_update
133      *   <li>stat_table_delete
134      *   <li>stat_table_add_size
135      *   <li>stat_table_update_size
136      *   <li>stat_table_read_size
137      *   <li>stat_table_read
138      *   <li>stat_table_cache_hit
139      *   <li>stat_table_cache_miss
140      *   <li>stat_table_cache_drop
141      * </ul>
142      * Furthermore, the index will be asked to deliver separate index specific counters
143      */
144     public Hashtable<String,Long> readStatistics(){
145         Hashtable<String,Long> hash=new Hashtable<String,Long>();
146         synchronized(statlock){
147             hash.put("stat_table_add",atomicfields.getstat_add());
148             hash.put("stat_table_update",atomicfields.getstat_update());
149             hash.put("stat_table_delete",atomicfields.getstat_delete());
150             hash.put("stat_table_add_size",atomicfields.getstat_add_size());
151             hash.put("stat_table_update_size",atomicfields.getstat_update_size());
152             hash.put("stat_table_read_size",atomicfields.getstat_read_size());
153             hash.put("stat_table_read",atomicfields.getstat_read());
154             hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
155             hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
156             hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
157             atomicfields.setstat_add(0);
158             atomicfields.setstat_update(0);
159             atomicfields.setstat_delete(0);
160             atomicfields.setstat_add_size(0);
161             atomicfields.setstat_update_size(0);
162             atomicfields.setstat_read_size(0);
163             atomicfields.setstat_read(0);
164             atomicfields.setstat_cache_hit(0);
165             atomicfields.setstat_cache_miss(0);
166             atomicfields.setstat_cache_drop(0);
167             Hashtable<String,Long> ihash=index.readStatistics();
168             hash.putAll(ihash);
169         }
170         return hash;
171     }
172     
    /*
     * NOTE: this class does not define a constructor that selects a default
     * (flat) index model; use the constructor that takes an index type.
     */
176
177     
178     /**
179      * Create a new table object with a specific index model
180      */
181     public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
182         this.title=title;
183         this.location=location;
184         if(!this.location.endsWith(File.separator))this.location+=File.separator;
185         switch(indextype){
186              case PAGED  : index=new PagedIndexTransactional(getFileName(INDEX));
187                 break;
188            
189         }
190         indexcachefirst=null;
191         indexcachelast=null;
192         atomicfields.setFileaposition(0);
193         atomicfields.setFilebposition(0);
194         atomicfields.setIndexcacheusage(0);
195         indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
196     }
197      
198     /**
199      * Set the maximal allowable size of the index cache.
200      */ 
201     public void setIndexCacheSize(int newsize){
202         INDEXCACHESIZE=newsize;
203     }
204     
205     /**
206      * Close all open file streams
207      */
208     public void close(){
209         try{
210             if(fileastream!=null)fileastream.close();
211             if(filebstream!=null)filebstream.close();
212             if(filearandom!=null)filearandom.close();
213             if(filebrandom!=null)filebrandom.close();
214             index.close();
215         }catch(Exception e){}
216     }
217     
218     /** 
219      * Returns the name of this table
220      */ 
221     public String getTitle(){
222          return title;
223     }
224     
225     /**
226      * Returns the location of this tables datafiles
227      */ 
228     public String getLocation(){
229          return location;
230     }
231      
232     protected String getFileName(int type){
233          switch(type){
234              case FILEB  :   return location+title+".a";
235              case FILEA  :   return location+title+".b";
236              case INDEX  :   return location+title+".index";
237          }
238          return null;
239     }
240     
241     /**
242      * Delete the files created by this table object.
243      * Be aware that this will delete any data stored in this table!
244      */ 
245     public void deleteFiles(){
246          try{
247              File ftest=new File(getFileName(FILEA));
248              ftest.delete();
249          }catch(Exception e){}
250          try{
251              File ftest=new File(getFileName(FILEB));
252              ftest.delete();
253          }catch(Exception e){}
254          try{
255              File ftest=new File(getFileName(INDEX));
256              ftest.delete();
257          }catch(Exception e){}
258     }
259      
260      private synchronized void openFile(int type) throws IOException{
261          switch(type){
262              case FILEA  : if(fileastream==null){
263                                // fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
264                                 fileastream=new TransactionalFile(getFileName(FILEA),"rw");
265                                  
266                                 //File ftest=new File(getFileName(FILEA));
267                                 //atomicfields.setFileaposition(ftest.length());
268                                 atomicfields.setFileaposition(fileastream.length());
269                                 fileastream.seek(fileastream.length());
270                            }
271                           break;
272              case FILEB  : if(filebstream==null){
273                                 filebstream=new TransactionalFile(getFileName(FILEB),"rw");
274                                 //File ftest=new File(getFileName(FILEB));
275                                 //atomicfields.setFilebposition(ftest.length());
276                                 atomicfields.setFilebposition(filebstream.length());
277                                 filebstream.seek(filebstream.length());
278                            }
279                           break;
280          }
281      }
282      
283     /**
284      * Adds a row of data to this table.
285      */
286     public void addRow(RowTransactional row) throws IOException{
287         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
288         if(rowswitch){
289             addRowA(row);
290         }else{
291             addRowB(row);
292         }
293         rowswitch=!rowswitch;
294     }
295      
296      private void addCacheEntry(TableIndexEntryTransactional entry){
297          synchronized(indexcache){
298              if(atomicfields.getIndexcacheusage()>INDEXCACHESIZE){
299                  // remove first entry
300                  TableIndexNodeTransactional node=indexcachefirst;
301                  indexcache.remove(node.getData().getId());
302                  atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
303                  synchronized(statlock){
304                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
305                  }
306                  indexcachefirst=node.getNext();
307                  if(indexcachefirst==null){
308                     indexcachelast=null;
309                  }else{
310                     indexcachefirst.setPrevious(null);
311                  }
312              }
313              TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
314              if(indexcachelast!=null){
315                  indexcachelast.setNext(node);
316              }
317              if(indexcachefirst==null){
318                  indexcachefirst=node;
319              }
320              indexcachelast=node;
321              indexcache.put(entry.getId(),node);
322              atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()+1);
323         }
324      }
325      
326      private void addRowA(RowTransactional row) throws IOException{
327          synchronized(filealock){
328              openFile(FILEA);             
329              //int pre=fileastream.size();
330              int pre= (int)fileastream.getFilePointer();
331              //row.writeToStream(fileastream);
332              row.writeToFile(fileastream);
333              //int post= fileastream.size();
334              int post= (int)fileastream.getFilePointer();
335              //fileastream.flush();
336              
337              synchronized(statlock){
338                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
339                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
340              }
341              
342              index.addEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFilebposition());
343              if(INDEXCACHESIZE>0){
344                  TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
345                  addCacheEntry(entry);
346              }
347              atomicfields.setFileaposition(atomicfields.getFileaposition()+Row.calcSize(pre,post));
348          }
349      }
350      private void addRowB(RowTransactional row) throws IOException{
351          synchronized(fileblock){
352              openFile(FILEB);
353              //int pre=filebstream.size();
354              int pre= (int)filebstream.getFilePointer();
355              //row.writeToStream(filebstream);
356              row.writeToFile(filebstream);
357              int post=(int)filebstream.getFilePointer();
358              //int post=filebstream.size();
359              //filebstream.flush();
360              synchronized(statlock){
361                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
362                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
363               }
364              index.addEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
365              if(INDEXCACHESIZE>0){
366                  TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
367                  addCacheEntry(entry);
368              }
369              atomicfields.setFilebposition(atomicfields.getFilebposition()+Row.calcSize(pre,post));
370          }
371      }
372      
373
374      private void updateCacheEntry(TableIndexEntryTransactional entry){
375           synchronized(indexcache){
376               if(indexcache.containsKey(entry.getId())){
377                   TableIndexNodeTransactional node=indexcache.get(entry.getId());
378                   node.setData(entry);
379                   if(node!=indexcachelast){
380                       if(node==indexcachefirst){
381                           indexcachefirst=node.getNext();
382                       }
383                       node.remove();
384                       indexcachelast.setNext(node);
385                       node.setPrevious(indexcachelast);
386                       node.setNext(null);
387                       indexcachelast=node;
388                   }
389               }else{
390                   addCacheEntry(entry);
391               }
392          }
393       }
394
395       private void removeCacheEntry(String id){
396           synchronized(indexcache){
397                 if(indexcache.containsKey(id)){
398                     TableIndexNodeTransactional node=indexcache.get(id);
399                     indexcache.remove(id);
400                     if(atomicfields.getIndexcacheusage()==1){
401                         indexcachefirst=null;
402                         indexcachelast=null;
403                         atomicfields.setIndexcacheusage(0);
404                         return;
405                     }
406                     if(node==indexcachefirst){
407                         indexcachefirst=node.getNext();
408                         indexcachefirst.setPrevious(null);
409                     }else if(node==indexcachelast){
410                         indexcachelast=node.getPrevious();
411                         indexcachelast.setNext(null);
412                     }else{
413                         node.remove();
414                     }
415                     atomicfields.setIndexcacheusage(atomicfields.getIndexcacheusage()-1);
416                     synchronized(statlock){
417                          atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
418                     }
419                 }
420           }
421       }
422
423       private TableIndexEntryTransactional getCacheEntry(String id){
424           synchronized(indexcache){
425               if(indexcache.containsKey(id)){
426                   TableIndexNodeTransactional node=indexcache.get(id);
427                   if(node!=indexcachelast){
428                       if(node==indexcachefirst){
429                             indexcachefirst=node.getNext();
430                         }
431                         node.remove();
432                         indexcachelast.setNext(node);
433                         node.setPrevious(indexcachelast);
434                         node.setNext(null);
435                         indexcachelast=node;
436                   }
437                   synchronized(statlock){
438                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
439                    }
440                   return node.getData();
441               }
442           }
443           synchronized(statlock){
444                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
445            }
446           return null;
447       }
448
449      /**
450       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
451       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
452       */
453      public void addOrUpdateRow(RowTransactional row) throws IOException{
454          RowTransactional tmprow=getRow(row.getId());
455          if(tmprow==null){
456              addRow(row);
457          }else{
458              updateRow(row);
459          }
460      }
461
462      /**
463       * Updates a row stored in this table.
464       */
465      public void updateRow(RowTransactional row) throws IOException{
466          TableIndexEntryTransactional entry=null;
467          // Handle index entry caching
468          if(INDEXCACHESIZE>0){
469              synchronized(indexcache){
470                  entry=getCacheEntry(row.getId());
471                  if(entry==null){
472                      entry=index.scanIndex(row.getId());
473                      addCacheEntry(entry);
474                  }
475              }
476          }else{
477              entry=index.scanIndex(row.getId());
478          }
479          if(entry.getRowSize()>=row.getSize()){
480             // Add to the existing location
481             switch(entry.getLocation()){
482                 case FILEA  :synchronized(filealock){
483                                 if(filearandom==null){
484                                     filearandom=new TransactionalFile(getFileName(FILEA),"rw");
485                                    // fca=filearandom.getChannel();
486                                 }
487                                 filearandom.seek(entry.getPosition());
488                                 row.writeToFile(filearandom);
489                                 
490                                 fca.force(false);
491                             }
492                             break;
493                 case FILEB :synchronized(fileblock){
494                                 if(filebrandom==null){
495                                     filebrandom=new TransactionalFile(getFileName(FILEB),"rw");
496                                   //  fcb=filebrandom.getChannel();
497                                 }
498                                 filebrandom.seek(entry.getPosition());
499                                 row.writeToFile(filebrandom);
500                                 
501                                 fcb.force(false);
502                             }
503                     break;
504             }
505          }else{
506              if(rowswitch){
507                   updateRowA(row);
508               }else{
509                   updateRowB(row);
510               }
511               rowswitch=!rowswitch;
512          }
513          synchronized(statlock){
514               atomicfields.setstat_update(atomicfields.getstat_update()+1);
515               atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
516          }
517      }
518      
519      private void updateRowA(RowTransactional row) throws IOException{
520          synchronized(filealock){
521               openFile(FILEA);
522               //int pre=filebstream.size();
523               int pre=(int)fileastream.getFilePointer();
524               //row.writeToStream(filebstream);
525               row.writeToFile(fileastream);
526               //int post=filebstream.size();
527               int post=(int)fileastream.getFilePointer();
528               //fileastream.flush();
529               index.updateEntry(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition());
530               
531               // Handle index entry caching
532               if(INDEXCACHESIZE>0){
533                   updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,atomicfields.getFileaposition()));
534               }
535               atomicfields.setFileaposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
536           }
537      }
538
539      private void updateRowB(RowTransactional row) throws IOException{
540          synchronized(fileblock){
541               openFile(FILEB);
542               //int pre=filebstream.size();
543               int pre=(int)filebstream.getFilePointer();
544               //row.writeToStream(filebstream);
545               row.writeToFile(filebstream);
546               //int post=filebstream.size();
547               int post=(int)filebstream.getFilePointer();
548               //filebstream.flush();
549               index.updateEntry(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition());
550               // Handle index entry caching
551               // Handle index entry caching
552                 if(INDEXCACHESIZE>0){
553                     updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,atomicfields.getFilebposition()));
554                 }
555                 atomicfields.setFilebposition(atomicfields.getFilebposition()+RowTransactional.calcSize(pre,post));
556           }
557      }
558
559      /**
560       * Marks a row as deleted in the index.
561       * Be aware that the space consumed by the row is not actually reclaimed.
562       */
563      public void deleteRow(RowTransactional row) throws IOException{
564           // Handle index entry caching
565           if(INDEXCACHESIZE>0){
566               removeCacheEntry(row.getId());
567           }
568           index.updateEntry(row.getId(),row.getSize(),DELETE,0);
569           synchronized(statlock){
570               atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
571           }
572      }
573      
574      /**
575       * Returns a tuplestream containing the given list of rows
576       */
577      public TupleStreamTransactional getRows(List<String> rows) throws IOException{
578          return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
579      }
580
581      /**
582       * Returns a tuplestream containing the rows matching the given rowmatcher
583       */
584      public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
585          return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
586      }
587      
588      /**
589       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
590       */
591      public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
592           return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
593       }
594
595      /**
596       * Returns a tuplestream of all rows in this table.
597       */
598      public TupleStreamTransactional getRows() throws IOException{
599          // return new TableReader(this);
600          return new IndexedTableReaderTransactional(this,index.scanIndex());
601      }
602      
603      /**
604       * Returns a single row stored in this table.
605       * If the row does not exist in the table, null will be returned.
606       */
607      public RowTransactional getRow(String id) throws IOException{
608          TableIndexEntryTransactional entry=null;
609           // Handle index entry caching
610           if(INDEXCACHESIZE>0){
611               synchronized(indexcache){
612                   entry=getCacheEntry(id);
613                    if(entry==null){
614                        entry=index.scanIndex(id);
615                        if(entry!=null){
616                            addCacheEntry(entry);
617                        }
618                    }
619               }
620           }else{
621               entry=index.scanIndex(id);
622           }
623           if(entry!=null){
624               long dataoffset=0;
625               DataInputStream data=null;
626               if(entry.getLocation()==Table.FILEA){
627                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
628               }else if(entry.getLocation()==Table.FILEB){
629                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
630               }
631               if(data!=null){
632                   while(dataoffset!=entry.getPosition()){
633                       dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
634                   }
635                   RowTransactional row=RowTransactional.readFromStream(data);
636                   data.close();
637                   synchronized(statlock){
638                        atomicfields.setstat_read(atomicfields.getstat_read()+1);
639                        atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());
640                   }
641                   return row;
642               }
643               
644           }
645           return null;
646      }
647  }