start of new file
[IRC.git] / Robust / src / Benchmarks / Scheduling / MapReduce / MapWorker.java
1 public class MapWorker {
2     flag map;
3     flag partition;
4     flag mapoutput;
5
6     int ID;
7
8     int r;
9     String key;
10     String value;
11     OutputCollector output;
12
13     //String[] locations;
14     OutputCollector[] outputs;
15
16     public MapWorker(String key, String value, int r, int id) {
17         this.ID = id;
18         this.r = r;
19
20         this.key = key;
21         this.value = value;
22         this.output = new OutputCollector();
23
24         /*locations = new String[r];
25         for(int i = 0; i < r; ++i) {
26             StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-");
27             temp.append(String.valueOf(ID));
28             temp.append("-of-");
29             temp.append(String.valueOf(r));
30             temp.append("_");
31             temp.append(String.valueOf(i));
32             temp.append(".dat");
33             locations[i] = new String(temp);
34         }*/
35
36         outputs = new OutputCollector[r];
37         for(int i = 0; i < r; ++i) {
38             outputs[i] = null;
39         }
40     }
41
42     public void map() {
43         MapReduceBase.map(key, value, output);
44     }
45
46     public void partition() {
47         int size = this.output.size();
48         for(int i = 0; i < size; ++i) {
49             String key = this.output.getKey(i);
50             String value = this.output.getValue(i);
51             // use the hashcode of key to decide which intermediate output
52             // this pair should be in
53             int index = (int)Math.abs(key.hashCode()) % this.r;
54             OutputCollector oStream = outputs[index];
55             if(oStream == null) {
56                 // open the file
57                 oStream = new OutputCollector(); // append
58                 outputs[index] = oStream;
59             }
60             oStream.emit(key, "1");
61         }
62     }
63
64     public OutputCollector outputFile(int i) {
65         return outputs[i];
66     }
67
68     public int getID() {
69         return this.ID;
70     }
71
72     public int getR() {
73         return this.r;
74     }
75
76 }