helpful progress reporting
[IRC.git] / Robust / src / Benchmarks / MapReduce / Nor / MapWorker.java
1 public class MapWorker {
2     flag map;
3     flag partition;
4     flag mapoutput;
5
6     int ID;
7     int r;
8     String key;
9     String value;
10     OutputCollector output;
11     String locationPrefix;
12     boolean[] outputsexit;
13
14     public MapWorker(String key, String value, int r, int id) {
15         this.ID = id;
16         this.r = r;
17
18         this.key = key;
19         this.value = value;
20         this.output = new OutputCollector();
21         this.locationPrefix = "/scratch/mapreduce_nor/output-intermediate-map-";
22         
23         this.outputsexit = new boolean[r];
24     }
25
26     public void map() {
27         MapReduceBase.map(key, value, output);
28         this.key = null;
29         this.value = null;
30     }
31
32     public void partition() {
33         FileOutputStream[] outputs = new FileOutputStream[r];
34         for(int i = 0; i < r; ++i) {
35             outputs[i] = null;
36         }
37         
38         int size = this.output.size();
39         for(int i = 0; i < size; ++i) {
40             String key = this.output.getKey(i);
41             String value = this.output.getValue(i);
42             // use the hashcode of key to decide which intermediate output
43             // this pair should be in
44             int index = (int)Math.abs(key.hashCode()) % this.r;
45             FileOutputStream oStream = outputs[index];
46             if(oStream == null) {
47                 // open the file
48                 String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
49                 //System.printString("partition: " + filepath + "\n");
50                 oStream = new FileOutputStream(filepath, true); // append
51                 outputs[index] = oStream;
52                 this.outputsexit[index] = true;
53             }
54             // format: key value\n
55             oStream.write(key.getBytes());
56             oStream.write(' ');
57             oStream.write(value.getBytes());
58             oStream.write('\n');
59             oStream.flush();
60         }
61
62         // close the output files
63         for(int i = 0; i < outputs.length; ++i) {
64             FileOutputStream temp = outputs[i];
65             if(temp != null) {
66                 temp.close();
67                 outputs[i] = null;
68             }
69         }
70         
71         this.output = null;
72     }
73
74     public String outputFile(int i) {
75         if(outputsexit[i]) {
76             StringBuffer temp = new StringBuffer(this.locationPrefix);
77             temp.append(String.valueOf(ID));
78             temp.append("-of-");
79             temp.append(String.valueOf(r));
80             temp.append("_");
81             temp.append(String.valueOf(i));
82             temp.append(".dat");
83             return new String(temp);
84         } else {
85             return null;
86         }
87     }
88
89     public int getID() {
90         return this.ID;
91     }
92
93     public int getR() {
94         return this.r;
95     }
96
97 }