// Worker bookkeeping held by the master. Each state array has one slot per worker and
// stores one of the state codes listed below; the reduce-side setters in this class
// write 2 on finish and 3 on failure.
5 int[] mworkerStates; // array of map workers' states
6 // state codes: 0: idle 1: process 2: finished 3: fail
7 int[] rworkerStates; // array of reduce workers' states
// interoutputs is indexed by the number parsed out of each intermediate file name
// (see addInterOutput); slot i is later handed to ReduceWorker i in assignReduce.
8 Vector[] interoutputs; // array of string vector containing
9 // paths of intermediate outputs from
12 String outputfile; // path of final output file
// Constructs the master.
//   m        - number of slots allocated in mworkerStates
//   r        - number of slots allocated in rworkerStates and in interoutputs
//   splitter - stored as-is; assignMap later asks it for the input splits and file name
// Only the assignments visible in this excerpt are described; other initialisation may
// exist on lines not shown here.
15 public Master(int m, int r, Splitter splitter) {
// Freshly allocated int arrays are presumably zero-filled, i.e. every worker starts
// in state 0 (idle) — TODO confirm for this runtime.
18 this.mworkerStates = new int[m];
19 this.rworkerStates = new int[r];
// Slots start out empty; addInterOutput creates each Vector on first use.
20 this.interoutputs = new Vector[r];
21 this.splitter = splitter;
// The final output path is hard-coded; this constructor offers no way to override it.
22 this.outputfile = new String("/scratch/mapreduce_java/output.dat");
// Returns the path of the final output file (the value held in this.outputfile,
// which collectROutput also opens for writing).
30 public String getOutputFile() {
31 return this.outputfile;
// Boolean accessor paired with setPartial. The body is outside the visible lines;
// presumably it returns this.partial, the field setPartial writes — TODO confirm.
34 public boolean isPartial() {
// Updates the partial flag by OR-ing the argument into the current value.
//   partial - value to merge into this.partial
// Because of the OR, the flag is sticky: once it has become true, calling
// setPartial(false) does not reset it.
38 public void setPartial(boolean partial) {
39 this.partial = partial || this.partial;
42 /*public void split() {
// Creates the map workers: asks the splitter for the input splits and builds one
// MapWorker per split. The return statement is outside the visible lines; given the
// declared return type, presumably the mworkers array is returned — TODO confirm.
46 public MapWorker[] assignMap() {
47 String[] contentsplits = splitter.split();//splitter.getSlices();
48 MapWorker[] mworkers = new MapWorker[contentsplits.length];
49 for(int i = 0; i < contentsplits.length; ++i) {
50 //System.printString("*************************\n");
51 //System.printString(contentsplits[i] + "\n");
52 //System.printString("*************************\n");
// Each worker receives the splitter's file name, its own split, r, and its index i.
// NOTE(review): r is not declared in the visible lines — presumably a field holding
// the reduce-worker count passed to the constructor; confirm.
53 MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
55 mworkers[i] = mworker;
// Map-side counterpart of setReduceFinish; i is the map worker's index.
// The body is outside the visible lines — presumably it stores state 2 (finished)
// in mworkerStates[i], mirroring setReduceFinish; TODO confirm.
61 public void setMapFinish(int i) {
// Map-side counterpart of setReduceFail; i is the map worker's index.
// The body is outside the visible lines — presumably it stores state 3 (fail)
// in mworkerStates[i], mirroring setReduceFail; TODO confirm.
65 public void setMapFail(int i) {
// Scans every map worker's state looking for one that is still in state 1 (process).
// The return statements are outside the visible lines; presumably the method reports
// "not finished" as soon as such a worker is found — TODO confirm.
// Only state 1 is tested here, so workers in state 0 (idle) or 3 (fail) do not
// match this condition.
69 public boolean isMapFinish() {
70 for(int i = 0; i < mworkerStates.length; ++i) {
71 if(mworkerStates[i] == 1) {
// Registers one intermediate output file under the slot encoded in its name.
//   interoutput - path of the intermediate file; the integer between the last '_'
//                 and the first '.' selects the slot in interoutputs
// NOTE(review): lastindexOf / subString are not the java.lang.String spellings
// (lastIndexOf / substring); presumably this targets a runtime with its own String
// class — confirm before porting.
79 public void addInterOutput(String interoutput) {
80 int start = interoutput.lastindexOf('_');
// NOTE(review): this takes the FIRST '.', so a '.' anywhere earlier in the path
// (e.g. in a directory name) would make end precede start.
81 int end = interoutput.indexOf('.');
82 int index = Integer.parseInt(interoutput.subString(start + 1, end));
83 //System.printString(interoutput.subString(start + 1, end) + "\n");
// Create the slot's Vector on first use; the constructor leaves the slots empty.
84 if(interoutputs[index] == null) {
85 interoutputs[index] = new Vector();
87 interoutputs[index].addElement(interoutput);
// Creates the reduce workers: one ReduceWorker per slot of interoutputs, each given
// that slot's Vector of intermediate paths and the slot index. The return statement is
// outside the visible lines; given the declared return type, presumably the rworkers
// array is returned — TODO confirm.
90 public ReduceWorker[] assignReduce() {
91 ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
92 for(int i = 0; i < interoutputs.length; ++i) {
// A slot that never received a file is still null here and is passed on as such.
93 ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
95 rworkers[i] = rworker;
// Drop the master's reference once the Vector has been handed to the worker.
96 this.interoutputs[i] = null;
// NOTE(review): interoutputs is declared as Vector[]; arrays have no clear() method
// in standard Java, so this call relies on the target runtime — confirm it is valid.
98 this.interoutputs.clear();
// Marks reduce worker i as finished by storing state code 2 in rworkerStates[i].
//   i - index of the reduce worker; no bounds check is performed here
102 public void setReduceFinish(int i) {
103 rworkerStates[i] = 2;
// Marks reduce worker i as failed by storing state code 3 in rworkerStates[i].
//   i - index of the reduce worker; no bounds check is performed here
106 public void setReduceFail(int i) {
107 rworkerStates[i] = 3;
// Scans every reduce worker's state looking for one that is still in state 1 (process).
// The return statements are outside the visible lines; presumably the method reports
// "not finished" as soon as such a worker is found — TODO confirm.
// Only state 1 is tested here, so workers in state 0 (idle) or 3 (fail) do not
// match this condition.
110 public boolean isReduceFinish() {
111 for(int i = 0; i < rworkerStates.length; ++i) {
112 if(rworkerStates[i] == 1) {
// Copies data from one reduce worker's output file into the final output file.
//   file - path of the reduce worker's output file to read
120 public void collectROutput(String file) {
121 FileInputStream iStream = new FileInputStream(file);
// The second argument is true; with java.io.FileOutputStream that means append, so
// successive calls would accumulate in outputfile — confirm for this runtime.
122 FileOutputStream oStream = new FileOutputStream(outputfile, true);
// 10 KB buffer. NOTE(review): only a single read() call is visible, so unless the
// lines not shown here loop, at most 10240 bytes per file are copied.
123 byte[] b = new byte[1024 * 10];
124 int length = iStream.read(b);
// The condition guarding this error message is outside the visible lines
// (presumably a failed-read check on length — TODO confirm).
126 System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
129 //System.printString(new String(b, 0, length) + "\n");
// Write exactly the bytes that were read.
// NOTE(review): no close() on either stream is visible in this excerpt — verify.
130 oStream.write(b, 0, length);