start of new file
[IRC.git] / Robust / src / Benchmarks / MapReduce / Java / Master.java
1 //package mapreduce;
2
3 /*import java.io.FileInputStream;
4 import java.io.FileOutputStream;
5 import java.util.Vector;*/
6
7 public class Master {
8
9     int m;
10     int r;
11     int[] mworkerStates; // array of map worker's state
12     // 0: idle  1: process  2: finished 3: fail
13     int[] rworkerStates; // array of reduce worker's state
14     Vector[] interoutputs; // array of string vector containing
15     // paths of intermediate outputs from
16     // map worker
17
18     Splitter splitter;
19
20     String outputfile;  // path of final output file
21
22     boolean partial;
23
24     public Master(int m, int r, Splitter splitter) {
25         this.m = m;
26         this.r = r;
27
28         mworkerStates = new int[m];
29         rworkerStates = new int[r];
30         for(int i = 0; i < m; ++i) {
31             mworkerStates[i] = 0;
32         }
33         for(int i = 0; i < r; ++i) {
34             rworkerStates[i] = 0;
35         }
36
37         interoutputs = new Vector[r];
38         for(int i = 0; i < r; ++i) {
39             interoutputs[i] = null;
40         }
41
42         this.splitter = splitter;
43         this.outputfile = new String("/scratch/mapreduce_java/output.dat");
44
45         this.partial = false;
46     }
47
48     public int getR() {
49         return this.r;
50     }
51
52     public String getOutputFile() {
53         return this.outputfile;
54     }
55
56     public boolean isPartial() {
57         return this.partial;
58     }
59
60     public void setPartial(boolean partial) {
61         this.partial = partial || this.partial;
62     }
63
64     public void split() {
65         splitter.split();
66     }
67
68     public MapWorker[] assignMap() {
69         String[] contentsplits = splitter.getSlices();
70         MapWorker[] mworkers = new MapWorker[contentsplits.length];
71         for(int i = 0; i < contentsplits.length; ++i) {
72             //System.printString("*************************\n");
73             //System.printString(contentsplits[i] + "\n");
74             //System.printString("*************************\n");
75             MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
76             mworkerStates[i] = 1;
77             mworkers[i] = mworker;
78         }
79         return mworkers;
80     }
81
82     public void setMapFinish(int i) {
83         mworkerStates[i] = 2;
84     }
85
86     public void setMapFail(int i) {
87         mworkerStates[i] = 3;
88     }
89
90     public boolean isMapFinish() {
91         for(int i = 0; i < mworkerStates.length; ++i) {
92             if(mworkerStates[i] == 1) {
93                 return false;
94             }
95         }
96
97         return true;
98     }
99
100     public void addInterOutput(String interoutput) {
101         int start = interoutput.lastindexOf('_');
102         int end = interoutput.indexOf('.');
103         int index = Integer.parseInt(interoutput.subString(start + 1, end));
104         //System.printString(interoutput.subString(start + 1, end) + "\n");
105         if(interoutputs[index] == null) {
106             interoutputs[index] = new Vector();
107         }
108         interoutputs[index].addElement(interoutput);
109     }
110
111     public ReduceWorker[] assignReduce() {
112         ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
113         for(int i = 0; i < interoutputs.length; ++i) {
114             ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
115             rworkerStates[i] = 1;
116             rworkers[i] = rworker;
117         }
118         return rworkers;
119     }
120
121     public void setReduceFinish(int i) {
122         rworkerStates[i] = 2;
123     }
124
125     public void setReduceFail(int i) {
126         rworkerStates[i] = 3;
127     }
128
129     public boolean isReduceFinish() {
130         for(int i = 0; i < rworkerStates.length; ++i) {
131             if(rworkerStates[i] == 1) {
132                 return false;
133             }
134         }
135
136         return true;
137     }
138
139     public void collectROutput(String file) {
140         //try{
141             FileInputStream iStream = new FileInputStream(file);
142             FileOutputStream oStream = new FileOutputStream(outputfile, true);
143             byte[] b = new byte[1024 * 10];
144             int length = iStream.read(b);
145             if(length < 0) {
146                 System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
147                 System.exit(-1);
148             }
149             //System.printString(new String(b, 0, length) + "\n");
150             oStream.write(b, 0, length);
151             iStream.close();
152             oStream.close();
153         /*} catch(Exception e) {
154             e.printStackTrace();
155             System.exit(-1);
156         }*/
157     }
158 }