60407fb358c474a2d2de5ed078a11c67fad52a98
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / MapWorker.java
1 public class MapWorker {
2     flag map;
3     flag partition;
4     flag mapoutput;
5
6     int ID;
7
8     int r;
9     String key;
10     String value;
11     OutputCollector output;
12
13     String[] locations;
14     FileOutputStream[] outputs;
15
16     public MapWorker(String key, String value, int r, int id) {
17         this.ID = id;
18         this.r = r;
19
20         this.key = key;
21         this.value = value;
22         this.output = new OutputCollector();
23
24         locations = new String[r];
25         for(int i = 0; i < r; ++i) {
26             StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
27             temp.append(String.valueOf(ID));
28             temp.append("-of-");
29             temp.append(String.valueOf(r));
30             temp.append("_");
31             temp.append(String.valueOf(i));
32             temp.append(".dat");
33             locations[i] = new String(temp);
34         }
35
36         outputs = new FileOutputStream[r];
37         for(int i = 0; i < r; ++i) {
38             outputs[i] = null;
39         }
40     }
41
42     public void map() {
43         /*if(ID % 2 == 1) {
44                 String temp = locations[locations.length];
45         }*/
46
47         MapReduceBase.map(key, value, output);
48     }
49
50     public void partition() {
51         /*if(ID % 2 == 1) {
52                 String temp = locations[locations.length];
53         }*/
54
55         int size = this.output.size();
56         for(int i = 0; i < size; ++i) {
57             String key = this.output.getKey(i);
58             String value = this.output.getValue(i);
59             // use the hashcode of key to decide which intermediate output
60             // this pair should be in
61             int index = (int)Math.abs(key.hashCode()) % this.r;
62             FileOutputStream oStream = outputs[index];
63             if(oStream == null) {
64                 // open the file
65                 String filepath = locations[index];
66                 oStream = new FileOutputStream(filepath, true); // append
67                 outputs[index] = oStream;
68             }
69             // format: key value\n
70             oStream.write(key.getBytes());
71             oStream.write(' ');
72             oStream.write(value.getBytes());
73             oStream.write('\n');
74             oStream.flush();
75         }
76
77         // close the output files
78         for(int i = 0; i < this.outputs.length; ++i) {
79             FileOutputStream temp = this.outputs[i];
80             if(temp != null) {
81                 temp.close();
82             }
83         }
84     }
85
86     public String outputFile(int i) {
87         if(outputs[i] != null) {
88             return locations[i];
89         } else {
90             return null;
91         }
92     }
93
94     public int getID() {
95         return this.ID;
96     }
97
98     public int getR() {
99         return this.r;
100     }
101
102 }