start of new file
[IRC.git] / Robust / src / Benchmarks / MapReduce / Nor / ReduceWorker.java
1 public class ReduceWorker {
2     flag sortgroup;
3     flag reduce;
4     flag reduceoutput;
5
6     int ID;
7     Vector interoutputs;  // string vector containing paths
8     // of intermediate outputs from map worker
9     Vector keys;
10     HashMap values; // hashmap map key to vector of string vector
11     int[] sorts; // array record the sort of keys
12     OutputCollector output;
13     String outputfile;  // path of the intermediate output file
14
15     public ReduceWorker(Vector interoutputs, int id) {
16         this.ID = id;
17         this.interoutputs = interoutputs;
18
19         this.keys = new Vector();
20         this.values = new HashMap();
21         //this.sorts = null;
22
23         this.output = new OutputCollector();
24         this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
25     }
26
27     public void sortgroup() {
28         /*if(ID % 2 == 1) {
29                 int a[] = new int[1];
30                 int temp = a[1];
31         }*/
32
33         // group values associated to the same key
34         //System.printString("================================\n");
35         if(interoutputs == null) {
36             return;
37         }
38         for(int i = 0; i < interoutputs.size(); ++i) {
39             FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
40             byte[] b = new byte[1024 * 10];
41             int length = iStream.read(b);
42             if(length < 0) {
43                 System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
44                 System.exit(-1);
45             }
46             String content = new String(b, 0, length);
47             //System.printString(content + "\n");
48             int index = content.indexOf('\n');
49             while(index != -1) {
50                 String line = content.subString(0, index);
51                 content = content.subString(index + 1);
52                 //System.printString(line + "\n");
53                 int tmpindex = line.indexOf(' ');
54                 String key = line.subString(0, tmpindex);
55                 String value = line.subString(tmpindex + 1);
56                 //System.printString(key + "; " + value + "\n");
57                 if(!this.values.containsKey(key)) {
58                     this.values.put(key, new Vector());
59                     this.keys.addElement(key);
60                 }
61                 ((Vector)this.values.get(key)).addElement(value);
62                 index = content.indexOf('\n');
63             }
64             iStream.close();
65         }
66         //System.printString("================================\n");
67
68         /*for(int i = 0; i < this.keys.size(); ++i) {
69                         System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
70                 }
71                 System.printString("\n");*/
72
73         // sort all the keys inside interoutputs
74         this.sorts = new int[this.keys.size()];
75         // insert sorting
76         this.sorts[0] = 0;
77         int tosort = 1;
78         for(; tosort < this.keys.size(); ++tosort) {
79             int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
80             int index = tosort;
81             for(int i = tosort; i > 0; --i) {
82                 if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
83                     this.sorts[i] = this.sorts[i-1];
84                     index = i - 1;
85                 } else {
86                     //System.printString(i + "; " + tosort + "\n");
87                     index = i;
88                     i = 0;
89                 }
90             }
91             this.sorts[index] = tosort;
92         }
93         /*for(int i = 0; i < this.sorts.length; ++i) {
94                         System.printString(this.sorts[i] + "; ");
95                 }
96                 System.printString("\n");*/
97     }
98
99     public void reduce() {
100         /*if(ID % 2 == 1) {
101                 int a[] = new int[1];
102                 int temp = a[1];
103         }*/
104
105         if(this.interoutputs != null) {
106            // return;
107         //}
108         for(int i = 0; i < this.sorts.length; ++i) {
109             String key = (String)this.keys.elementAt(this.sorts[i]);
110             Vector values = (Vector)this.values.get(key);
111             MapReduceBase.reduce(key, values, output);
112         }
113         }
114
115         // output all the result into some local file
116         int size = this.output.size();
117         FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
118         for(int i = 0; i < size; ++i) {
119             String key = this.output.getKey(i);
120             String value = this.output.getValue(i);
121             // format: key value\n
122             oStream.write(key.getBytes());
123             oStream.write(' ');
124             oStream.write(value.getBytes());
125             oStream.write('\n');
126             oStream.flush();
127         }
128         oStream.close();
129     }
130
131     public String getOutputFile() {
132         return this.outputfile;
133     }
134
135     public int getID() {
136         return this.ID;
137     }
138 }