start of new file
[IRC.git] / Robust / src / Benchmarks / Scheduling / MapReduce / Master.java
1 public class Master {
2     flag split;
3     flag assignMap;
4     flag mapoutput;
5     flag mapfinished;
6     flag assignReduce;
7     flag reduceoutput;
8     flag reducefinished;
9     flag output;
10
11     int m;
12     int r;
13     int[] mworkerStates; // array of map worker's state
14     // 0: idle  1: process  2: finished 3: fail
15     int finishmworker;
16     int[] rworkerStates; // array of reduce worker's state
17     int finishrworker;
18     Vector[] interoutputs; // array of OutputCollector vector containing
19     // paths of intermediate outputs from
20     // map worker
21
22     Splitter splitter;
23
24     //String outputfile;  // path of final output file
25     OutputCollector output;
26
27     //boolean partial;
28
29     public Master(int m, int r, Splitter splitter) {
30         this.m = m;
31         this.r = r;
32
33         mworkerStates = new int[m];
34         rworkerStates = new int[r];
35         for(int i = 0; i < m; ++i) {
36             mworkerStates[i] = 0;
37         }
38         for(int i = 0; i < r; ++i) {
39             rworkerStates[i] = 0;
40         }
41
42         interoutputs = new Vector[r];
43         for(int i = 0; i < r; ++i) {
44             interoutputs[i] = null;
45         }
46
47         this.splitter = splitter;
48         //this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
49         this.output = new OutputCollector();
50
51         //this.partial = false;
52         this.finishmworker = 0;
53         this.finishrworker = 0;
54     }
55
56     public int getR() {
57         return this.r;
58     }
59
60     /*public String getOutputFile() {
61         return this.outputfile;
62     }*/
63
64     /*public boolean isPartial() {
65         return this.partial;
66     }
67
68     public void setPartial(boolean partial) {
69         this.partial = partial || this.partial;
70     }*/
71
72     public void split() {
73         splitter.split();
74     }
75
76     /*public void assignMap() {
77         String[] contentsplits = splitter.getSlices();
78         for(int i = 0; i < contentsplits.length; ++i) {
79             //System.printString("*************************\n");
80             //System.printString(contentsplits[i] + "\n");
81             //System.printString("*************************\n");
82             MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
83             mworkerStates[i] = 1;
84         }
85     }*/
86     
87     public void setMapInit(int i) {
88         mworkerStates[i] = 1;
89     }
90
91     public void setMapFinish(int i) {
92         finishmworker++;
93         mworkerStates[i] = 2;
94     }
95
96     public void setMapFail(int i) {
97         mworkerStates[i] = 3;
98     }
99
100     public boolean isMapFinish() {
101         /*
102         //System.printString("check map finish\n");
103         for(int i = 0; i < mworkerStates.length; ++i) {
104             if(mworkerStates[i] == 1) {
105                 return false;
106             }
107         }
108
109         return true;*/
110         return this.finishmworker == this.m;
111     }
112
113     public void addInterOutput(OutputCollector interoutput, int index) {
114         if(interoutputs[index] == null) {
115             interoutputs[index] = new Vector();
116         }
117         interoutputs[index].addElement(interoutput);
118     }
119
120     /*public void assignReduce() {
121         for(int i = 0; i < interoutputs.length; ++i) {
122             ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
123             rworkerStates[i] = 1;
124         }
125     }*/
126     
127     public void setReduceInit(int i) {
128         rworkerStates[i] = 1;
129     }
130
131     public void setReduceFinish(int i) {
132         finishrworker++;
133         rworkerStates[i] = 2;
134     }
135
136     public void setReduceFail(int i) {
137         rworkerStates[i] = 3;
138     }
139
140     public boolean isReduceFinish() {
141         //System.printI(0xa0);
142         /*
143         for(int i = 0; i < rworkerStates.length; ++i) {
144             if(rworkerStates[i] == 1) {
145                 //System.printI(0);
146                 return false;
147             }
148         }
149
150         //System.printI(1);
151         return true;*/
152         return this.finishrworker == this.r;
153     }
154
155     public void collectROutput(OutputCollector file) {
156         int size = file.size();
157         for(int i = 0; i < size; ++i) {
158             String key = file.getKey(i);
159             String value = file.getValue(i);
160             this.output.emit(key, value);
161         }
162     }
163     
164     public Vector[] getInteroutputs() {
165         return this.interoutputs;
166     }
167     
168     public Splitter getSplitter() {
169         return this.splitter;
170     }
171 }