start of new file
[IRC.git] / Robust / src / Benchmarks / MapReduce / Java / ReduceWorker.java
1 //package mapreduce;
2
3 /*import java.io.FileInputStream;
4 import java.io.FileOutputStream;
5 import java.io.IOException;
6 import java.util.HashMap;
7 import java.util.Vector;*/
8
9 import mapreduce.MapReduceBase;
10
11 public class ReduceWorker {
12
13     int ID;
14     MapReduceBase mapreducer;
15  
16     Vector interoutputs;  // string vector containing paths
17                           // of intermediate outputs from map worker
18     Vector keys;
19     HashMap values; // hashmap map key to vector of string vector
20     int[] sorts; // array record the sort of keys
21     OutputCollector output;
22     String outputfile;  // path of the intermediate output file
23
24     public ReduceWorker(Vector interoutputs, int id) {
25         this.ID = id;
26         this.mapreducer = null;
27         
28         this.interoutputs = interoutputs;
29
30         this.keys = new Vector();
31         this.values = new HashMap();
32         //this.sorts = null;
33
34         this.output = new OutputCollector();
35         this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
36     }
37
38     public MapReduceBase getMapreducer() {
39         return mapreducer;
40     }
41
42     public void setMapreducer(MapReduceBase mapreducer) {
43         this.mapreducer = mapreducer;
44     }
45
46     public void sortgroup() {
47         /*if(ID % 2 == 1) {
48             int a[] = new int[1];
49             int temp = a[1];
50         }*/
51         
52         // group values associated to the same key
53         //System.printString("================================\n");
54         if(interoutputs == null) {
55             return;
56         }
57         //try{
58             for(int i = 0; i < interoutputs.size(); ++i) {
59                 FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
60                 byte[] b = new byte[1024 * 10];
61                 int length = iStream.read(b);
62                 if(length < 0) {
63                     System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
64                     System.exit(-1);
65                 }
66                 String content = new String(b, 0, length);
67                 //System.printString(content + "\n");
68                 int index = content.indexOf('\n');
69                 while(index != -1) {
70                     String line = content.substring(0, index);
71                     content = content.substring(index + 1);
72                     //System.printString(line + "\n");
73                     int tmpindex = line.indexOf(' ');
74                     String key = line.substring(0, tmpindex);
75                     String value = line.substring(tmpindex + 1);
76                     //System.printString(key + "; " + value + "\n");
77                     if(!this.values.containsKey(key)) {
78                         this.values.put(key, new Vector());
79                         this.keys.addElement(key);
80                     }
81                     ((Vector)this.values.get(key)).addElement(value);
82                     index = content.indexOf('\n');
83                 }
84                 iStream.close();
85             }
86             //System.printString("================================\n");
87
88             /*for(int i = 0; i < this.keys.size(); ++i) {
89                         System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
90                 }
91                 System.printString("\n");*/
92
93             // sort all the keys inside interoutputs
94             this.sorts = new int[this.keys.size()];
95             // insert sorting
96             this.sorts[0] = 0;
97             int tosort = 1;
98             for(; tosort < this.keys.size(); ++tosort) {
99                 int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
100                 int index = tosort;
101                 for(int i = tosort; i > 0; --i) {
102                     if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
103                         this.sorts[i] = this.sorts[i-1];
104                         index = i - 1;
105                     } else {
106                         //System.printString(i + "; " + tosort + "\n");
107                         index = i;
108                         i = 0;
109                     }
110                 }
111                 this.sorts[index] = tosort;
112             }
113             /*for(int i = 0; i < this.sorts.length; ++i) {
114                         System.printString(this.sorts[i] + "; ");
115                 }
116                 System.printString("\n");*/
117         /*} catch(IOException e) {
118             e.printStackTrace();
119             System.exit(-1);
120         }*/
121     }
122
123     public void reduce() {
124         /*if(ID % 2 == 1) {
125             int a[] = new int[1];
126             int temp = a[1];
127         }*/
128         
129         if(this.interoutputs != null) {
130             //return;
131         //}
132         for(int i = 0; i < this.sorts.length; ++i) {
133             String key = (String)this.keys.elementAt(this.sorts[i]);
134             Vector values = (Vector)this.values.get(key);
135             this.mapreducer.reduce(key, values, output);
136         }
137         }
138
139         //try{
140             // output all the result into some local file
141             int size = this.output.size();
142             FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
143             for(int i = 0; i < size; ++i) {
144                 String key = this.output.getKey(i);
145                 String value = this.output.getValue(i);
146                 // format: key value\n
147                 oStream.write(key.getBytes());
148                 oStream.write(' ');
149                 oStream.write(value.getBytes());
150                 oStream.write('\n');
151                 oStream.flush();
152             }
153             oStream.close();
154         /*} catch(Exception e) {
155             e.printStackTrace();
156             System.exit(-1);
157         }*/
158     }
159
160     public String getOutputFile() {
161         return this.outputfile;
162     }
163
164     public int getID() {
165         return this.ID;
166     }
167 }