helpful progress reporting
[IRC.git] / Robust / src / Benchmarks / MapReduce / Java / Master.java
1 public class Master {
2
3     int m;
4     int r;
5     int[] mworkerStates; // array of map worker's state
6                          // 0: idle  1: process  2: finished 3: fail
7     int[] rworkerStates; // array of reduce worker's state
8     Vector[] interoutputs; // array of string vector containing
9                            // paths of intermediate outputs from
10                            // map worker
11     Splitter splitter;
12     String outputfile;  // path of final output file
13     boolean partial;
14
15     public Master(int m, int r, Splitter splitter) {
16         this.m = m;
17         this.r = r;
18         this.mworkerStates = new int[m];
19         this.rworkerStates = new int[r];
20         this.interoutputs = new Vector[r];
21         this.splitter = splitter;
22         this.outputfile = new String("/scratch/mapreduce_java/output.dat");
23         this.partial = false;
24     }
25
26     public int getR() {
27         return this.r;
28     }
29
30     public String getOutputFile() {
31         return this.outputfile;
32     }
33
34     public boolean isPartial() {
35         return this.partial;
36     }
37
38     public void setPartial(boolean partial) {
39         this.partial = partial || this.partial;
40     }
41
42     /*public void split() {
43         splitter.split();
44     }*/
45
46     public MapWorker[] assignMap() {
47         String[] contentsplits = splitter.split();//splitter.getSlices();
48         MapWorker[] mworkers = new MapWorker[contentsplits.length];
49         for(int i = 0; i < contentsplits.length; ++i) {
50             //System.printString("*************************\n");
51             //System.printString(contentsplits[i] + "\n");
52             //System.printString("*************************\n");
53             MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
54             mworkerStates[i] = 1;
55             mworkers[i] = mworker;
56         }
57         this.splitter = null;
58         return mworkers;
59     }
60
61     public void setMapFinish(int i) {
62         mworkerStates[i] = 2;
63     }
64
65     public void setMapFail(int i) {
66         mworkerStates[i] = 3;
67     }
68
69     public boolean isMapFinish() {
70         for(int i = 0; i < mworkerStates.length; ++i) {
71             if(mworkerStates[i] == 1) {
72                 return false;
73             }
74         }
75
76         return true;
77     }
78
79     public void addInterOutput(String interoutput) {
80         int start = interoutput.lastindexOf('_');
81         int end = interoutput.indexOf('.');
82         int index = Integer.parseInt(interoutput.subString(start + 1, end));
83         //System.printString(interoutput.subString(start + 1, end) + "\n");
84         if(interoutputs[index] == null) {
85             interoutputs[index] = new Vector();
86         }
87         interoutputs[index].addElement(interoutput);
88     }
89
90     public ReduceWorker[] assignReduce() {
91         ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
92         for(int i = 0; i < interoutputs.length; ++i) {
93             ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
94             rworkerStates[i] = 1;
95             rworkers[i] = rworker;
96             this.interoutputs[i] = null;
97         }
98         this.interoutputs.clear();
99         return rworkers;
100     }
101
102     public void setReduceFinish(int i) {
103         rworkerStates[i] = 2;
104     }
105
106     public void setReduceFail(int i) {
107         rworkerStates[i] = 3;
108     }
109
110     public boolean isReduceFinish() {
111         for(int i = 0; i < rworkerStates.length; ++i) {
112             if(rworkerStates[i] == 1) {
113                 return false;
114             }
115         }
116
117         return true;
118     }
119
120     public void collectROutput(String file) {
121         FileInputStream iStream = new FileInputStream(file);
122         FileOutputStream oStream = new FileOutputStream(outputfile, true);
123         byte[] b = new byte[1024 * 10];
124         int length = iStream.read(b);
125         if(length < 0) {
126             System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
127             System.exit(-1);
128         }
129         //System.printString(new String(b, 0, length) + "\n");
130         oStream.write(b, 0, length);
131         iStream.close();
132         oStream.close();
133     }
134 }