3 /*import java.io.FileInputStream;
4 import java.io.FileOutputStream;
5 import java.io.IOException;
6 import java.util.HashMap;
7 import java.util.Vector;*/
9 import mapreduce.MapReduceBase;
// Reduce-side worker of the MapReduce benchmark.
// Reads the intermediate files written by map workers (paths in interoutputs),
// groups their values by key, orders the keys, runs the user-supplied reducer
// once per key, and appends the collected results to a local output file.
11 public class ReduceWorker {
// User-supplied reducer; the constructor sets it to null, so it must be
// installed with setMapreducer() before reduce() runs.
14 MapReduceBase mapreducer;
16 Vector interoutputs; // string vector containing paths
17 // of intermediate outputs from map worker
// NOTE(review): the constructor and sortgroup() use this.keys (a Vector of the
// distinct key Strings in first-seen order), but no keys declaration appears
// among the fields here -- confirm it is declared.
19 HashMap values; // key (String) -> Vector of that key's value Strings
20 int[] sorts; // indices into keys, ordered by ascending key hashCode()
// Passed to mapreducer.reduce(); its key/value pairs are written to outputfile.
21 OutputCollector output;
22 String outputfile; // path of the file reduce() appends its results to
// Creates a reduce worker over the given map-worker outputs.
//   interoutputs: Vector of String paths to intermediate files written by map
//                 workers; may be null (sortgroup() and reduce() both test
//                 for null).
//   id:           worker id, used to build the output file name.
// The reducer starts out null and must be installed with setMapreducer()
// before reduce() is called.
24 public ReduceWorker(Vector interoutputs, int id) {
26 this.mapreducer = null;
28 this.interoutputs = interoutputs;
30 this.keys = new Vector();
31 this.values = new HashMap();
34 this.output = new OutputCollector();
// NOTE(review): the output directory is hard-coded; consider making it configurable.
35 this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
// Accessor for the user-supplied reducer; presumably returns this.mapreducer
// (null until setMapreducer() is called) -- TODO confirm.
38 public MapReduceBase getMapreducer() {
// Installs the user-supplied reducer that reduce() invokes once per key.
// Must be called before reduce(); the constructor leaves it null.
42 public void setMapreducer(MapReduceBase mapreducer) {
43 this.mapreducer = mapreducer;
// Loads and groups the map-worker intermediate outputs, then orders the keys.
// For each path in interoutputs it reads the file's bytes and splits them into
// '\n'-terminated lines of the form "key value" (split at the first space).
// Each value is appended to values.get(key), and keys records each distinct
// key in first-seen order.
// Finally it fills sorts with indices into keys, ordered by ascending
// String.hashCode() (not lexical order).
46 public void sortgroup() {
52 // group values associated to the same key
53 //System.printString("================================\n");
// No intermediate outputs to read. NOTE(review): this branch presumably
// returns early and leaves sorts unset; reduce() checks interoutputs for null
// before touching sorts.
54 if(interoutputs == null) {
58 for(int i = 0; i < interoutputs.size(); ++i) {
// NOTE(review): confirm iStream is closed once the file has been read.
59 FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
// NOTE(review): this is a single read() into a fixed 10 KB buffer. Anything
// past the first 10240 bytes, or past a short read, is silently ignored.
// Loop until read() returns -1 if larger map outputs are possible.
60 byte[] b = new byte[1024 * 10];
61 int length = iStream.read(b);
// Reported when the map output cannot be read; System.printString stands in
// for the commented-out System.out.println. NOTE(review): "ouput" in the
// message is a typo for "output".
63 System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
// Decodes with the platform default charset.
66 String content = new String(b, 0, length);
67 //System.printString(content + "\n");
68 int index = content.indexOf('\n');
// Consume one '\n'-terminated line at a time. Text after the last '\n' is
// presumably never parsed -- TODO confirm against the loop condition.
70 String line = content.substring(0, index);
71 content = content.substring(index + 1);
72 //System.printString(line + "\n");
// Split at the first space: the key is before it, the value is the rest of
// the line. NOTE(review): a line with no space gives tmpindex == -1, and
// substring(0, -1) then throws StringIndexOutOfBoundsException.
73 int tmpindex = line.indexOf(' ');
74 String key = line.substring(0, tmpindex);
75 String value = line.substring(tmpindex + 1);
76 //System.printString(key + "; " + value + "\n");
// First occurrence of this key: create its value list and remember the key
// in first-seen order.
77 if(!this.values.containsKey(key)) {
78 this.values.put(key, new Vector());
79 this.keys.addElement(key);
81 ((Vector)this.values.get(key)).addElement(value);
82 index = content.indexOf('\n');
86 //System.printString("================================\n");
88 /*for(int i = 0; i < this.keys.size(); ++i) {
89 System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
91 System.printString("\n");*/
93 // order the indices of all grouped keys by ascending String.hashCode()
// Insertion sort over indices into keys. Already-placed indices whose key hash
// is greater than the new key's hash are shifted one slot to the right.
94 this.sorts = new int[this.keys.size()];
98 for(; tosort < this.keys.size(); ++tosort) {
99 int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
101 for(int i = tosort; i > 0; --i) {
102 if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
103 this.sorts[i] = this.sorts[i-1];
106 //System.printString(i + "; " + tosort + "\n");
// Drop the new key's index into the slot opened up by the shifts.
111 this.sorts[index] = tosort;
112 /*for(int i = 0; i < this.sorts.length; ++i) {
114 System.printString(this.sorts[i] + "; ");
116 System.printString("\n");*/
117 /*} catch(IOException e) {
// Runs the user reducer over every grouped key and writes out the results.
// Keys are visited in the order recorded in sorts (ascending hashCode), each
// with its Vector of values. Everything collected in output is then appended
// to outputfile, one "key value" pair per line (see the format comment below).
// Requires setMapreducer() to have been called. When interoutputs is non-null,
// sortgroup() must also have filled sorts first.
123 public void reduce() {
// NOTE(review): `a` is not used in the code shown; confirm whether it is needed.
125 int a[] = new int[1];
129 if(this.interoutputs != null) {
132 for(int i = 0; i < this.sorts.length; ++i) {
133 String key = (String)this.keys.elementAt(this.sorts[i]);
// This local shadows the values field.
134 Vector values = (Vector)this.values.get(key);
135 this.mapreducer.reduce(key, values, output);
140 // append every collected result to this worker's output file
141 int size = this.output.size();
// Append mode: results from earlier runs with the same id are kept, not
// overwritten. NOTE(review): confirm oStream is closed after the writes.
142 FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
143 for(int i = 0; i < size; ++i) {
144 String key = this.output.getKey(i);
145 String value = this.output.getValue(i);
146 // format: key value\n
// getBytes() encodes with the platform default charset.
147 oStream.write(key.getBytes());
149 oStream.write(value.getBytes());
154 /*} catch(Exception e) {
// Returns the path of the file that reduce() appends its results to.
160 public String getOutputFile() {
161 return this.outputfile;