1 public class MapWorker {
10 OutputCollector output;
11 String locationPrefix;
12 boolean[] outputsexit;
14 public MapWorker(String key, String value, int r, int id) {
20 this.output = new OutputCollector();
21 this.locationPrefix = "/scratch/mapreduce_opt/output-intermediate-map-";
23 this.outputsexit = new boolean[r];
27 MapReduceBase.map(key, value, output);
32 public void partition() {
33 FileOutputStream[] outputs = new FileOutputStream[r];
34 for(int i = 0; i < r; ++i) {
38 int size = this.output.size();
39 for(int i = 0; i < size; ++i) {
40 String key = this.output.getKey(i);
41 String value = this.output.getValue(i);
42 // use the hashcode of key to decide which intermediate output
43 // this pair should be in
44 int index = (int)Math.abs(key.hashCode()) % this.r;
45 FileOutputStream oStream = outputs[index];
48 String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
49 oStream = new FileOutputStream(filepath, true); // append
50 outputs[index] = oStream;
51 this.outputsexit[index] = true;
53 // format: key value\n
54 oStream.write(key.getBytes());
56 oStream.write(value.getBytes());
61 // close the output files
62 for(int i = 0; i < outputs.length; ++i) {
63 FileOutputStream temp = outputs[i];
73 public String outputFile(int i) {
75 StringBuffer temp = new StringBuffer(this.locationPrefix);
76 temp.append(String.valueOf(ID));
78 temp.append(String.valueOf(r));
80 temp.append(String.valueOf(i));
82 return new String(temp);