1 public class ReduceWorker {
7 Vector interoutputs; // string vector containing paths
8 // of intermediate outputs from map worker
10 HashMap values; // hashmap map key to vector of string vector
11 int[] sorts; // array record the sort of keys
12 OutputCollector output;
13 //String outputfile; // path of the intermediate output file
15 public ReduceWorker(Vector interoutputs, int id) {
17 this.interoutputs = interoutputs;
19 this.keys = new Vector();
20 this.values = new HashMap();
23 this.output = new OutputCollector();
24 //this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
27 public void sortgroup() {
28 // group values associated to the same key
29 //System.printString("================================\n");
30 if(interoutputs == null) {
33 for(int i = 0; i < interoutputs.size(); ++i) {
34 OutputCollector tmpout = (OutputCollector)interoutputs.elementAt(i);
35 int size = tmpout.size();
36 for(int j= 0; j < size; ++j) {
37 String key = tmpout.getKey(j);
38 String value = tmpout.getValue(j);
39 if(!this.values.containsKey(key)) {
40 this.values.put(key, new Vector());
41 this.keys.addElement(key);
43 ((Vector)this.values.get(key)).addElement(value);
46 //System.printString("================================\n");
48 // sort all the keys inside interoutputs
49 this.sorts = new int[this.keys.size()];
53 for(; tosort < this.keys.size(); ++tosort) {
54 int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
56 for(int i = tosort; i > 0; --i) {
57 if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
58 this.sorts[i] = this.sorts[i-1];
61 //System.printString(i + "; " + tosort + "\n");
66 this.sorts[index] = tosort;
70 public void reduce() {
71 if(this.interoutputs != null) {
72 for(int i = 0; i < this.sorts.length; ++i) {
73 String key = (String)this.keys.elementAt(this.sorts[i]);
74 Vector values = (Vector)this.values.get(key);
75 MapReduceBase.reduce(key, values, this.output);
79 // output all the result into some local file
80 /*int size = this.output.size();
81 FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
82 for(int i = 0; i < size; ++i) {
83 String key = this.output.getKey(i);
84 String value = this.output.getValue(i);
85 // format: key value\n
86 oStream.write(key.getBytes());
88 oStream.write(value.getBytes());
95 /*public String getOutputFile() {
96 return this.outputfile;
99 public OutputCollector getOutput() {