changes to build script to increase java heap memory
[IRC.git] / Robust / src / Benchmarks / Scheduling / MapReduce / ReduceWorker.java
1 public class ReduceWorker {
2     flag sortgroup;
3     flag reduce;
4     flag reduceoutput;
5
6     int ID;
7     Vector interoutputs;  // string vector containing paths
8     // of intermediate outputs from map worker
9     Vector keys;
10     HashMap values; // hashmap map key to vector of string vector
11     int[] sorts; // array record the sort of keys
12     OutputCollector output;
13     //String outputfile;  // path of the intermediate output file
14
15     public ReduceWorker(Vector interoutputs, int id) {
16         this.ID = id;
17         this.interoutputs = interoutputs;
18
19         this.keys = new Vector();
20         this.values = new HashMap();
21         //this.sorts = null;
22
23         this.output = new OutputCollector();
24         //this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
25     }
26
27     public void sortgroup() {
28         // group values associated to the same key
29         //System.printString("================================\n");
30         if(interoutputs == null) {
31             return;
32         }
33         for(int i = 0; i < interoutputs.size(); ++i) {
34             OutputCollector tmpout = (OutputCollector)interoutputs.elementAt(i);
35             int size = tmpout.size();
36             for(int j= 0; j < size; ++j) {
37                 String key = tmpout.getKey(j);
38                 String value = tmpout.getValue(j);
39                 if(!this.values.containsKey(key)) {
40                     this.values.put(key, new Vector());
41                     this.keys.addElement(key);
42                 }
43                 ((Vector)this.values.get(key)).addElement(value);
44             }
45         }
46         //System.printString("================================\n");
47
48         // sort all the keys inside interoutputs
49         this.sorts = new int[this.keys.size()];
50         // insert sorting
51         this.sorts[0] = 0;
52         int tosort = 1;
53         for(; tosort < this.keys.size(); ++tosort) {
54             int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
55             int index = tosort;
56             for(int i = tosort; i > 0; --i) {
57                 if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
58                     this.sorts[i] = this.sorts[i-1];
59                     index = i - 1;
60                 } else {
61                     //System.printString(i + "; " + tosort + "\n");
62                     index = i;
63                     i = 0;
64                 }
65             }
66             this.sorts[index] = tosort;
67         }
68     }
69
70     public void reduce() {
71         if(this.interoutputs != null) {
72             for(int i = 0; i < this.sorts.length; ++i) {
73                 String key = (String)this.keys.elementAt(this.sorts[i]);
74                 Vector values = (Vector)this.values.get(key);
75                 MapReduceBase.reduce(key, values, this.output);
76             }
77         }
78
79         // output all the result into some local file
80         /*int size = this.output.size();
81         FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
82         for(int i = 0; i < size; ++i) {
83             String key = this.output.getKey(i);
84             String value = this.output.getValue(i);
85             // format: key value\n
86             oStream.write(key.getBytes());
87             oStream.write(' ');
88             oStream.write(value.getBytes());
89             oStream.write('\n');
90             oStream.flush();
91         }
92         oStream.close();*/
93     }
94
95     /*public String getOutputFile() {
96         return this.outputfile;
97     }*/
98     
99     public OutputCollector getOutput() {
100         return this.output;
101     }
102
103     public int getID() {
104         return this.ID;
105     }
106 }