2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
34 import com.solidosystems.tuplesoup.filter.*;
37 import java.util.concurrent.Callable;
/**
 * Tuple stream that hands out RowTransactional objects for a list of index
 * entries, reading them from the two data files (Table.FILEA and Table.FILEB)
 * of a DualFileTableTransactional.
 */
40 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
// Input streams over file A and file B; each is opened on first use by
// readNextFromFileA / readNextFromFileB and stays null until then.
41 private DataInputStream fileastream=null;
42 private DataInputStream filebstream=null;
// Running offsets into the two streams, advanced by the number of bytes
// skipped and by the size of every row read.
43 private long fileaposition=0;
44 private long filebposition=0;
// Index entries split by location (file A / file B) and sorted by the
// constructors; the readers consume them from the front.
46 private List<TableIndexEntryTransactional>fileaentries;
47 private List<TableIndexEntryTransactional>filebentries;
// The entry list walked by readNext() via rowpointer.
49 private List<TableIndexEntryTransactional>entries;
// Rows already read from each file, keyed by row id, held until the entry
// with that id is requested.
51 private Hashtable<String,RowTransactional>fileabuffer;
52 private Hashtable<String,RowTransactional>filebbuffer;
// NOTE(review): rows is not referenced by any of the code visible here - confirm it is still needed.
54 private List<String>rows;
// Index into entries of the next entry readNext() will process.
55 private int rowpointer;
// The row currently loaded and waiting to be returned; null when none is loaded.
56 private RowTransactional next=null;
// Table that supplies the data file names and the read statistics counters.
58 private DualFileTableTransactional table;
// Optional row filter consulted by readNext(); null by default.
60 private RowMatcherTransactional matcher=null;
/**
 * Creates a reader over the given index entries without a row matcher.
 * The entries are split into a file A list and a file B list according to
 * getLocation(), each list is sorted, and empty row buffers are created.
 *
 * NOTE(review): unlike the three-argument constructor, no assignment to
 * this.entries (or this.table) is visible here - confirm the fields are
 * set, since readNext() and the file readers depend on them.
 *
 * @param table   table whose data files will be read
 * @param entries index entries identifying the rows to return
 * @throws IOException declared by the signature; the visible statements do not perform I/O themselves
 */
62 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
68 fileaentries=new ArrayList<TableIndexEntryTransactional>();
69 filebentries=new ArrayList<TableIndexEntryTransactional>();
// Partition the entries by the file they live in.
71 Iterator<TableIndexEntryTransactional> it=entries.iterator();
73 TableIndexEntryTransactional entry=it.next();
74 // TODO: we really shouldn't get nulls here
// Entries whose location is neither FILEA nor FILEB are not added to either list by the visible branches.
76 if(entry.getLocation()==Table.FILEA){
77 fileaentries.add(entry);
78 }else if(entry.getLocation()==Table.FILEB){
79 filebentries.add(entry);
// Sort by the entries' natural ordering (presumably file position, so each
// file can be read front to back - confirm against TableIndexEntryTransactional.compareTo).
84 Collections.sort(fileaentries);
85 Collections.sort(filebentries);
// Buffers for rows that are read before their entry is requested.
87 fileabuffer=new Hashtable<String,RowTransactional>();
88 filebbuffer=new Hashtable<String,RowTransactional>();
/**
 * Creates a reader over the given index entries with a row matcher.
 * Stores the entry list, splits it into a file A list and a file B list
 * according to getLocation(), sorts each list, and creates empty row buffers.
 *
 * @param table   table whose data files will be read
 * @param entries index entries identifying the rows to return
 * @param matcher row filter; presumably stored in the matcher field that
 *                readNext() consults - the assignment is not visible here, confirm
 * @throws IOException declared by the signature; the visible statements do not perform I/O themselves
 */
94 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
// Keep the caller's list; readNext() walks it with rowpointer.
100 this.entries=entries;
101 fileaentries=new ArrayList<TableIndexEntryTransactional>();
102 filebentries=new ArrayList<TableIndexEntryTransactional>();
// Partition the entries by the file they live in.
104 Iterator<TableIndexEntryTransactional> it=entries.iterator();
106 TableIndexEntryTransactional entry=it.next();
107 // TODO: we really shouldn't get nulls here
// Entries whose location is neither FILEA nor FILEB are not added to either list by the visible branches.
109 if(entry.getLocation()==Table.FILEA){
110 fileaentries.add(entry);
111 }else if(entry.getLocation()==Table.FILEB){
112 filebentries.add(entry);
// Sort by the entries' natural ordering (presumably file position, so each
// file can be read front to back - confirm against TableIndexEntryTransactional.compareTo).
117 Collections.sort(fileaentries);
118 Collections.sort(filebentries);
// Buffers for rows that are read before their entry is requested.
120 fileabuffer=new Hashtable<String,RowTransactional>();
121 filebbuffer=new Hashtable<String,RowTransactional>();
/**
 * Tries to load the row for the given index entry from file A.
 * A row already held in fileabuffer is taken from there; otherwise the next
 * pending file A entry is consumed, the stream is advanced to its position,
 * one row is read, and the read statistics on the table are updated.
 *
 * @param entry index entry whose row is wanted
 * @throws IOException if opening, skipping in, or reading from file A fails
 */
126 private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
// The wanted row was read earlier: hand it over and drop it from the buffer.
127 if(fileabuffer.containsKey(entry.getId())){
128 next=fileabuffer.remove(entry.getId());
// Otherwise consume the first remaining file A entry from the sorted list.
132 if(fileaentries.size()>0){
133 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
// Open the file A stream on first use.
134 if(fileastream==null){
135 fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
// The stream is already past this entry's position.
138 if(fileaposition>nextfilea.getPosition()){
139 // We have already read this entry... skip it
140 // readNextFromFileA(entry);
// Skip forward to the entry's position; skipBytes may skip fewer bytes than
// requested, hence the loop. The distance is narrowed to int for the call.
143 while(fileaposition!=nextfilea.getPosition()){
144 fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
147 RowTransactional row=RowTransactional.readFromStream(fileastream);
// NOTE(review): args is created empty and args.get(0) is read inside the
// callable below; no add() is visible here - confirm the row is put into
// args before the callable runs.
148 final Vector args = new Vector();
// Update the table's read statistics inside the callable.
// NOTE(review): Thread.doIt is not a java.lang.Thread method; presumably a
// project-specific helper standing in for the commented-out statlock
// synchronization - confirm.
150 Thread.doIt(new Callable<Boolean>() {
151 public Boolean call() throws Exception{
152 //synchronized(table.statlock){
153 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
154 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
// Account for the bytes consumed by the row just read.
160 fileaposition+=row.getSize();
// Check whether the row just read is the one that was asked for.
161 if(row.getId().equals(entry.getId())){
// Keep the row under its id so a later request for that id can be served from the buffer.
165 fileabuffer.put(row.getId(),row);
166 // readNextFromFileA(entry);
/**
 * Tries to load the row for the given index entry from file B.
 * A row already held in filebbuffer is taken from there; otherwise the next
 * pending file B entry is consumed, the stream is advanced to its position,
 * one row is read, and the read statistics on the table are updated.
 *
 * @param entry index entry whose row is wanted
 * @throws IOException if opening, skipping in, or reading from file B fails
 */
176 private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
// The wanted row was read earlier: hand it over and drop it from the buffer.
177 if(filebbuffer.containsKey(entry.getId())){
178 next=filebbuffer.remove(entry.getId());
// Otherwise consume the first remaining file B entry from the sorted list.
182 if(filebentries.size()>0){
183 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
// Open the file B stream on first use.
184 if(filebstream==null){
185 filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
// The stream is already past this entry's position.
188 if(filebposition>nextfileb.getPosition()){
189 // We have already read this entry... skip it
190 // readNextFromFileB(entry);
// Skip forward to the entry's position; skipBytes may skip fewer bytes than
// requested, hence the loop. The distance is narrowed to int for the call.
193 while(filebposition!=nextfileb.getPosition()){
194 filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
196 RowTransactional row=RowTransactional.readFromStream(filebstream);
// NOTE(review): args is created empty and args.get(0) is read inside the
// callable below; no add() is visible here - confirm the row is put into
// args before the callable runs.
198 final Vector args = new Vector();
// Update the table's read statistics inside the callable.
// NOTE(review): Thread.doIt is not a java.lang.Thread method; presumably a
// project-specific helper standing in for the commented-out statlock
// synchronization - confirm.
200 Thread.doIt(new Callable<Boolean>() {
201 public Boolean call() throws Exception{
202 //synchronized(table.statlock){
203 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
204 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
// Account for the bytes consumed by the row just read.
209 filebposition+=row.getSize();
// Check whether the row just read is the one that was asked for.
210 if(row.getId().equals(entry.getId())){
// Keep the row under its id so a later request for that id can be served from the buffer.
214 filebbuffer.put(row.getId(),row);
215 // readNextFromFileB(entry);
/**
 * Moves on to the next index entry, if any, and asks the reader for the file
 * that entry lives in to load its row; the loaded row is then checked
 * against the matcher. Also closes both file streams, ignoring any
 * exception from close().
 *
 * @throws IOException if reading from either data file fails
 */
225 private void readNext() throws IOException{
// Entries remain: take the next one and advance the pointer.
226 if(entries.size()>rowpointer){
227 TableIndexEntryTransactional entry=entries.get(rowpointer++);
// Dispatch on the file that holds the entry.
// NOTE(review): confirm each case ends with break/return so FILEA does not fall through into FILEB.
229 switch(entry.getLocation()){
230 case Table.FILEA : readNextFromFileA(entry);
233 case Table.FILEB : readNextFromFileB(entry);
// NOTE(review): matcher defaults to null; confirm this call is guarded for readers built without a matcher.
239 if(!matcher.matches(next)){
// Close the streams (presumably once the entries are exhausted - confirm);
// failures while closing are deliberately swallowed.
251 if(fileastream!=null)fileastream.close();
252 }catch(Exception e){}
254 if(filebstream!=null)filebstream.close();
255 }catch(Exception e){}
/**
 * Reports whether a row is available.
 *
 * @return true immediately when a row is already loaded in next
 */
259 public boolean hasNext(){
// A row is already loaded and waiting to be returned.
260 if(next!=null)return true;
/**
 * Hands out a row from this stream.
 *
 * @return a RowTransactional, per the declared return type
 */
264 public RowTransactional next(){
// Take a local copy of the currently loaded row.
267 RowTransactional tmp=next;
277 public void remove(){