f5b591179157034cfd5363b96c7fc1099cb7c2b3
[IRC.git] / Robust / src / Benchmarks / MapReduce / Java / MapWorker.java
1 //package mapreduce;
2
3 //import java.io.FileOutputStream;
4
5 public class MapWorker {
6
7     int ID;
8     MapReduceBase mapreducer;
9
10     int r;
11     String key;
12     String value;
13     OutputCollector output;
14
15     String[] locations;
16     FileOutputStream[] outputs;
17
18     public MapWorker(String key, String value, int r, int id) {
19         this.ID = id;
20         this.mapreducer = null;
21
22         this.r = r;
23         this.key = key;
24         this.value = value;
25         this.output = new OutputCollector();
26
27         locations = new String[r];
28         for(int i = 0; i < r; ++i) {
29             StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
30             temp.append(String.valueOf(ID));
31             temp.append("-of-");
32             temp.append(String.valueOf(r));
33             temp.append("_");
34             temp.append(String.valueOf(i));
35             temp.append(".dat");
36             locations[i] = new String(temp);
37         }
38
39         outputs = new FileOutputStream[r];
40         for(int i = 0; i < r; ++i) {
41             outputs[i] = null;
42         }
43     }
44
45     public MapReduceBase getMapreducer() {
46         return mapreducer;
47     }
48
49     public void setMapreducer(MapReduceBase mapreducer) {
50         this.mapreducer = mapreducer;
51     }
52
53     public void map() {
54         /*if(ID % 2 == 1) {
55             String temp = locations[locations.length];
56         }*/
57         
58         this.mapreducer.map(key, value, output);
59     }
60
61     public void partition() {
62         /*if(ID % 2 == 1) {
63             String temp = locations[locations.length];
64         }*/
65         
66         //try{
67             int size = this.output.size();
68             for(int i = 0; i < size; ++i) {
69                 String key = this.output.getKey(i);
70                 String value = this.output.getValue(i);
71                 // use the hashcode of key to decide which intermediate output
72                 // this pair should be in
73                 //int hash = key.hashCode();
74                 int index = (int)Math.abs(key.hashCode()) % this.r;
75                 FileOutputStream oStream = outputs[index];
76                 if(oStream == null) {
77                     // open the file
78                     String filepath = locations[index];
79                     oStream = new FileOutputStream(filepath, true); // append
80                     outputs[index] = oStream;
81                 }
82                 // format: key value\n
83                 oStream.write(key.getBytes());
84                 oStream.write(' ');
85                 oStream.write(value.getBytes());
86                 oStream.write('\n');
87                 oStream.flush();
88             }
89
90             // close the output files
91             for(int i = 0; i < this.outputs.length; ++i) {
92                 FileOutputStream temp = this.outputs[i];
93                 if(temp != null) {
94                     temp.close();
95                 }
96             }
97         /*} catch(Exception e) {
98             e.printStackTrace();
99             System.exit(-1);
100         }*/
101     }
102
103     public String outputFile(int i) {
104         if(outputs[i] != null) {
105             return locations[i];
106         } else {
107             return null;
108         }
109     }
110
111     public int getID() {
112         return this.ID;
113     }
114
115     public int getR() {
116         return this.r;
117     }
118
119 }