e9c5cad73d505ea41f8a5da3cb43046af9a74aea
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32  package com.solidosystems.tuplesoup.core;
33  
34  import com.solidosystems.tuplesoup.filter.*;
35  import java.io.*;
36  import java.util.*;
37  import java.util.concurrent.Callable;
38  import dstm2.Thread;
39 import dstm2.factory.Factory;
40
41 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
42
43     
44     private DataInputStream fileastream=null;
45     private DataInputStream filebstream=null;
46     private long fileaposition=0;
47     private long filebposition=0;
48
49     private List<TableIndexEntryTransactional>fileaentries;
50     private List<TableIndexEntryTransactional>filebentries;
51     
52     private List<TableIndexEntryTransactional>entries;
53
54     private Hashtable<String,RowTransactional>fileabuffer;
55     private Hashtable<String,RowTransactional>filebbuffer;
56
57     private List<String>rows;
58     private int rowpointer;
59     private RowTransactional next=null;
60     
61     private DualFileTableTransactional table;
62     
63     private RowMatcherTransactional matcher=null;
64     
65     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
66         this.table=table;
67         this.rows=rows;
68         rowpointer=0;
69         
70         this.entries=entries;
71         fileaentries=new ArrayList<TableIndexEntryTransactional>();
72         filebentries=new ArrayList<TableIndexEntryTransactional>();
73         
74         Iterator<TableIndexEntryTransactional> it=entries.iterator();
75         while(it.hasNext()){
76             TableIndexEntryTransactional entry=it.next();
77             // TODO: we really shouldn't get nulls here
78             if(entry!=null){
79                 if(entry.getLocation()==Table.FILEA){
80                     fileaentries.add(entry);
81                 }else if(entry.getLocation()==Table.FILEB){
82                     filebentries.add(entry);
83                 }
84             }
85         }
86         
87         Collections.sort(fileaentries);
88         Collections.sort(filebentries);
89         
90         fileabuffer=new Hashtable<String,RowTransactional>();
91         filebbuffer=new Hashtable<String,RowTransactional>();
92         
93         readNext();   
94     }
95     
96     
97     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
98         this.table=table;
99         this.rows=rows;
100         rowpointer=0;
101         this.matcher=matcher;
102         
103         this.entries=entries;
104         fileaentries=new ArrayList<TableIndexEntryTransactional>();
105         filebentries=new ArrayList<TableIndexEntryTransactional>();
106         
107         Iterator<TableIndexEntryTransactional> it=entries.iterator();
108         while(it.hasNext()){
109             TableIndexEntryTransactional entry=it.next();
110             // TODO: we really shouldn't get nulls here
111             if(entry!=null){
112                 if(entry.getLocation()==Table.FILEA){
113                     fileaentries.add(entry);
114                 }else if(entry.getLocation()==Table.FILEB){
115                     filebentries.add(entry);
116                 }
117             }
118         }
119         
120         Collections.sort(fileaentries);
121         Collections.sort(filebentries);
122         
123         fileabuffer=new Hashtable<String,RowTransactional>();
124         filebbuffer=new Hashtable<String,RowTransactional>();
125         
126         readNext();   
127     }
128     
129     private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
130         if(fileabuffer.containsKey(entry.getId())){
131             next=fileabuffer.remove(entry.getId());
132             return;
133         }
134         while(true){
135             if(fileaentries.size()>0){
136                 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
137                 if(fileastream==null){
138                     fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
139                     fileaposition=0;
140                 }
141                 if(fileaposition>nextfilea.getPosition()){
142                     // We have already read this entry... skip it
143                     // readNextFromFileA(entry);
144                     // return;
145                 }else{
146                     while(fileaposition!=nextfilea.getPosition()){
147                         fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
148                     }
149                     
150                     RowTransactional row=RowTransactional.readFromStream(fileastream);
151                     final Vector args = new Vector();
152                     args.add(row);
153                     Thread.doIt(new Callable<Boolean>() {
154                        public Boolean call() throws Exception{
155                     //synchronized(table.statlock){
156                         table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
157                         table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
158                         return true;
159                       }
160                       //}
161                     });
162                     
163                     fileaposition+=row.getSize();
164                     if(row.getId().equals(entry.getId())){
165                         next=row;
166                         return;
167                     }else{
168                         fileabuffer.put(row.getId(),row);
169                         // readNextFromFileA(entry);
170                     }
171                 }
172             }else{
173                 next=null;
174                 return;
175             }
176         }
177     }
178     
179     private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
180         if(filebbuffer.containsKey(entry.getId())){
181             next=filebbuffer.remove(entry.getId());
182             return;
183         }
184         while(true){
185             if(filebentries.size()>0){
186                 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
187                 if(filebstream==null){
188                     filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
189                     filebposition=0;
190                 }
191                 if(filebposition>nextfileb.getPosition()){
192                     // We have already read this entry... skip it
193                     // readNextFromFileB(entry);
194                     // return;
195                 }else{
196                     while(filebposition!=nextfileb.getPosition()){
197                         filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
198                     }
199                     RowTransactional row=RowTransactional.readFromStream(filebstream);
200                     
201                     final Vector args = new Vector();
202                     args.add(row);
203                     Thread.doIt(new Callable<Boolean>() {
204                        public Boolean call() throws Exception{
205                     //synchronized(table.statlock){
206                             table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
207                             table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
208                             return true;
209                         }
210                     });
211                     
212                     filebposition+=row.getSize();
213                     if(row.getId().equals(entry.getId())){
214                         next=row;
215                         return;
216                     }else{
217                         filebbuffer.put(row.getId(),row);
218                         // readNextFromFileB(entry);
219                     }
220                 }
221             }else{
222                 next=null;
223                 return;
224             }
225         }
226     }
227     
228     private void readNext() throws IOException{
229         if(entries.size()>rowpointer){
230             TableIndexEntryTransactional entry=entries.get(rowpointer++);
231             if(entry!=null){
232                    switch(entry.getLocation()){
233                     case Table.FILEA    : readNextFromFileA(entry);
234                                         // return;
235                                         break;
236                     case Table.FILEB : readNextFromFileB(entry);
237                                         // return;
238                                         break;
239                 }
240                 if(next!=null){
241                     if(matcher!=null){
242                         if(!matcher.matches(next)){
243                              readNext();
244                         }
245                     }
246                 }
247                 return;
248             }else{
249                 readNext();
250                 return;
251             }
252         }
253         try{
254             if(fileastream!=null)fileastream.close();
255         }catch(Exception e){}
256         try{
257             if(filebstream!=null)filebstream.close();
258         }catch(Exception e){}
259         next=null;
260     }
261     
262     public boolean hasNext(){
263         if(next!=null)return true;
264         return false;
265     }
266     
267     public RowTransactional next(){
268         try{
269             if(next!=null){
270                 RowTransactional tmp=next;
271                 readNext();
272                 return tmp;
273             }
274         }catch(Exception e){
275             e.printStackTrace();
276         }
277         return null;
278     }
279     
280     public void remove(){
281         
282     }
283 }