2 * Copyright (c) 2007, Solido Systems
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * Redistributions of source code must retain the above copyright notice, this
9 * list of conditions and the following disclaimer.
11 * Redistributions in binary form must reproduce the above copyright notice,
12 * this list of conditions and the following disclaimer in the documentation
13 * and/or other materials provided with the distribution.
15 * Neither the name of Solido Systems nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 package com.solidosystems.tuplesoup.core;
34 import com.solidosystems.tuplesoup.filter.*;
37 import java.util.concurrent.Callable;
39 import dstm2.factory.Factory;
/**
 * Tuple stream over a DualFileTableTransactional that yields rows in the
 * order of a supplied list of index entries (see readNext, which walks
 * {@code entries} by {@code rowpointer}).
 *
 * Entries are split per backing file (Table.FILEA / Table.FILEB); each file
 * is read through its own forward-only stream, and rows read before they are
 * requested are parked in a per-file buffer keyed by row id.
 *
 * NOTE(review): this listing appears to be missing lines (closing braces,
 * some statements, the java.io / java.util imports); confirm details against
 * the complete file before changing any logic.
 */
41 public class IndexedTableReaderTransactional extends TupleStreamTransactional{
// Streams over file A and file B; opened lazily on first read of each file.
44 private DataInputStream fileastream=null;
45 private DataInputStream filebstream=null;
// Current byte offset within each stream, advanced by skipped bytes and by
// the size of every row read.
46 private long fileaposition=0;
47 private long filebposition=0;
// Index entries located in file A / file B, sorted with Collections.sort and
// consumed from the front while reading.
49 private List<TableIndexEntryTransactional>fileaentries;
50 private List<TableIndexEntryTransactional>filebentries;
// The full entry list as supplied by the caller; defines the output order.
52 private List<TableIndexEntryTransactional>entries;
// Rows already read from a file but not yet requested, keyed by row id.
54 private Hashtable<String,RowTransactional>fileabuffer;
55 private Hashtable<String,RowTransactional>filebbuffer;
// NOTE(review): no read or write of 'rows' is visible in this excerpt; it may
// be unused - confirm before removing.
57 private List<String>rows;
// Index into 'entries' of the next entry to fetch.
58 private int rowpointer;
// The row that the next call to next() will hand out; null when exhausted.
59 private RowTransactional next=null;
// Table that supplies the file names and the read statistics counters.
61 private DualFileTableTransactional table;
// Optional row filter; stays null unless the three-argument constructor is used.
63 private RowMatcherTransactional matcher=null;
/**
 * Creates a reader over the given index entries without a row matcher.
 *
 * The entries are partitioned by getLocation() into a file A list and a
 * file B list, each list is sorted with Collections.sort, and empty
 * per-file row buffers are created.
 *
 * NOTE(review): the assignments of the table/entries fields, the iteration
 * loop header, the null check that the TODO refers to, and any initial
 * readNext() call are not visible in this excerpt - confirm against the
 * full file.
 *
 * @param table   table whose files are read
 * @param entries index entries identifying the rows to return
 * @throws IOException declared by the signature; the throwing call is not
 *         visible in this excerpt
 */
65 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries) throws IOException{
71 fileaentries=new ArrayList<TableIndexEntryTransactional>();
72 filebentries=new ArrayList<TableIndexEntryTransactional>();
74 Iterator<TableIndexEntryTransactional> it=entries.iterator();
76 TableIndexEntryTransactional entry=it.next();
77 // TODO: we really shouldn't get nulls here
// Route each entry to the list for the file it lives in; entries with any
// other location value are added to neither list.
79 if(entry.getLocation()==Table.FILEA){
80 fileaentries.add(entry);
81 }else if(entry.getLocation()==Table.FILEB){
82 filebentries.add(entry);
// Sort by the entries' natural ordering.
// NOTE(review): presumably this orders by file position so each file can be
// read front to back - verify TableIndexEntryTransactional.compareTo.
87 Collections.sort(fileaentries);
88 Collections.sort(filebentries);
90 fileabuffer=new Hashtable<String,RowTransactional>();
91 filebbuffer=new Hashtable<String,RowTransactional>();
/**
 * Creates a reader over the given index entries that also stores a row
 * matcher, which readNext() consults via matcher.matches(next).
 *
 * Apart from storing the matcher, the visible body repeats the same steps:
 * partition the entries by getLocation() into file A and file B lists, sort
 * both lists, and create empty per-file row buffers.
 *
 * NOTE(review): the assignment of the table field, the iteration loop
 * header, the null check that the TODO refers to, and any initial
 * readNext() call are not visible in this excerpt - confirm against the
 * full file. The partitioning logic duplicates the two-argument
 * constructor and could be shared.
 *
 * @param table   table whose files are read
 * @param entries index entries identifying the rows to return
 * @param matcher filter applied to rows as they are read
 * @throws IOException declared by the signature; the throwing call is not
 *         visible in this excerpt
 */
97 public IndexedTableReaderTransactional(DualFileTableTransactional table,List<TableIndexEntryTransactional>entries,RowMatcherTransactional matcher) throws IOException{
101 this.matcher=matcher;
// Keep the caller's list as-is; it defines the order rows are returned in.
103 this.entries=entries;
104 fileaentries=new ArrayList<TableIndexEntryTransactional>();
105 filebentries=new ArrayList<TableIndexEntryTransactional>();
107 Iterator<TableIndexEntryTransactional> it=entries.iterator();
109 TableIndexEntryTransactional entry=it.next();
110 // TODO: we really shouldn't get nulls here
// Route each entry to the list for the file it lives in; entries with any
// other location value are added to neither list.
112 if(entry.getLocation()==Table.FILEA){
113 fileaentries.add(entry);
114 }else if(entry.getLocation()==Table.FILEB){
115 filebentries.add(entry);
// Sort by the entries' natural ordering.
// NOTE(review): presumably this orders by file position - verify
// TableIndexEntryTransactional.compareTo.
120 Collections.sort(fileaentries);
121 Collections.sort(filebentries);
123 fileabuffer=new Hashtable<String,RowTransactional>();
124 filebbuffer=new Hashtable<String,RowTransactional>();
/**
 * Fetches the row for the given index entry from file A.
 *
 * A row already parked in fileabuffer is taken from there. Otherwise the
 * next pending file A entry is removed from fileaentries, the stream is
 * advanced to that entry's position, a row is read, and the table read
 * statistics are updated. A row whose id differs from the requested id is
 * stored in fileabuffer.
 *
 * NOTE(review): several lines of this method (returns, the enclosing loop,
 * else-branches, closing braces) are not visible in this excerpt; the
 * description covers only the visible statements.
 *
 * @param entry index entry of the row wanted next
 * @throws IOException if opening, skipping in, or reading the file fails
 */
129 private void readNextFromFileA(TableIndexEntryTransactional entry) throws IOException{
// Served from the buffer when this row was read earlier while looking for
// another one.
130 if(fileabuffer.containsKey(entry.getId())){
131 next=fileabuffer.remove(entry.getId());
135 if(fileaentries.size()>0){
// NOTE(review): remove(0) on an ArrayList shifts all remaining elements.
136 TableIndexEntryTransactional nextfilea=fileaentries.remove(0);
// Open the file lazily, on the first row actually needed from it.
137 if(fileastream==null){
138 fileastream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEA))));
141 if(fileaposition>nextfilea.getPosition()){
142 // We have already read this entry... skip it
143 // readNextFromFileA(entry);
// Skip forward until the stream sits at the start of the wanted row.
// NOTE(review): skipBytes may skip fewer bytes than asked, possibly zero, and
// does not throw at end of file, so a position beyond the file end would keep
// this loop spinning. The (int) cast also narrows a long difference -
// confirm gaps cannot exceed Integer.MAX_VALUE.
146 while(fileaposition!=nextfilea.getPosition()){
147 fileaposition+=fileastream.skipBytes((int)(nextfilea.getPosition()-fileaposition));
150 RowTransactional row=RowTransactional.readFromStream(fileastream);
// Raw Vector used to hand the row to the anonymous Callable below.
// NOTE(review): no args.add(row) is visible in this excerpt although
// args.get(0) is read inside call() - confirm the row is added before doIt.
151 final Vector args = new Vector();
// NOTE(review): java.lang.Thread has no doIt method; this is presumably the
// dstm2 transactional Thread (its import is not visible here) - verify.
153 Thread.doIt(new Callable<Boolean>() {
154 public Boolean call() throws Exception{
155 //synchronized(table.statlock){
// Add this row's size to the bytes-read counter and bump the read count.
156 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
157 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
// Account for the bytes consumed by the row just read.
163 fileaposition+=row.getSize();
164 if(row.getId().equals(entry.getId())){
// Not the requested row: park it by id so a later request can take it from
// the buffer (the matching branch is not visible in this excerpt).
168 fileabuffer.put(row.getId(),row);
169 // readNextFromFileA(entry);
/**
 * Fetches the row for the given index entry from file B.
 *
 * A row already parked in filebbuffer is taken from there. Otherwise the
 * next pending file B entry is removed from filebentries, the stream is
 * advanced to that entry's position, a row is read, and the table read
 * statistics are updated. A row whose id differs from the requested id is
 * stored in filebbuffer.
 *
 * NOTE(review): several lines of this method (returns, the enclosing loop,
 * else-branches, closing braces) are not visible in this excerpt; the
 * description covers only the visible statements.
 *
 * @param entry index entry of the row wanted next
 * @throws IOException if opening, skipping in, or reading the file fails
 */
179 private void readNextFromFileB(TableIndexEntryTransactional entry) throws IOException{
// Served from the buffer when this row was read earlier while looking for
// another one.
180 if(filebbuffer.containsKey(entry.getId())){
181 next=filebbuffer.remove(entry.getId());
185 if(filebentries.size()>0){
// NOTE(review): remove(0) on an ArrayList shifts all remaining elements.
186 TableIndexEntryTransactional nextfileb=filebentries.remove(0);
// Open the file lazily, on the first row actually needed from it.
187 if(filebstream==null){
188 filebstream=new DataInputStream(new BufferedInputStream(new FileInputStream(table.getFileName(Table.FILEB))));
191 if(filebposition>nextfileb.getPosition()){
192 // We have already read this entry... skip it
193 // readNextFromFileB(entry);
// Skip forward until the stream sits at the start of the wanted row.
// NOTE(review): skipBytes may skip fewer bytes than asked, possibly zero, and
// does not throw at end of file, so a position beyond the file end would keep
// this loop spinning. The (int) cast also narrows a long difference -
// confirm gaps cannot exceed Integer.MAX_VALUE.
196 while(filebposition!=nextfileb.getPosition()){
197 filebposition+=filebstream.skipBytes((int)(nextfileb.getPosition()-filebposition));
199 RowTransactional row=RowTransactional.readFromStream(filebstream);
// Raw Vector used to hand the row to the anonymous Callable below.
// NOTE(review): no args.add(row) is visible in this excerpt although
// args.get(0) is read inside call() - confirm the row is added before doIt.
201 final Vector args = new Vector();
// NOTE(review): java.lang.Thread has no doIt method; this is presumably the
// dstm2 transactional Thread (its import is not visible here) - verify.
203 Thread.doIt(new Callable<Boolean>() {
204 public Boolean call() throws Exception{
205 //synchronized(table.statlock){
// Add this row's size to the bytes-read counter and bump the read count.
206 table.atomicfields.setstat_read_size(table.atomicfields.getstat_read_size()+((RowTransactional)(args.get(0))).getSize());
207 table.atomicfields.setstat_read(table.atomicfields.getstat_read()+1);
// Account for the bytes consumed by the row just read.
212 filebposition+=row.getSize();
213 if(row.getId().equals(entry.getId())){
// Not the requested row: park it by id so a later request can take it from
// the buffer (the matching branch is not visible in this excerpt).
217 filebbuffer.put(row.getId(),row);
218 // readNextFromFileB(entry);
/**
 * Advances to the next index entry and loads its row into {@code next}.
 *
 * While entries remain, the entry at rowpointer is taken (and rowpointer
 * incremented) and the read is dispatched to the file A or file B reader
 * according to the entry's location; the loaded row is then checked against
 * the matcher. The visible tail of the method closes both streams.
 *
 * NOTE(review): null checks, break/return statements, the action taken when
 * the matcher rejects a row, and the try headers around the close calls are
 * not visible in this excerpt - confirm against the full file.
 *
 * @throws IOException if reading from either file fails
 */
228 private void readNext() throws IOException{
229 if(entries.size()>rowpointer){
230 TableIndexEntryTransactional entry=entries.get(rowpointer++);
// NOTE(review): no break is visible after either case here; confirm one
// exists, otherwise a FILEA entry would fall through into the FILEB read.
232 switch(entry.getLocation()){
233 case Table.FILEA : readNextFromFileA(entry);
236 case Table.FILEB : readNextFromFileB(entry);
// NOTE(review): matcher is null when the two-argument constructor is used;
// confirm a null guard precedes this call.
242 if(!matcher.matches(next)){
// Best-effort close of both streams; failures are deliberately ignored.
// NOTE(review): catching Exception hides every close failure - consider
// narrowing to IOException.
254 if(fileastream!=null)fileastream.close();
255 }catch(Exception e){}
257 if(filebstream!=null)filebstream.close();
258 }catch(Exception e){}
/**
 * Reports whether a row is available.
 *
 * @return true when a row has been pre-loaded into {@code next}; the result
 *         for the other case is not visible in this excerpt
 */
262 public boolean hasNext(){
263 if(next!=null)return true;
/**
 * Hands out the pre-loaded row.
 *
 * NOTE(review): only the copy of {@code next} into a local is visible in
 * this excerpt; presumably the method then advances via readNext() and
 * returns the local - confirm against the full file.
 */
267 public RowTransactional next(){
// Hold the current row so the field can be replaced before returning.
270 RowTransactional tmp=next;
// Iterator-style remove hook.
// NOTE(review): the body is not visible in this excerpt; confirm whether it
// is a no-op or throws.
280 public void remove(){