2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
34 import com.solidosystems.tuplesoup.filter.*;
38 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
39 private DataInputStream fileastream=null;
40 private DataInputStream filebstream=null;
41 private long fileaposition=0;
42 private long filebposition=0;
44 private List<TableIndexEntryTransactional>fileaentries;
45 private List<TableIndexEntryTransactional>filebentries;
47 private List<TableIndexEntryTransactional>entries;
49 private Hashtable<String,RowTransactional>fileabuffer;
50 private Hashtable<String,RowTransactional>filebbuffer;
52 private List<String>rows;
53 private int rowpointer;
54 private RowTransactional next=null;
56 private DualFileTableTransactional table;
58 private RowMatcherTransactional matcher=null;
60 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
66 fileaentries=new ArrayList<TableIndexEntryTransactional>();
67 filebentries=new ArrayList<TableIndexEntryTransactional>();
69 Iterator<TableIndexEntryTransactional> it=entries.iterator();
71 TableIndexEntryTransactional entry=it.next();
72 // TODO: we really shouldn't get nulls here
74 if(entry.getLocation()==Table.FILEA){
75 fileaentries.add(entry);
76 }else if(entry.getLocation()==Table.FILEB){
77 filebentries.add(entry);
82 Collections.sort(fileaentries);
83 Collections.sort(filebentries);
85 fileabuffer=new Hashtable<String,RowTransactional>();
86 filebbuffer=new Hashtable<String,RowTransactional>();
92 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
99 fileaentries=new ArrayList<TableIndexEntryTransactional>();
100 filebentries=new ArrayList<TableIndexEntryTransactional>();
102 Iterator<TableIndexEntryTransactional> it=entries.iterator();
104 TableIndexEntryTransactional entry=it.next();
105 // TODO: we really shouldn't get nulls here
107 if(entry.getLocation()==Table.FILEA){
108 fileaentries.add(entry);
109 }else if(entry.getLocation()==Table.FILEB){
110 filebentries.add(entry);
115 Collections.sort(fileaentries);
116 Collections.sort(filebentries);
118 fileabuffer=new Hashtable<String,RowTransactional>();
119 filebbuffer=new Hashtable<String,RowTransactional>();
124 private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
125 if(fileabuffer.containsKey(entry.getId())){
126 next=fileabuffer.remove(entry.getId());
130 if(fileaentries.size()>0){
131 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
132 if(fileastream==null){
133 fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
136 if(fileaposition>nextfilea.getPosition()){
137 // We have already read this entry... skip it
138 // readNextFromFileA(entry);
141 while(fileaposition!=nextfilea.getPosition()){
142 fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
144 RowTransactional row=RowTransactional.readFromStream(fileastream);
145 synchronized(table.statlock){
146 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
147 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
149 fileaposition+=row.getSize();
150 if(row.getId().equals(entry.getId())){
154 fileabuffer.put(row.getId(),row);
155 // readNextFromFileA(entry);
165 private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
166 if(filebbuffer.containsKey(entry.getId())){
167 next=filebbuffer.remove(entry.getId());
171 if(filebentries.size()>0){
172 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
173 if(filebstream==null){
174 filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
177 if(filebposition>nextfileb.getPosition()){
178 // We have already read this entry... skip it
179 // readNextFromFileB(entry);
182 while(filebposition!=nextfileb.getPosition()){
183 filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
185 RowTransactional row=RowTransactional.readFromStream(filebstream);
186 synchronized(table.statlock){
187 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+row.getSize());
188 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
190 filebposition+=row.getSize();
191 if(row.getId().equals(entry.getId())){
195 filebbuffer.put(row.getId(),row);
196 // readNextFromFileB(entry);
206 private void readNext() throws IOException{
207 if(entries.size()>rowpointer){
208 TableIndexEntryTransactional entry=entries.get(rowpointer++);
210 switch(entry.getLocation()){
211 case Table.FILEA : readNextFromFileA(entry);
214 case Table.FILEB : readNextFromFileB(entry);
220 if(!matcher.matches(next)){
232 if(fileastream!=null)fileastream.close();
233 }catch(Exception e){}
235 if(filebstream!=null)filebstream.close();
236 }catch(Exception e){}
240 public boolean hasNext(){
241 if(next!=null)return true;
245 public RowTransactional next(){
248 RowTransactional tmp=next;
258 public void remove(){