13 int[] mworkerStates; // array of map worker's state
14 // 0: idle 1: process 2: finished 3: fail
16 int[] rworkerStates; // array of reduce worker's state
18 Vector[] interoutputs; // array of OutputCollector vector containing
19 // paths of intermediate outputs from
24 //String outputfile; // path of final output file
25 OutputCollector output;
29 public Master(int m, int r, Splitter splitter) {
33 mworkerStates = new int[m];
34 rworkerStates = new int[r];
35 for(int i = 0; i < m; ++i) {
38 for(int i = 0; i < r; ++i) {
42 interoutputs = new Vector[r];
43 for(int i = 0; i < r; ++i) {
44 interoutputs[i] = null;
47 this.splitter = splitter;
48 //this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
49 this.output = new OutputCollector();
51 //this.partial = false;
52 this.finishmworker = 0;
53 this.finishrworker = 0;
60 /*public String getOutputFile() {
61 return this.outputfile;
64 /*public boolean isPartial() {
68 public void setPartial(boolean partial) {
69 this.partial = partial || this.partial;
76 /*public void assignMap() {
77 String[] contentsplits = splitter.getSlices();
78 for(int i = 0; i < contentsplits.length; ++i) {
79 //System.printString("*************************\n");
80 //System.printString(contentsplits[i] + "\n");
81 //System.printString("*************************\n");
82 MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
87 public void setMapInit(int i) {
91 public void setMapFinish(int i) {
96 public void setMapFail(int i) {
100 public boolean isMapFinish() {
102 //System.printString("check map finish\n");
103 for(int i = 0; i < mworkerStates.length; ++i) {
104 if(mworkerStates[i] == 1) {
110 return this.finishmworker == this.m;
113 public void addInterOutput(OutputCollector interoutput, int index) {
114 if(interoutputs[index] == null) {
115 interoutputs[index] = new Vector();
117 interoutputs[index].addElement(interoutput);
120 /*public void assignReduce() {
121 for(int i = 0; i < interoutputs.length; ++i) {
122 ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
123 rworkerStates[i] = 1;
127 public void setReduceInit(int i) {
128 rworkerStates[i] = 1;
131 public void setReduceFinish(int i) {
133 rworkerStates[i] = 2;
136 public void setReduceFail(int i) {
137 rworkerStates[i] = 3;
140 public boolean isReduceFinish() {
141 //System.printI(0xa0);
143 for(int i = 0; i < rworkerStates.length; ++i) {
144 if(rworkerStates[i] == 1) {
152 return this.finishrworker == this.r;
155 public void collectROutput(OutputCollector file) {
156 int size = file.size();
157 for(int i = 0; i < size; ++i) {
158 String key = file.getKey(i);
159 String value = file.getValue(i);
160 this.output.emit(key, value);
164 public Vector[] getInteroutputs() {
165 return this.interoutputs;
168 public Splitter getSplitter() {
169 return this.splitter;