d506fa383e2c0f24fe14b9982a9e8b7607aeed43
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32 package com.solidosystems.tuplesoup.core;
33  
34
35 import java.io.*;
36 import java.util.*;
37 import java.nio.channels.*;
38 import com.solidosystems.tuplesoup.filter.*;
39 import dstm2.atomic;
40 import dstm2.util.StringKeyHashMap;
41
42 /**
43  * The table stores a group of rows.
44  * Every row must have a unique id within a table.
45  */
46 public class DualFileTableTransactional implements TableTransactional{
47
48     
49     private int INDEXCACHESIZE=8192;
50      
51     private String filealock="filea-dummy";
52     private String fileblock="fileb-dummy";
53
54     private DataOutputStream fileastream=null;
55     private DataOutputStream filebstream=null;
56     private RandomAccessFile filearandom=null;
57     private RandomAccessFile filebrandom=null;
58     FileChannel fca=null;
59     FileChannel fcb=null;
60     private TableIndexTransactional index=null;
61      
62     private long fileaposition=0;
63     private long filebposition=0;
64      
65     private boolean rowswitch=true;
66      
67     private String title;
68     private String location;
69      
70     private TableIndexNodeTransactional indexcachefirst;
71     private TableIndexNodeTransactional indexcachelast;
72     private int indexcacheusage;
73     
74     private StringKeyHashMap<TableIndexNodeTransactional> indexcache;
75     //private Hashtable<String,TableIndexNode> indexcache;
76     
77     
78     DualFileTableTSInf atomicfields;
79     // Statistic counters
80     public @atomic interface DualFileTableTSInf{
81         long getstat_add();
82         long getstat_update();
83         long getstat_delete();
84         long getstat_add_size();
85         long getstat_update_size();
86         long getstat_read_size();
87         long getstat_read();
88         long getstat_cache_hit();
89         long getstat_cache_miss();
90         long getstat_cache_drop();
91         
92         void setstat_add(long val);
93         void setstat_update(long val);
94         void setstat_delete(long val);
95         void setstat_add_size(long val);
96         void setstat_update_size(long val);
97         void setstat_read_size(long val);
98         void setstat_read(long val);
99         void setstat_cache_hit(long val);
100         void setstat_cache_miss(long val);
101         void setstat_cache_drop(long val);
102     }
103   
104     /*long stat_add=0;
105     long stat_update=0;
106     long stat_delete=0;
107     long stat_add_size=0;
108     long stat_update_size=0;
109     long stat_read_size=0;
110     long stat_read=0;
111     long stat_cache_hit=0;
112     long stat_cache_miss=0;
113     long stat_cache_drop=0;*/
114     
115     protected String statlock="stat-dummy";
116     
117     /**
118      * Return the current values of the statistic counters and reset them.
119      * The current counters are:
120      * <ul>
121      *   <li>stat_table_add
122      *   <li>stat_table_update
123      *   <li>stat_table_delete
124      *   <li>stat_table_add_size
125      *   <li>stat_table_update_size
126      *   <li>stat_table_read_size
127      *   <li>stat_table_read
128      *   <li>stat_table_cache_hit
129      *   <li>stat_table_cache_miss
130      *   <li>stat_table_cache_drop
131      * </ul>
132      * Furthermore, the index will be asked to deliver separate index specific counters
133      */
134     public Hashtable<String,Long> readStatistics(){
135         Hashtable<String,Long> hash=new Hashtable<String,Long>();
136         synchronized(statlock){
137             hash.put("stat_table_add",atomicfields.getstat_add());
138             hash.put("stat_table_update",atomicfields.getstat_update());
139             hash.put("stat_table_delete",atomicfields.getstat_delete());
140             hash.put("stat_table_add_size",atomicfields.getstat_add_size());
141             hash.put("stat_table_update_size",atomicfields.getstat_update_size());
142             hash.put("stat_table_read_size",atomicfields.getstat_read_size());
143             hash.put("stat_table_read",atomicfields.getstat_read());
144             hash.put("stat_table_cache_hit",atomicfields.getstat_cache_hit());
145             hash.put("stat_table_cache_miss",atomicfields.getstat_cache_miss());
146             hash.put("stat_table_cache_drop",atomicfields.getstat_cache_drop());
147             atomicfields.setstat_add(0);
148             atomicfields.setstat_update(0);
149             atomicfields.setstat_delete(0);
150             atomicfields.setstat_add_size(0);
151             atomicfields.setstat_update_size(0);
152             atomicfields.setstat_read_size(0);
153             atomicfields.setstat_read(0);
154             atomicfields.setstat_cache_hit(0);
155             atomicfields.setstat_cache_miss(0);
156             atomicfields.setstat_cache_drop(0);
157             Hashtable<String,Long> ihash=index.readStatistics();
158             hash.putAll(ihash);
159         }
160         return hash;
161     }
162     
163     /**
164      * Create a new table object with the default flat index model
165      */
166
167     
168     /**
169      * Create a new table object with a specific index model
170      */
171     public DualFileTableTransactional(String title,String location, int indextype) throws IOException{
172         this.title=title;
173         this.location=location;
174         if(!this.location.endsWith(File.separator))this.location+=File.separator;
175         switch(indextype){
176              case PAGED  : index=new PagedIndexTransactional(getFileName(INDEX));
177                 break;
178            
179         }
180         indexcachefirst=null;
181         indexcachelast=null;
182         indexcacheusage=0;
183         indexcache=new StringKeyHashMap<TableIndexNodeTransactional>();
184     }
185      
186     /**
187      * Set the maximal allowable size of the index cache.
188      */ 
189     public void setIndexCacheSize(int newsize){
190         INDEXCACHESIZE=newsize;
191     }
192     
193     /**
194      * Close all open file streams
195      */
196     public void close(){
197         try{
198             if(fileastream!=null)fileastream.close();
199             if(filebstream!=null)filebstream.close();
200             if(filearandom!=null)filearandom.close();
201             if(filebrandom!=null)filebrandom.close();
202             index.close();
203         }catch(Exception e){}
204     }
205     
206     /** 
207      * Returns the name of this table
208      */ 
209     public String getTitle(){
210          return title;
211     }
212     
213     /**
214      * Returns the location of this tables datafiles
215      */ 
216     public String getLocation(){
217          return location;
218     }
219      
220     protected String getFileName(int type){
221          switch(type){
222              case FILEB  :   return location+title+".a";
223              case FILEA  :   return location+title+".b";
224              case INDEX  :   return location+title+".index";
225          }
226          return null;
227     }
228     
229     /**
230      * Delete the files created by this table object.
231      * Be aware that this will delete any data stored in this table!
232      */ 
233     public void deleteFiles(){
234          try{
235              File ftest=new File(getFileName(FILEA));
236              ftest.delete();
237          }catch(Exception e){}
238          try{
239              File ftest=new File(getFileName(FILEB));
240              ftest.delete();
241          }catch(Exception e){}
242          try{
243              File ftest=new File(getFileName(INDEX));
244              ftest.delete();
245          }catch(Exception e){}
246     }
247      
248      private synchronized void openFile(int type) throws IOException{
249          switch(type){
250              case FILEA  : if(fileastream==null){
251                                 fileastream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEA),true)));
252                                 File ftest=new File(getFileName(FILEA));
253                                 fileaposition=ftest.length();
254                            }
255                           break;
256              case FILEB  : if(filebstream==null){
257                                 filebstream=new DataOutputStream(new BufferedOutputStream(new FileOutputStream(getFileName(FILEB),true)));
258                                 File ftest=new File(getFileName(FILEB));
259                                 filebposition=ftest.length();
260                            }
261                           break;
262          }
263      }
264      
265     /**
266      * Adds a row of data to this table.
267      */
268     public void addRow(RowTransactional row) throws IOException{
269         // Distribute new rows between the two datafiles by using the rowswitch, but don't spend time synchronizing... this does not need to be acurate!
270         if(rowswitch){
271             addRowA(row);
272         }else{
273             addRowB(row);
274         }
275         rowswitch=!rowswitch;
276     }
277      
278      private void addCacheEntry(TableIndexEntryTransactional entry){
279          synchronized(indexcache){
280              if(indexcacheusage>INDEXCACHESIZE){
281                  // remove first entry
282                  TableIndexNodeTransactional node=indexcachefirst;
283                  indexcache.remove(node.getData().getId());
284                  indexcacheusage--;
285                  synchronized(statlock){
286                      atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
287                  }
288                  indexcachefirst=node.getNext();
289                  if(indexcachefirst==null){
290                     indexcachelast=null;
291                  }else{
292                     indexcachefirst.setPrevious(null);
293                  }
294              }
295              TableIndexNodeTransactional node=new TableIndexNodeTransactional(indexcachelast,entry);
296              if(indexcachelast!=null){
297                  indexcachelast.setNext(node);
298              }
299              if(indexcachefirst==null){
300                  indexcachefirst=node;
301              }
302              indexcachelast=node;
303              indexcache.put(entry.getId(),node);
304              indexcacheusage++;
305         }
306      }
307      
308      private void addRowA(RowTransactional row) throws IOException{
309          synchronized(filealock){
310              openFile(FILEA);
311              int pre=fileastream.size();
312              row.writeToStream(fileastream);
313              int post=fileastream.size();
314              fileastream.flush();
315              
316              synchronized(statlock){
317                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
318                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
319              }
320              
321              index.addEntry(row.getId(),row.getSize(),FILEA,fileaposition);
322              if(INDEXCACHESIZE>0){
323                  TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,fileaposition);
324                  addCacheEntry(entry);
325              }
326              fileaposition+=Row.calcSize(pre,post);
327          }
328      }
329      private void addRowB(RowTransactional row) throws IOException{
330          synchronized(fileblock){
331              openFile(FILEB);
332              int pre=filebstream.size();
333              row.writeToStream(filebstream);
334              int post=filebstream.size();
335              filebstream.flush();
336              synchronized(statlock){
337                   atomicfields.setstat_add(atomicfields.getstat_add()+1);
338                   atomicfields.setstat_add_size(atomicfields.getstat_add_size()+row.getSize());
339               }
340              index.addEntry(row.getId(),row.getSize(),FILEB,filebposition);
341              if(INDEXCACHESIZE>0){
342                  TableIndexEntryTransactional entry=new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,filebposition);
343                  addCacheEntry(entry);
344              }
345              filebposition+=RowTransactional.calcSize(pre,post);
346          }
347      }
348      
349
350      private void updateCacheEntry(TableIndexEntryTransactional entry){
351           synchronized(indexcache){
352               if(indexcache.containsKey(entry.getId())){
353                   TableIndexNodeTransactional node=indexcache.get(entry.getId());
354                   node.setData(entry);
355                   if(node!=indexcachelast){
356                       if(node==indexcachefirst){
357                           indexcachefirst=node.getNext();
358                       }
359                       node.remove();
360                       indexcachelast.setNext(node);
361                       node.setPrevious(indexcachelast);
362                       node.setNext(null);
363                       indexcachelast=node;
364                   }
365               }else{
366                   addCacheEntry(entry);
367               }
368          }
369       }
370
371       private void removeCacheEntry(String id){
372           synchronized(indexcache){
373                 if(indexcache.containsKey(id)){
374                     TableIndexNodeTransactional node=indexcache.get(id);
375                     indexcache.remove(id);
376                     if(indexcacheusage==1){
377                         indexcachefirst=null;
378                         indexcachelast=null;
379                         indexcacheusage=0;
380                         return;
381                     }
382                     if(node==indexcachefirst){
383                         indexcachefirst=node.getNext();
384                         indexcachefirst.setPrevious(null);
385                     }else if(node==indexcachelast){
386                         indexcachelast=node.getPrevious();
387                         indexcachelast.setNext(null);
388                     }else{
389                         node.remove();
390                     }
391                     indexcacheusage--;
392                     synchronized(statlock){
393                          atomicfields.setstat_cache_drop(atomicfields.getstat_cache_drop()+1);
394                     }
395                 }
396           }
397       }
398
399       private TableIndexEntryTransactional getCacheEntry(String id){
400           synchronized(indexcache){
401               if(indexcache.containsKey(id)){
402                   TableIndexNodeTransactional node=indexcache.get(id);
403                   if(node!=indexcachelast){
404                       if(node==indexcachefirst){
405                             indexcachefirst=node.getNext();
406                         }
407                         node.remove();
408                         indexcachelast.setNext(node);
409                         node.setPrevious(indexcachelast);
410                         node.setNext(null);
411                         indexcachelast=node;
412                   }
413                   synchronized(statlock){
414                        atomicfields.setstat_cache_hit(atomicfields.getstat_cache_hit()+1);
415                    }
416                   return node.getData();
417               }
418           }
419           synchronized(statlock){
420                atomicfields.setstat_cache_miss(atomicfields.getstat_cache_miss()+1);
421            }
422           return null;
423       }
424
425      /**
426       * Adds a row to this table if it doesn't already exist, if it does it updates the row instead.
427       * This method is much slower than directly using add or update, so only use it if you don't know wether or not the row already exists.
428       */
429      public void addOrUpdateRow(RowTransactional row) throws IOException{
430          RowTransactional tmprow=getRow(row.getId());
431          if(tmprow==null){
432              addRow(row);
433          }else{
434              updateRow(row);
435          }
436      }
437
438      /**
439       * Updates a row stored in this table.
440       */
441      public void updateRow(RowTransactional row) throws IOException{
442          TableIndexEntryTransactional entry=null;
443          // Handle index entry caching
444          if(INDEXCACHESIZE>0){
445              synchronized(indexcache){
446                  entry=getCacheEntry(row.getId());
447                  if(entry==null){
448                      entry=index.scanIndex(row.getId());
449                      addCacheEntry(entry);
450                  }
451              }
452          }else{
453              entry=index.scanIndex(row.getId());
454          }
455          if(entry.getRowSize()>=row.getSize()){
456             // Add to the existing location
457             switch(entry.getLocation()){
458                 case FILEA  :synchronized(filealock){
459                                 if(filearandom==null){
460                                     filearandom=new RandomAccessFile(getFileName(FILEA),"rw");
461                                     fca=filearandom.getChannel();
462                                 }
463                                 filearandom.seek(entry.getPosition());
464                                 row.writeToFile(filearandom);
465                                 
466                                 fca.force(false);
467                             }
468                             break;
469                 case FILEB :synchronized(fileblock){
470                                 if(filebrandom==null){
471                                     filebrandom=new RandomAccessFile(getFileName(FILEB),"rw");
472                                     fcb=filebrandom.getChannel();
473                                 }
474                                 filebrandom.seek(entry.getPosition());
475                                 row.writeToFile(filebrandom);
476                                 
477                                 fcb.force(false);
478                             }
479                     break;
480             }
481          }else{
482              if(rowswitch){
483                   updateRowA(row);
484               }else{
485                   updateRowB(row);
486               }
487               rowswitch=!rowswitch;
488          }
489          synchronized(statlock){
490               atomicfields.setstat_update(atomicfields.getstat_update()+1);
491               atomicfields.setstat_update_size(atomicfields.getstat_update_size()+row.getSize());
492          }
493      }
494      
495      private void updateRowA(RowTransactional row) throws IOException{
496          synchronized(filealock){
497               openFile(FILEA);
498               int pre=fileastream.size();
499               row.writeToStream(fileastream);
500               int post=fileastream.size();
501               fileastream.flush();
502               index.updateEntry(row.getId(),row.getSize(),FILEA,fileaposition);
503               
504               // Handle index entry caching
505               if(INDEXCACHESIZE>0){
506                   updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEA,fileaposition));
507               }
508               fileaposition+=Row.calcSize(pre,post);
509           }
510      }
511
512      private void updateRowB(RowTransactional row) throws IOException{
513          synchronized(fileblock){
514               openFile(FILEB);
515               int pre=filebstream.size();
516               row.writeToStream(filebstream);
517               int post=filebstream.size();
518               filebstream.flush();
519               index.updateEntry(row.getId(),row.getSize(),FILEB,filebposition);
520               // Handle index entry caching
521               // Handle index entry caching
522                 if(INDEXCACHESIZE>0){
523                     updateCacheEntry(new TableIndexEntryTransactional(row.getId(),row.getSize(),FILEB,filebposition));
524                 }
525                 filebposition+=Row.calcSize(pre,post);
526           }
527      }
528
529      /**
530       * Marks a row as deleted in the index.
531       * Be aware that the space consumed by the row is not actually reclaimed.
532       */
533      public void deleteRow(RowTransactional row) throws IOException{
534           // Handle index entry caching
535           if(INDEXCACHESIZE>0){
536               removeCacheEntry(row.getId());
537           }
538           index.updateEntry(row.getId(),row.getSize(),DELETE,0);
539           synchronized(statlock){
540               atomicfields.setstat_delete(atomicfields.getstat_delete()+1);
541           }
542      }
543      
544      /**
545       * Returns a tuplestream containing the given list of rows
546       */
547      public TupleStreamTransactional getRows(List<String> rows) throws IOException{
548          return new IndexedTableReaderTransactional(this,index.scanIndex(rows));
549      }
550
551      /**
552       * Returns a tuplestream containing the rows matching the given rowmatcher
553       */
554      public TupleStreamTransactional getRows(RowMatcherTransactional matcher) throws IOException{
555          return new IndexedTableReaderTransactional(this,index.scanIndex(),matcher);
556      }
557      
558      /**
559       * Returns a tuplestream containing those rows in the given list that matches the given RowMatcher
560       */
561      public TupleStreamTransactional getRows(List<String> rows,RowMatcherTransactional matcher) throws IOException{
562           return new IndexedTableReaderTransactional(this,index.scanIndex(rows),matcher);
563       }
564
565      /**
566       * Returns a tuplestream of all rows in this table.
567       */
568      public TupleStreamTransactional getRows() throws IOException{
569          // return new TableReader(this);
570          return new IndexedTableReaderTransactional(this,index.scanIndex());
571      }
572      
573      /**
574       * Returns a single row stored in this table.
575       * If the row does not exist in the table, null will be returned.
576       */
577      public RowTransactional getRow(String id) throws IOException{
578          TableIndexEntryTransactional entry=null;
579           // Handle index entry caching
580           if(INDEXCACHESIZE>0){
581               synchronized(indexcache){
582                   entry=getCacheEntry(id);
583                    if(entry==null){
584                        entry=index.scanIndex(id);
585                        if(entry!=null){
586                            addCacheEntry(entry);
587                        }
588                    }
589               }
590           }else{
591               entry=index.scanIndex(id);
592           }
593           if(entry!=null){
594               long dataoffset=0;
595               DataInputStream data=null;
596               if(entry.getLocation()==Table.FILEA){
597                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEA))));
598               }else if(entry.getLocation()==Table.FILEB){
599                   data=new DataInputStream(new BufferedInputStream(new FileInputStream(getFileName(Table.FILEB))));
600               }
601               if(data!=null){
602                   while(dataoffset!=entry.getPosition()){
603                       dataoffset+=data.skipBytes((int)(entry.getPosition()-dataoffset));
604                   }
605                   RowTransactional row=RowTransactional.readFromStream(data);
606                   data.close();
607                   synchronized(statlock){
608                        atomicfields.setstat_read(atomicfields.getstat_read()+1);
609                        atomicfields.setstat_read_size(atomicfields.getstat_read_size()+row.getSize());
610                   }
611                   return row;
612               }
613               
614           }
615           return null;
616      }
617  }