helpful progress reporting
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / MapWorker.java
1 public class MapWorker {
2     flag map;
3     flag partition;
4     flag mapoutput;
5
6     int ID;
7     int r;
8     String key;
9     String value;
10     OutputCollector output;
11     String locationPrefix;
12     boolean[] outputsexit;
13
14     public MapWorker(String key, String value, int r, int id) {
15         this.ID = id;
16         this.r = r;
17
18         this.key = key;
19         this.value = value;
20         this.output = new OutputCollector();
21         this.locationPrefix = "/scratch/mapreduce_opt/output-intermediate-map-";
22         
23         this.outputsexit = new boolean[r];
24     }
25
26     public void map() {
27         MapReduceBase.map(key, value, output);
28         this.key = null;
29         this.value = null;
30     }
31
32     public void partition() {
33         FileOutputStream[] outputs = new FileOutputStream[r];
34         for(int i = 0; i < r; ++i) {
35             outputs[i] = null;
36         }
37
38         int size = this.output.size();
39         for(int i = 0; i < size; ++i) {
40             String key = this.output.getKey(i);
41             String value = this.output.getValue(i);
42             // use the hashcode of key to decide which intermediate output
43             // this pair should be in
44             int index = (int)Math.abs(key.hashCode()) % this.r;
45             FileOutputStream oStream = outputs[index];
46             if(oStream == null) {
47                 // open the file
48                 String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
49                 oStream = new FileOutputStream(filepath, true); // append
50                 outputs[index] = oStream;
51                 this.outputsexit[index] = true;
52             }
53             // format: key value\n
54             oStream.write(key.getBytes());
55             oStream.write(' ');
56             oStream.write(value.getBytes());
57             oStream.write('\n');
58             oStream.flush();
59         }
60
61         // close the output files
62         for(int i = 0; i < outputs.length; ++i) {
63             FileOutputStream temp = outputs[i];
64             if(temp != null) {
65                 temp.close();
66                 outputs[i] = null;
67             }
68         }
69         
70         this.output = null;
71     }
72
73     public String outputFile(int i) {
74         if(outputsexit[i]) {
75             StringBuffer temp = new StringBuffer(this.locationPrefix);
76             temp.append(String.valueOf(ID));
77             temp.append("-of-");
78             temp.append(String.valueOf(r));
79             temp.append("_");
80             temp.append(String.valueOf(i));
81             temp.append(".dat");
82             return new String(temp);
83         } else {
84             return null;
85         }
86     }
87
88     public int getID() {
89         return this.ID;
90     }
91
92     public int getR() {
93         return this.r;
94     }
95
96 }