3 //import java.io.FileOutputStream;
5 public class MapWorker {
8 MapReduceBase mapreducer;
13 OutputCollector output;
16 FileOutputStream[] outputs;
18 public MapWorker(String key, String value, int r, int id) {
20 this.mapreducer = null;
25 this.output = new OutputCollector();
27 locations = new String[r];
28 for(int i = 0; i < r; ++i) {
29 StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
30 temp.append(String.valueOf(ID));
32 temp.append(String.valueOf(r));
34 temp.append(String.valueOf(i));
36 locations[i] = new String(temp);
39 outputs = new FileOutputStream[r];
40 for(int i = 0; i < r; ++i) {
45 public MapReduceBase getMapreducer() {
49 public void setMapreducer(MapReduceBase mapreducer) {
50 this.mapreducer = mapreducer;
55 String temp = locations[locations.length];
58 this.mapreducer.map(key, value, output);
61 public void partition() {
63 String temp = locations[locations.length];
67 int size = this.output.size();
68 for(int i = 0; i < size; ++i) {
69 String key = this.output.getKey(i);
70 String value = this.output.getValue(i);
71 // use the hashcode of key to decide which intermediate output
72 // this pair should be in
73 //int hash = key.hashCode();
74 int index = (int)Math.abs(key.hashCode()) % this.r;
75 FileOutputStream oStream = outputs[index];
78 String filepath = locations[index];
79 oStream = new FileOutputStream(filepath, true); // append
80 outputs[index] = oStream;
82 // format: key value\n
83 oStream.write(key.getBytes());
85 oStream.write(value.getBytes());
90 // close the output files
91 for(int i = 0; i < this.outputs.length; ++i) {
92 FileOutputStream temp = this.outputs[i];
97 /*} catch(Exception e) {
103 public String outputFile(int i) {
104 if(outputs[i] != null) {