c034aa1a7bbfe48806ee2befeea6504dbdca81ae
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32  package com.solidosystems.tuplesoup.core;
33  
34  import com.solidosystems.tuplesoup.filter.*;
35  import java.io.*;
36  import java.util.*;
37  import java.util.concurrent.Callable;
38  import dstm2.Thread;
39
40 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
41     private DataInputStream fileastream=null;
42     private DataInputStream filebstream=null;
43     private long fileaposition=0;
44     private long filebposition=0;
45
46     private List<TableIndexEntryTransactional>fileaentries;
47     private List<TableIndexEntryTransactional>filebentries;
48     
49     private List<TableIndexEntryTransactional>entries;
50
51     private Hashtable<String,RowTransactional>fileabuffer;
52     private Hashtable<String,RowTransactional>filebbuffer;
53
54     private List<String>rows;
55     private int rowpointer;
56     private RowTransactional next=null;
57     
58     private DualFileTableTransactional table;
59     
60     private RowMatcherTransactional matcher=null;
61     
62     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
63         this.table=table;
64         this.rows=rows;
65         rowpointer=0;
66         
67         this.entries=entries;
68         fileaentries=new ArrayList<TableIndexEntryTransactional>();
69         filebentries=new ArrayList<TableIndexEntryTransactional>();
70         
71         Iterator<TableIndexEntryTransactional> it=entries.iterator();
72         while(it.hasNext()){
73             TableIndexEntryTransactional entry=it.next();
74             // TODO: we really shouldn't get nulls here
75             if(entry!=null){
76                 if(entry.getLocation()==Table.FILEA){
77                     fileaentries.add(entry);
78                 }else if(entry.getLocation()==Table.FILEB){
79                     filebentries.add(entry);
80                 }
81             }
82         }
83         
84         Collections.sort(fileaentries);
85         Collections.sort(filebentries);
86         
87         fileabuffer=new Hashtable<String,RowTransactional>();
88         filebbuffer=new Hashtable<String,RowTransactional>();
89         
90         readNext();   
91     }
92     
93     
94     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
95         this.table=table;
96         this.rows=rows;
97         rowpointer=0;
98         this.matcher=matcher;
99         
100         this.entries=entries;
101         fileaentries=new ArrayList<TableIndexEntryTransactional>();
102         filebentries=new ArrayList<TableIndexEntryTransactional>();
103         
104         Iterator<TableIndexEntryTransactional> it=entries.iterator();
105         while(it.hasNext()){
106             TableIndexEntryTransactional entry=it.next();
107             // TODO: we really shouldn't get nulls here
108             if(entry!=null){
109                 if(entry.getLocation()==Table.FILEA){
110                     fileaentries.add(entry);
111                 }else if(entry.getLocation()==Table.FILEB){
112                     filebentries.add(entry);
113                 }
114             }
115         }
116         
117         Collections.sort(fileaentries);
118         Collections.sort(filebentries);
119         
120         fileabuffer=new Hashtable<String,RowTransactional>();
121         filebbuffer=new Hashtable<String,RowTransactional>();
122         
123         readNext();   
124     }
125     
126     private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
127         if(fileabuffer.containsKey(entry.getId())){
128             next=fileabuffer.remove(entry.getId());
129             return;
130         }
131         while(true){
132             if(fileaentries.size()>0){
133                 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
134                 if(fileastream==null){
135                     fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
136                     fileaposition=0;
137                 }
138                 if(fileaposition>nextfilea.getPosition()){
139                     // We have already read this entry... skip it
140                     // readNextFromFileA(entry);
141                     // return;
142                 }else{
143                     while(fileaposition!=nextfilea.getPosition()){
144                         fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
145                     }
146                     
147                     RowTransactional row=RowTransactional.readFromStream(fileastream);
148                     final Vector args = new Vector();
149                     args.add(row);
150                     Thread.doIt(new Callable<Boolean>() {
151                        public Boolean call() throws Exception{
152                     //synchronized(table.statlock){
153                         table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
154                         table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
155                         return true;
156                       }
157                       //}
158                     });
159                     
160                     fileaposition+=row.getSize();
161                     if(row.getId().equals(entry.getId())){
162                         next=row;
163                         return;
164                     }else{
165                         fileabuffer.put(row.getId(),row);
166                         // readNextFromFileA(entry);
167                     }
168                 }
169             }else{
170                 next=null;
171                 return;
172             }
173         }
174     }
175     
176     private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
177         if(filebbuffer.containsKey(entry.getId())){
178             next=filebbuffer.remove(entry.getId());
179             return;
180         }
181         while(true){
182             if(filebentries.size()>0){
183                 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
184                 if(filebstream==null){
185                     filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
186                     filebposition=0;
187                 }
188                 if(filebposition>nextfileb.getPosition()){
189                     // We have already read this entry... skip it
190                     // readNextFromFileB(entry);
191                     // return;
192                 }else{
193                     while(filebposition!=nextfileb.getPosition()){
194                         filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
195                     }
196                     RowTransactional row=RowTransactional.readFromStream(filebstream);
197                     
198                     final Vector args = new Vector();
199                     args.add(row);
200                     Thread.doIt(new Callable<Boolean>() {
201                        public Boolean call() throws Exception{
202                     //synchronized(table.statlock){
203                             table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
204                             table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
205                             return true;
206                         }
207                     });
208                     
209                     filebposition+=row.getSize();
210                     if(row.getId().equals(entry.getId())){
211                         next=row;
212                         return;
213                     }else{
214                         filebbuffer.put(row.getId(),row);
215                         // readNextFromFileB(entry);
216                     }
217                 }
218             }else{
219                 next=null;
220                 return;
221             }
222         }
223     }
224     
225     private void readNext() throws IOException{
226         if(entries.size()>rowpointer){
227             TableIndexEntryTransactional entry=entries.get(rowpointer++);
228             if(entry!=null){
229                    switch(entry.getLocation()){
230                     case Table.FILEA    : readNextFromFileA(entry);
231                                         // return;
232                                         break;
233                     case Table.FILEB : readNextFromFileB(entry);
234                                         // return;
235                                         break;
236                 }
237                 if(next!=null){
238                     if(matcher!=null){
239                         if(!matcher.matches(next)){
240                              readNext();
241                         }
242                     }
243                 }
244                 return;
245             }else{
246                 readNext();
247                 return;
248             }
249         }
250         try{
251             if(fileastream!=null)fileastream.close();
252         }catch(Exception e){}
253         try{
254             if(filebstream!=null)filebstream.close();
255         }catch(Exception e){}
256         next=null;
257     }
258     
259     public boolean hasNext(){
260         if(next!=null)return true;
261         return false;
262     }
263     
264     public RowTransactional next(){
265         try{
266             if(next!=null){
267                 RowTransactional tmp=next;
268                 readNext();
269                 return tmp;
270             }
271         }catch(Exception e){
272             e.printStackTrace();
273         }
274         return null;
275     }
276     
277     public void remove(){
278         
279     }
280 }