2a0388075a2f53a90fc7b5bd3805a669c95817d2
[IRC.git] /
1 /*
2  * Copyright (c) 2007, Solido Systems
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  * Redistributions of source code must retain the above copyright notice, this
9  * list of conditions and the following disclaimer.
10  *
11  * Redistributions in binary form must reproduce the above copyright notice,
12  * this list of conditions and the following disclaimer in the documentation
13  * and/or other materials provided with the distribution.
14  *
15  * Neither the name of Solido Systems nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31  
32  package com.solidosystems.tuplesoup.core;
33  
34  import com.solidosystems.tuplesoup.filter.*;
35  import java.io.*;
36  import java.util.*;
37
38 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
39     private DataInputStream fileastream=null;
40     private DataInputStream filebstream=null;
41     private long fileaposition=0;
42     private long filebposition=0;
43
44     private List<TableIndexEntryTransactional>fileaentries;
45     private List<TableIndexEntryTransactional>filebentries;
46     
47     private List<TableIndexEntryTransactional>entries;
48
49     private Hashtable<String,RowTransactional>fileabuffer;
50     private Hashtable<String,RowTransactional>filebbuffer;
51
52     private List<String>rows;
53     private int rowpointer;
54     private RowTransactional next=null;
55     
56     private DualFileTableTransactional table;
57     
58     private RowMatcherTransactional matcher=null;
59     
60     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
61         this.table=table;
62         this.rows=rows;
63         rowpointer=0;
64         
65         this.entries=entries;
66         fileaentries=new ArrayList<TableIndexEntryTransactional>();
67         filebentries=new ArrayList<TableIndexEntryTransactional>();
68         
69         Iterator<TableIndexEntryTransactional> it=entries.iterator();
70         while(it.hasNext()){
71             TableIndexEntryTransactional entry=it.next();
72             // TODO: we really shouldn't get nulls here
73             if(entry!=null){
74                 if(entry.getLocation()==Table.FILEA){
75                     fileaentries.add(entry);
76                 }else if(entry.getLocation()==Table.FILEB){
77                     filebentries.add(entry);
78                 }
79             }
80         }
81         
82         Collections.sort(fileaentries);
83         Collections.sort(filebentries);
84         
85         fileabuffer=new Hashtable<String,RowTransactional>();
86         filebbuffer=new Hashtable<String,RowTransactional>();
87         
88         readNext();   
89     }
90     
91     
92     public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
93         this.table=table;
94         this.rows=rows;
95         rowpointer=0;
96         this.matcher=matcher;
97         
98         this.entries=entries;
99         fileaentries=new ArrayList<TableIndexEntryTransactional>();
100         filebentries=new ArrayList<TableIndexEntryTransactional>();
101         
102         Iterator<TableIndexEntryTransactional> it=entries.iterator();
103         while(it.hasNext()){
104             TableIndexEntryTransactional entry=it.next();
105             // TODO: we really shouldn't get nulls here
106             if(entry!=null){
107                 if(entry.getLocation()==Table.FILEA){
108                     fileaentries.add(entry);
109                 }else if(entry.getLocation()==Table.FILEB){
110                     filebentries.add(entry);
111                 }
112             }
113         }
114         
115         Collections.sort(fileaentries);
116         Collections.sort(filebentries);
117         
118         fileabuffer=new Hashtable<String,RowTransactional>();
119         filebbuffer=new Hashtable<String,RowTransactional>();
120         
121         readNext();   
122     }
123     
124     private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
125         if(fileabuffer.containsKey(entry.getId())){
126             next=fileabuffer.remove(entry.getId());
127             return;
128         }
129         while(true){
130             if(fileaentries.size()>0){
131                 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
132                 if(fileastream==null){
133                     fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
134                     fileaposition=0;
135                 }
136                 if(fileaposition>nextfilea.getPosition()){
137                     // We have already read this entry... skip it
138                     // readNextFromFileA(entry);
139                     // return;
140                 }else{
141                     while(fileaposition!=nextfilea.getPosition()){
142                         fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
143                     }
144                     RowTransactional row=RowTransactional.readFromStream(fileastream);
145                     synchronized(table.statlock){
146                        table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
147                        table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
148                     }
149                     fileaposition+=row.getSize();
150                     if(row.getId().equals(entry.getId())){
151                         next=row;
152                         return;
153                     }else{
154                         fileabuffer.put(row.getId(),row);
155                         // readNextFromFileA(entry);
156                     }
157                 }
158             }else{
159                 next=null;
160                 return;
161             }
162         }
163     }
164     
165     private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
166         if(filebbuffer.containsKey(entry.getId())){
167             next=filebbuffer.remove(entry.getId());
168             return;
169         }
170         while(true){
171             if(filebentries.size()>0){
172                 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
173                 if(filebstream==null){
174                     filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
175                     filebposition=0;
176                 }
177                 if(filebposition>nextfileb.getPosition()){
178                     // We have already read this entry... skip it
179                     // readNextFromFileB(entry);
180                     // return;
181                 }else{
182                     while(filebposition!=nextfileb.getPosition()){
183                         filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
184                     }
185                     RowTransactional row=RowTransactional.readFromStream(filebstream);
186                     synchronized(table.statlock){
187                         table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
188                         table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
189                     }
190                     filebposition+=row.getSize();
191                     if(row.getId().equals(entry.getId())){
192                         next=row;
193                         return;
194                     }else{
195                         filebbuffer.put(row.getId(),row);
196                         // readNextFromFileB(entry);
197                     }
198                 }
199             }else{
200                 next=null;
201                 return;
202             }
203         }
204     }
205     
206     private void readNext() throws IOException{
207         if(entries.size()>rowpointer){
208             TableIndexEntryTransactional entry=entries.get(rowpointer++);
209             if(entry!=null){
210                    switch(entry.getLocation()){
211                     case Table.FILEA    : readNextFromFileA(entry);
212                                         // return;
213                                         break;
214                     case Table.FILEB : readNextFromFileB(entry);
215                                         // return;
216                                         break;
217                 }
218                 if(next!=null){
219                     if(matcher!=null){
220                         if(!matcher.matches(next)){
221                              readNext();
222                         }
223                     }
224                 }
225                 return;
226             }else{
227                 readNext();
228                 return;
229             }
230         }
231         try{
232             if(fileastream!=null)fileastream.close();
233         }catch(Exception e){}
234         try{
235             if(filebstream!=null)filebstream.close();
236         }catch(Exception e){}
237         next=null;
238     }
239     
240     public boolean hasNext(){
241         if(next!=null)return true;
242         return false;
243     }
244     
245     public RowTransactional next(){
246         try{
247             if(next!=null){
248                 RowTransactional tmp=next;
249                 readNext();
250                 return tmp;
251             }
252         }catch(Exception e){
253             e.printStackTrace();
254         }
255         return null;
256     }
257     
258     public void remove(){
259         
260     }
261 }