+//package mapreduce;
+public class Configuration {
+ MapReduceBase mapreducer;
+ int m;
+ int r;
+ char seperator;
+ String inputfile;
+ public Configuration() {
+ this.mapreducer = null;
+ }
+ public MapReduceBase getMapReduce() {
+ return this.mapreducer;
+ }
+ public void setMapReduceClass(MapReduceBase mapreducer) {
+ this.mapreducer = mapreducer;
+ }
+ public int getM() {
+ return m;
+ }
+ public void setM(int m) {
+ this.m = m;
+ }
+ public int getR() {
+ return r;
+ }
+ public void setR(int r) {
+ this.r = r;
+ }
+ public char getSeperator() {
+ return seperator;
+ }
+ public void setSeperator(char seperator) {
+ this.seperator = seperator;
+ }
+ public String getInputfile() {
+ return inputfile;
+ }
+ public void setInputfile(String inputfile) {
+ this.inputfile = inputfile;
+ }
+//package mapreduce;
+public class Configured {
+ Configuration conf;
+ public Configured() {
+ this.conf = null;
+ }
+ public Configured(Configuration conf) {
+ this.conf = conf;
+ }
+ public Configuration getConf() {
+ return this.conf;
+ }
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+//package mapreduce;
+public class JobClient{
+ public JobClient() {}
+ public static void runJob(Configuration conf) {
+ Splitter splitter = new Splitter(conf.getInputfile(), conf.getM(), conf.getSeperator());
+ Master master = new Master(conf.getM(), conf.getR(), splitter);
+ // split input file
+ System.printString("Split\n");
+ master.split();
+ // do 'map'
+ System.printString("Map\n");
+ MapWorker[] mworkers = master.assignMap();
+ for(int i = 0; i < mworkers.length; ++i) {
+ MapWorker mworker = mworkers[i];
+ mworker.setMapreducer(conf.getMapReduce());
+ mworker.map();
+ mworker.partition();
+ }
+ // register intermediate output from map workers to master
+ System.printString("Mapoutput\n");
+ for(int i = 0; i < mworkers.length; ++i) {
+ for(int j = 0; j < conf.getR(); ++j) {
+ String temp = mworkers[i].outputFile(j);
+ if(temp != null) {
+ master.addInterOutput(temp);
+ }
+ }
+ master.setMapFinish(mworkers[i].getID());
+ }
+ //assert(master.isMapFinish());
+ // do 'reduce'
+ System.printString("Reduce\n");
+ ReduceWorker[] rworkers = master.assignReduce();
+ for(int i = 0; i < rworkers.length; ++i) {
+ ReduceWorker rworker = rworkers[i];
+ rworker.setMapreducer(conf.getMapReduce());
+ rworker.sortgroup();
+ rworker.reduce();
+ }
+ // merge all the intermediate output from reduce workers to master
+ System.printString("Merge\n");
+ for(int i = 0; i < rworkers.length; ++i) {
+ master.collectROutput(rworkers[i].getOutputFile());
+ master.setReduceFinish(rworkers[i].getID());
+ }
+ //assert(master.isReduceFinish());
+ System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+ }
+//package mapreduce;
+//import java.util.Vector;
+public /*abstract*/ class MapReduceBase {
+ public MapReduceBase() {}
+ public /*abstract*/ void map(String key, String value, OutputCollector output);
+ public /*abstract*/ void reduce(String key, Vector values, OutputCollector output);
+//package mapreduce;
+//import java.io.FileOutputStream;
+public class MapWorker {
+ int ID;
+ MapReduceBase mapreducer;
+ int r;
+ String key;
+ String value;
+ OutputCollector output;
+ String[] locations;
+ FileOutputStream[] outputs;
+ public MapWorker(String key, String value, int r, int id) {
+ this.ID = id;
+ this.mapreducer = null;
+ this.r = r;
+ this.key = key;
+ this.value = value;
+ this.output = new OutputCollector();
+ locations = new String[r];
+ for(int i = 0; i < r; ++i) {
+ StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+ temp.append(String.valueOf(ID));
+ temp.append("-of-");
+ temp.append(String.valueOf(r));
+ temp.append("_");
+ temp.append(String.valueOf(i));
+ temp.append(".dat");
+ locations[i] = new String(temp);
+ }
+ outputs = new FileOutputStream[r];
+ for(int i = 0; i < r; ++i) {
+ outputs[i] = null;
+ }
+ }
+ public MapReduceBase getMapreducer() {
+ return mapreducer;
+ }
+ public void setMapreducer(MapReduceBase mapreducer) {
+ this.mapreducer = mapreducer;
+ }
+ public void map() {
+ /*if(ID % 2 == 1) {
+ String temp = locations[locations.length];
+ }*/
+ this.mapreducer.map(key, value, output);
+ }
+ public void partition() {
+ /*if(ID % 2 == 1) {
+ String temp = locations[locations.length];
+ }*/
+ //try{
+ int size = this.output.size();
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // use the hashcode of key to decide which intermediate output
+ // this pair should be in
+ //int hash = key.hashCode();
+ int index = (int)Math.abs(key.hashCode()) % this.r;
+ FileOutputStream oStream = outputs[index];
+ if(oStream == null) {
+ // open the file
+ String filepath = locations[index];
+ oStream = new FileOutputStream(filepath, true); // append
+ outputs[index] = oStream;
+ }
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ // close the output files
+ for(int i = 0; i < this.outputs.length; ++i) {
+ FileOutputStream temp = this.outputs[i];
+ if(temp != null) {
+ temp.close();
+ }
+ }
+ /*} catch(Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+ public String outputFile(int i) {
+ if(outputs[i] != null) {
+ return locations[i];
+ } else {
+ return null;
+ }
+ }
+ public int getID() {
+ return this.ID;
+ }
+ public int getR() {
+ return this.r;
+ }
+//package mapreduce;
+/*import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.util.Vector;*/
+public class Master {
+ int m;
+ int r;
+ int[] mworkerStates; // array of map worker's state
+ // 0: idle 1: process 2: finished 3: fail
+ int[] rworkerStates; // array of reduce worker's state
+ Vector[] interoutputs; // array of string vector containing
+ // paths of intermediate outputs from
+ // map worker
+ Splitter splitter;
+ String outputfile; // path of final output file
+ boolean partial;
+ public Master(int m, int r, Splitter splitter) {
+ this.m = m;
+ this.r = r;
+ mworkerStates = new int[m];
+ rworkerStates = new int[r];
+ for(int i = 0; i < m; ++i) {
+ mworkerStates[i] = 0;
+ }
+ for(int i = 0; i < r; ++i) {
+ rworkerStates[i] = 0;
+ }
+ interoutputs = new Vector[r];
+ for(int i = 0; i < r; ++i) {
+ interoutputs[i] = null;
+ }
+ this.splitter = splitter;
+ this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+ this.partial = false;
+ }
+ public int getR() {
+ return this.r;
+ }
+ public String getOutputFile() {
+ return this.outputfile;
+ }
+ public boolean isPartial() {
+ return this.partial;
+ }
+ public void setPartial(boolean partial) {
+ this.partial = partial || this.partial;
+ }
+ public void split() {
+ splitter.split();
+ }
+ public MapWorker[] assignMap() {
+ String[] contentsplits = splitter.getSlices();
+ MapWorker[] mworkers = new MapWorker[contentsplits.length];
+ for(int i = 0; i < contentsplits.length; ++i) {
+ //System.printString("*************************\n");
+ //System.printString(contentsplits[i] + "\n");
+ //System.printString("*************************\n");
+ MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i);
+ mworkerStates[i] = 1;
+ mworkers[i] = mworker;
+ }
+ return mworkers;
+ }
+ public void setMapFinish(int i) {
+ mworkerStates[i] = 2;
+ }
+ public void setMapFail(int i) {
+ mworkerStates[i] = 3;
+ }
+ public boolean isMapFinish() {
+ for(int i = 0; i < mworkerStates.length; ++i) {
+ if(mworkerStates[i] == 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+ public void addInterOutput(String interoutput) {
+ int start = interoutput.indexOf('_');
+ int end = interoutput.indexOf('.');
+ int index = Integer.parseInt(interoutput.substring(start + 1, end));
+ //System.printString(interoutput.subString(start + 1, end) + "\n");
+ if(interoutputs[index] == null) {
+ interoutputs[index] = new Vector();
+ }
+ interoutputs[index].addElement(interoutput);
+ }
+ public ReduceWorker[] assignReduce() {
+ ReduceWorker[] rworkers = new ReduceWorker[interoutputs.length];
+ for(int i = 0; i < interoutputs.length; ++i) {
+ ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
+ rworkerStates[i] = 1;
+ rworkers[i] = rworker;
+ }
+ return rworkers;
+ }
+ public void setReduceFinish(int i) {
+ rworkerStates[i] = 2;
+ }
+ public void setReduceFail(int i) {
+ rworkerStates[i] = 3;
+ }
+ public boolean isReduceFinish() {
+ for(int i = 0; i < rworkerStates.length; ++i) {
+ if(rworkerStates[i] == 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+ public void collectROutput(String file) {
+ //try{
+ FileInputStream iStream = new FileInputStream(file);
+ FileOutputStream oStream = new FileOutputStream(outputfile, true);
+ byte[] b = new byte[1024 * 100];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+ System.exit(-1);
+ }
+ //System.printString(new String(b, 0, length) + "\n");
+ oStream.write(b, 0, length);
+ iStream.close();
+ oStream.close();
+ /*} catch(Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+//package mapreduce;
+//import java.util.Vector;
+public class OutputCollector {
+ Vector keys;
+ Vector values;
+ public OutputCollector() {
+ this.keys = new Vector();
+ this.values = new Vector();
+ }
+ public void emit(String key, String value) {
+ this.keys.addElement(key);
+ this.values.addElement(value);
+ }
+ public int size() {
+ return this.keys.size();
+ }
+ public String getKey(int i) {
+ return (String)this.keys.elementAt(i);
+ }
+ public String getValue(int i) {
+ return (String)this.values.elementAt(i);
+ }
+//package mapreduce;
+/*import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Vector;*/
+import mapreduce.MapReduceBase;
+public class ReduceWorker {
+ int ID;
+ MapReduceBase mapreducer;
+ Vector interoutputs; // string vector containing paths
+ // of intermediate outputs from map worker
+ Vector keys;
+ HashMap values; // hashmap map key to vector of string vector
+ int[] sorts; // array record the sort of keys
+ OutputCollector output;
+ String outputfile; // path of the intermediate output file
+ public ReduceWorker(Vector interoutputs, int id) {
+ this.ID = id;
+ this.mapreducer = null;
+ this.interoutputs = interoutputs;
+ this.keys = new Vector();
+ this.values = new HashMap();
+ //this.sorts = null;
+ this.output = new OutputCollector();
+ this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+ }
+ public MapReduceBase getMapreducer() {
+ return mapreducer;
+ }
+ public void setMapreducer(MapReduceBase mapreducer) {
+ this.mapreducer = mapreducer;
+ }
+ public void sortgroup() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+ // group values associated to the same key
+ //System.printString("================================\n");
+ if(interoutputs == null) {
+ return;
+ }
+ //try{
+ for(int i = 0; i < interoutputs.size(); ++i) {
+ FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+ byte[] b = new byte[1024 * 100];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+ System.exit(-1);
+ }
+ String content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ int index = content.indexOf('\n');
+ while(index != -1) {
+ String line = content.substring(0, index);
+ content = content.substring(index + 1);
+ //System.printString(line + "\n");
+ int tmpindex = line.indexOf(' ');
+ String key = line.substring(0, tmpindex);
+ String value = line.substring(tmpindex + 1);
+ //System.printString(key + "; " + value + "\n");
+ if(!this.values.containsKey(key)) {
+ this.values.put(key, new Vector());
+ this.keys.addElement(key);
+ }
+ ((Vector)this.values.get(key)).addElement(value);
+ index = content.indexOf('\n');
+ }
+ iStream.close();
+ }
+ //System.printString("================================\n");
+ /*for(int i = 0; i < this.keys.size(); ++i) {
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
+ // sort all the keys inside interoutputs
+ this.sorts = new int[this.keys.size()];
+ // insert sorting
+ this.sorts[0] = 0;
+ int tosort = 1;
+ for(; tosort < this.keys.size(); ++tosort) {
+ int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+ int index = tosort;
+ for(int i = tosort; i > 0; --i) {
+ if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+ this.sorts[i] = this.sorts[i-1];
+ index = i - 1;
+ } else {
+ //System.printString(i + "; " + tosort + "\n");
+ index = i;
+ i = 0;
+ }
+ }
+ this.sorts[index] = tosort;
+ }
+ /*for(int i = 0; i < this.sorts.length; ++i) {
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
+ /*} catch(IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+ public void reduce() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+ if(this.interoutputs == null) {
+ return;
+ }
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ this.mapreducer.reduce(key, values, output);
+ }
+ //try{
+ // output all the result into some local file
+ int size = this.output.size();
+ FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ oStream.close();
+ /*} catch(Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+ public String getOutputFile() {
+ return this.outputfile;
+ }
+ public int getID() {
+ return this.ID;
+ }
+//package mapreduce;
+//import java.io.FileInputStream;
+//import java.io.IOException;
+public class Splitter {
+ String filename;
+ String content;
+ int length;
+ int[] splits;
+ String[] slices;
+ public Splitter(String path, int splitNum, char seperator) {
+ //try{
+ //System.printString("Top of Splitter's constructor\n");
+ filename = path;
+ FileInputStream iStream = new FileInputStream(filename);
+ byte[] b = new byte[1024 * 10];
+ length = iStream.read(b);
+ if(length < 0) {
+ System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
+ System.exit(-1);
+ }
+ content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ iStream.close();
+ if(splitNum == 1) {
+ slices = new String[1];
+ slices[0] = content;
+ } else {
+ splits = new int[splitNum - 1];
+ int index = 0;
+ int span = length / splitNum;
+ int temp = 0;
+ for(int i = 0; i < splitNum - 1; ++i) {
+ temp += span;
+ if(temp > index) {
+ index = temp;
+ while((content.charAt(index) != seperator) && (index != length - 1)) {
+ ++index;
+ }
+ }
+ splits[i] = index;
+ }
+ this.slices = new String[splits.length + 1];
+ for(int i = 0; i < this.slices.length; ++i) {
+ this.slices[i] = null;
+ }
+ }
+ /*} catch(IOException e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+ public void split() {
+ if(slices.length == 1) {
+ return;
+ }
+ int start = 0;
+ int end = 0;
+ for(int i = 0; i < splits.length; ++i) {
+ end = splits[i];
+ if(end < start) {
+ slices[i] = null;
+ } else {
+ slices[i] = content.substring(start, end);
+ }
+ start = end + 1;
+ }
+ slices[slices.length - 1] = content.substring(start);
+ }
+ public String getFilename() {
+ return filename;
+ }
+ public String[] getSlices() {
+ return this.slices;
+ }
+//package mapreduce;
+public /*interface*/ class Tool {
+ public int run(String[] args);
+//package mapreduce;
+public class ToolRunner {
+ public static int run(Tool tool, String[] args) {
+ return tool.run(args);
+ }
+//import java.io.FileInputStream;
+//import java.util.Vector;
+/*import mapreduce.Configuration;
+import mapreduce.Configured;
+import mapreduce.JobClient;
+import mapreduce.MapReduceBase;
+import mapreduce.OutputCollector;
+import mapreduce.Tool;
+import mapreduce.ToolRunner;*/
+ * Counts the words in each line.
+ * For each line of input, break the line into words and emit them as
+ * (<b>word</b>, <b>1</b>).
+ */
+ public class MapReduceClass extends MapReduceBase {
+ public MapReduceClass() {}
+ public void map(String key, String value, OutputCollector output) {
+ int n = value.length();
+ for (int i = 0; i < n; ) {
+ // Skip past leading whitespace
+ while ((i < n) && isspace(value.charAt(i))) {
+ ++i;
+ }
+ // Find word end
+ int start = i;
+ while ((i < n) && !isspace(value.charAt(i))) {
+ i++;
+ }
+ if (start < i) {
+ output.emit(value.substring(start, i), "1");
+ }
+ }
+ }
+ public void reduce(String key, Vector values, OutputCollector output) {
+ // Iterate over all entries with the
+ // // same key and add the values
+ int value = 0;
+ for(int i = 0; i < values.size(); ++i) {
+ value += Integer.parseInt((String)values.elementAt(i));
+ }
+ // Emit sum for input->key()
+ output.emit(key, String.valueOf(value));
+ }
+ boolean isspace(char c) {
+ if((c == ' ') ||
+ (c == '.') ||
+ (c == '!') ||
+ (c == '?') ||
+ (c == '"') ||
+ (c == '\n')) {
+ return true;
+ }
+ return false;
+ }
+ }
+public class WordCounter /*implements*/extends Tool {
+ public WordCounter() {}
+ static int printUsage() {
+ System./*out.println*/printString("<conffile>\n");
+ return -1;
+ }
+ /**
+ * The main driver for word count map/reduce program.
+ * Invoke this method to submit the map/reduce job.
+ * @throws IOException When there is communication problems with the
+ * job tracker.
+ */
+ public int run(String[] args) {
+ //try {
+ MapReduceClass mapreducer = new MapReduceClass();
+ FileInputStream iStream = new FileInputStream(args[0]);
+ byte[] b = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0 ) {
+ System./*out.println*/printString("Error! Can not read from configure file: " + args[0] + "\n");
+ System.exit(-1);
+ }
+ String content = new String(b, 0, length);
+ //System.out.println(content);
+ int index = content.indexOf('\n');
+ String inputfile = content.substring(0, index);
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ int m = Integer.parseInt(content.substring(0, index));
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ int r = Integer.parseInt(content.substring(0, index));
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ String temp = content.substring(0, index);
+ char seperator = temp.charAt(0);
+ //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
+ Configuration conf = new Configuration();
+ conf.setMapReduceClass(mapreducer);
+ conf.setInputfile(inputfile);
+ conf.setM(m);
+ conf.setR(r);
+ conf.setSeperator(seperator);
+ JobClient.runJob(conf);
+ /*} catch (Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ return 0;
+ }
+ public static void main(String[] args) /*throws Exception*/ {
+ int res = ToolRunner.run(new WordCounter(), args);
+ System.exit(res);
+ }
+task startup(StartupObject s{initialstate}) {
+ // read in configuration parameters
+ // System.printString("Top of task startup\n");
+ String path = new String("/home/jzhou/mapreduce/conf.txt");
+ FileInputStream iStream = new FileInputStream(path);
+ byte[] b = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0 ) {
+ System.printString("Error! Can not read from configure file: " + path + "\n");
+ System.exit(-1);
+ }
+ String content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ int index = content.indexOf('\n');
+ String inputfile = content.subString(0, index);
+ content = content.subString(index + 1);
+ index = content.indexOf('\n');
+ int m = Integer.parseInt(content.subString(0, index));
+ content = content.subString(index + 1);
+ index = content.indexOf('\n');
+ int r = Integer.parseInt(content.subString(0, index));
+ content = content.subString(index + 1);
+ index = content.indexOf('\n');
+ String temp = content.subString(0, index);
+ char seperator = temp.charAt(0);
+ //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
+ Splitter splitter = new Splitter(inputfile, m, seperator);
+ Master master = new Master(m, r, splitter){split};
+ taskexit(s{!initialstate});
+//Split the input file into M pieces
+task split(Master master{split}) {
+ System.printString("Top of task split\n");
+ master.split();
+ taskexit(master{!split, assignMap});
+//Select a map worker to handle one of the pieces of input file
+task assignMap(Master master{assignMap}) {
+ System.printString("Top of task assignMap\n");
+ master.assignMap();
+ taskexit(master{!assignMap, mapoutput});
+//MapWorker do 'map' function on a input file piece
+task map(MapWorker mworker{map}) {
+ System.printString("Top of task map\n");
+ mworker.map();
+ taskexit(mworker{!map, partition});
+//Partition the intermediate key/value pair generated
+//into R intermediate local files
+task partition(MapWorker mworker{partition}) {
+ System.printString("Top of task partition\n");
+ mworker.partition();
+ taskexit(mworker{!partition, mapoutput});
+//Register the intermediate ouput from map worker to master
+task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
+ System.printString("Top of task mapOutput\n");
+ if(isavailable(mworker)) {
+ int total = master.getR();
+ for(int i = 0; i < total; ++i) {
+ String temp = mworker.outputFile(i);
+ if(temp != null) {
+ master.addInterOutput(temp);
+ }
+ }
+ master.setMapFinish(mworker.getID());
+ } else {
+ master.setMapFail(mworker.getID());
+ master.setPartial(true);
+ }
+ if(master.isMapFinish()) {
+ taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
+ }
+ taskexit(mworker{!mapoutput});
+//Assign the list of intermediate output associated to one key to
+//a reduce worker
+task assignReduce(Master master{assignReduce}) {
+ System.printString("Top of task assignReduce\n");
+ master.assignReduce();
+ taskexit(master{!assignReduce, reduceoutput});
+//First do sort and group on the intermediate key/value pairs assigned
+//to reduce worker
+task sortgroup(ReduceWorker rworker{sortgroup}) {
+ System.printString("Top of task sortgroup\n");
+ rworker.sortgroup();
+ taskexit(rworker{!sortgroup, reduce});
+//Do 'reduce' function
+task reduce(ReduceWorker rworker{reduce}) {
+ System.printString("Top of task reduce\n");
+ rworker.reduce();
+ taskexit(rworker{!reduce, reduceoutput});
+//Collect the output into master
+task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
+ System.printString("Top of task reduceOutput\n");
+ if(isavailable(rworker)) {
+ master.collectROutput(rworker.getOutputFile());
+ master.setReduceFinish(rworker.getID());
+ } else {
+ master.setReduceFail(rworker.getID());
+ master.setPartial(true);
+ }
+ if(master.isReduceFinish()) {
+ //System.printString("reduce finish\n");
+ taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
+ }
+ taskexit(rworker{!reduceoutput});
+task output(Master master{output}) {
+ System.printString("Top of task output\n");
+ if(master.isPartial()) {
+ System.printString("Partial! The result may not be right due to some failure!\n");
+ }
+ System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+ taskexit(master{!output});
+public class MapReduceBase {
+ public static void map(String key, String value, OutputCollector output) {
+ int n = value.length();
+ for (int i = 0; i < n; ) {
+ // Skip past leading whitespace
+ while ((i < n) && isspace(value.charAt(i))) {
+ ++i;
+ }
+ // Find word end
+ int start = i;
+ while ((i < n) && !isspace(value.charAt(i))) {
+ i++;
+ }
+ if (start < i) {
+ output.emit(value.subString(start, i), "1");
+ //System.printString(value.subString(start,i) + "\n");
+ }
+ }
+ }
+ public static void reduce(String key, Vector values, OutputCollector output) {
+ // Iterate over all entries with the
+ // // same key and add the values
+ int value = 0;
+ for(int i = 0; i < values.size(); ++i) {
+ value += Integer.parseInt((String)values.elementAt(i));
+ }
+ // Emit sum for input->key()
+ output.emit(key, String.valueOf(value));
+ }
+ static boolean isspace(char c) {
+ if((c == ' ') ||
+ (c == ',') ||
+ (c == '.') ||
+ (c == '!') ||
+ (c == '?') ||
+ (c == '"') ||
+ (c == '(') ||
+ (c == ')') ||
+ (c == '[') ||
+ (c == ']') ||
+ (c == '{') ||
+ (c == '}') ||
+ (c == '\n')) {
+ return true;
+ }
+ return false;
+ }
+public class MapWorker {
+ flag map;
+ flag partition;
+ flag mapoutput;
+ int ID;
+ int r;
+ String key;
+ String value;
+ OutputCollector output;
+ String[] locations;
+ FileOutputStream[] outputs;
+ public MapWorker(String key, String value, int r, int id) {
+ this.ID = id;
+ this.r = r;
+ this.key = key;
+ this.value = value;
+ this.output = new OutputCollector();
+ locations = new String[r];
+ for(int i = 0; i < r; ++i) {
+ StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+ temp.append(String.valueOf(ID));
+ temp.append("-of-");
+ temp.append(String.valueOf(r));
+ temp.append("_");
+ temp.append(String.valueOf(i));
+ temp.append(".dat");
+ locations[i] = new String(temp);
+ }
+ outputs = new FileOutputStream[r];
+ for(int i = 0; i < r; ++i) {
+ outputs[i] = null;
+ }
+ }
+ public void map() {
+ /*if(ID % 2 == 1) {
+ String temp = locations[locations.length];
+ }*/
+ MapReduceBase.map(key, value, output);
+ }
+ public void partition() {
+ /*if(ID % 2 == 1) {
+ String temp = locations[locations.length];
+ }*/
+ int size = this.output.size();
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // use the hashcode of key to decide which intermediate output
+ // this pair should be in
+ int index = (int)Math.abs(key.hashCode()) % this.r;
+ FileOutputStream oStream = outputs[index];
+ if(oStream == null) {
+ // open the file
+ String filepath = locations[index];
+ oStream = new FileOutputStream(filepath, true); // append
+ outputs[index] = oStream;
+ }
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ // close the output files
+ for(int i = 0; i < this.outputs.length; ++i) {
+ FileOutputStream temp = this.outputs[i];
+ if(temp != null) {
+ temp.close();
+ }
+ }
+ }
+ public String outputFile(int i) {
+ if(outputs[i] != null) {
+ return locations[i];
+ } else {
+ return null;
+ }
+ }
+ public int getID() {
+ return this.ID;
+ }
+ public int getR() {
+ return this.r;
+ }
+public class Master {
+ flag split;
+ flag assignMap;
+ flag mapoutput;
+ flag mapfinished;
+ flag assignReduce;
+ flag reduceoutput;
+ flag reducefinished;
+ flag output;
+ int m;
+ int r;
+ int[] mworkerStates; // array of map worker's state
+ // 0: idle 1: process 2: finished 3: fail
+ int[] rworkerStates; // array of reduce worker's state
+ Vector[] interoutputs; // array of string vector containing
+ // paths of intermediate outputs from
+ // map worker
+ Splitter splitter;
+ String outputfile; // path of final output file
+ boolean partial;
+ public Master(int m, int r, Splitter splitter) {
+ this.m = m;
+ this.r = r;
+ mworkerStates = new int[m];
+ rworkerStates = new int[r];
+ for(int i = 0; i < m; ++i) {
+ mworkerStates[i] = 0;
+ }
+ for(int i = 0; i < r; ++i) {
+ rworkerStates[i] = 0;
+ }
+ interoutputs = new Vector[r];
+ for(int i = 0; i < r; ++i) {
+ interoutputs[i] = null;
+ }
+ this.splitter = splitter;
+ this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+ this.partial = false;
+ }
+ public int getR() {
+ return this.r;
+ }
+ public String getOutputFile() {
+ return this.outputfile;
+ }
+ public boolean isPartial() {
+ return this.partial;
+ }
+ public void setPartial(boolean partial) {
+ this.partial = partial || this.partial;
+ }
+ public void split() {
+ splitter.split();
+ }
+ public void assignMap() {
+ String[] contentsplits = splitter.getSlices();
+ for(int i = 0; i < contentsplits.length; ++i) {
+ //System.printString("*************************\n");
+ //System.printString(contentsplits[i] + "\n");
+ //System.printString("*************************\n");
+ MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
+ mworkerStates[i] = 1;
+ }
+ }
+ public void setMapFinish(int i) {
+ mworkerStates[i] = 2;
+ }
+ public void setMapFail(int i) {
+ mworkerStates[i] = 3;
+ }
+ public boolean isMapFinish() {
+ for(int i = 0; i < mworkerStates.length; ++i) {
+ if(mworkerStates[i] == 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+ public void addInterOutput(String interoutput) {
+ int start = interoutput.indexOf('_');
+ int end = interoutput.indexOf('.');
+ int index = Integer.parseInt(interoutput.subString(start + 1, end));
+ //System.printString(interoutput.subString(start + 1, end) + "\n");
+ if(interoutputs[index] == null) {
+ interoutputs[index] = new Vector();
+ }
+ interoutputs[index].addElement(interoutput);
+ }
+ public void assignReduce() {
+ for(int i = 0; i < interoutputs.length; ++i) {
+ ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
+ rworkerStates[i] = 1;
+ }
+ }
+ public void setReduceFinish(int i) {
+ rworkerStates[i] = 2;
+ }
+ public void setReduceFail(int i) {
+ rworkerStates[i] = 3;
+ }
+ public boolean isReduceFinish() {
+ for(int i = 0; i < rworkerStates.length; ++i) {
+ if(rworkerStates[i] == 1) {
+ return false;
+ }
+ }
+ return true;
+ }
+ public void collectROutput(String file) {
+ FileInputStream iStream = new FileInputStream(file);
+ FileOutputStream oStream = new FileOutputStream(outputfile, true);
+ byte[] b = new byte[1024 * 100];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+ System.exit(-1);
+ }
+ //System.printString(new String(b, 0, length) + "\n");
+ oStream.write(b, 0, length);
+ iStream.close();
+ oStream.close();
+ }
+public class OutputCollector {
+ Vector keys;
+ Vector values;
+ public OutputCollector() {
+ this.keys = new Vector();
+ this.values = new Vector();
+ }
+ public void emit(String key, String value) {
+ this.keys.addElement(key);
+ this.values.addElement(value);
+ }
+ public int size() {
+ return this.keys.size();
+ }
+ public String getKey(int i) {
+ return (String)this.keys.elementAt(i);
+ }
+ public String getValue(int i) {
+ return (String)this.values.elementAt(i);
+ }
+public class ReduceWorker {
+ flag sortgroup;
+ flag reduce;
+ flag reduceoutput;
+ int ID;
+ Vector interoutputs; // string vector containing paths
+ // of intermediate outputs from map worker
+ Vector keys;
+ HashMap values; // hashmap map key to vector of string vector
+ int[] sorts; // array record the sort of keys
+ OutputCollector output;
+ String outputfile; // path of the intermediate output file
+ public ReduceWorker(Vector interoutputs, int id) {
+ this.ID = id;
+ this.interoutputs = interoutputs;
+ this.keys = new Vector();
+ this.values = new HashMap();
+ //this.sorts = null;
+ this.output = new OutputCollector();
+ this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+ }
+ public void sortgroup() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+ // group values associated to the same key
+ //System.printString("================================\n");
+ if(interoutputs == null) {
+ return;
+ }
+ for(int i = 0; i < interoutputs.size(); ++i) {
+ FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+ byte[] b = new byte[1024 * 100];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+ System.exit(-1);
+ }
+ String content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ int index = content.indexOf('\n');
+ while(index != -1) {
+ String line = content.subString(0, index);
+ content = content.subString(index + 1);
+ //System.printString(line + "\n");
+ int tmpindex = line.indexOf(' ');
+ String key = line.subString(0, tmpindex);
+ String value = line.subString(tmpindex + 1);
+ //System.printString(key + "; " + value + "\n");
+ if(!this.values.containsKey(key)) {
+ this.values.put(key, new Vector());
+ this.keys.addElement(key);
+ }
+ ((Vector)this.values.get(key)).addElement(value);
+ index = content.indexOf('\n');
+ }
+ iStream.close();
+ }
+ //System.printString("================================\n");
+ /*for(int i = 0; i < this.keys.size(); ++i) {
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
+ // sort all the keys inside interoutputs
+ this.sorts = new int[this.keys.size()];
+ // insert sorting
+ this.sorts[0] = 0;
+ int tosort = 1;
+ for(; tosort < this.keys.size(); ++tosort) {
+ int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+ int index = tosort;
+ for(int i = tosort; i > 0; --i) {
+ if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+ this.sorts[i] = this.sorts[i-1];
+ index = i - 1;
+ } else {
+ //System.printString(i + "; " + tosort + "\n");
+ index = i;
+ i = 0;
+ }
+ }
+ this.sorts[index] = tosort;
+ }
+ /*for(int i = 0; i < this.sorts.length; ++i) {
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
+ }
+ public void reduce() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+ if(this.interoutputs != null) {
+ // return;
+ //}
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ MapReduceBase.reduce(key, values, output);
+ }
+ }
+ // output all the result into some local file
+ int size = this.output.size();
+ FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ oStream.close();
+ }
+ public String getOutputFile() {
+ return this.outputfile;
+ }
+ public int getID() {
+ return this.ID;
+ }
+public class Splitter {
+ String filename;
+ String content;
+ int length;
+ int[] splits;
+ String[] slices;
+ public Splitter(String path, int splitNum, char seperator) {
+ //System.printString("Top of Splitter's constructor\n");
+ filename = path;
+ FileInputStream iStream = new FileInputStream(filename);
+ byte[] b = new byte[1024 * 10];
+ length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from input file: " + filename + "\n");
+ System.exit(-1);
+ }
+ content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ iStream.close();
+ if(splitNum == 1) {
+ slices = new String[1];
+ slices[0] = content;
+ } else {
+ splits = new int[splitNum - 1];
+ int index = 0;
+ int span = length / splitNum;
+ int temp = 0;
+ for(int i = 0; i < splitNum - 1; ++i) {
+ temp += span;
+ if(temp > index) {
+ index = temp;
+ while((content.charAt(index) != seperator) && (index != length - 1)) {
+ ++index;
+ }
+ }
+ splits[i] = index;
+ }
+ this.slices = new String[splits.length + 1];
+ for(int i = 0; i < this.slices.length; ++i) {
+ this.slices[i] = null;
+ }
+ }
+ }
+ public void split() {
+ if(slices.length == 1) {
+ return;
+ }
+ int start = 0;
+ int end = 0;
+ for(int i = 0; i < splits.length; ++i) {
+ end = splits[i];
+ if(end < start) {
+ slices[i] = null;
+ } else {
+ slices[i] = content.subString(start, end);
+ }
+ start = end + 1;
+ }
+ slices[slices.length - 1] = content.subString(start);
+ }
+ public String getFilename() {
+ return filename;
+ }
+ public String[] getSlices() {
+ return this.slices;
+ }
+public class Estimator {
+ int stages;
+ int time;
+ double variance;
+ double[] probtable;
+ boolean partial;
+ public Estimator(int stages) {
+ this.stages = stages;
+ this.time = 0;
+ this.variance = 0;
+ this.probtable = new double[31];
+ int i = 0;
+ this.probtable[i++] = 0.5000;
+ this.probtable[i++] = 0.5398;
+ this.probtable[i++] = 0.5793;
+ this.probtable[i++] = 0.6179;
+ this.probtable[i++] = 0.6554;
+ this.probtable[i++] = 0.6915;
+ this.probtable[i++] = 0.7257;
+ this.probtable[i++] = 0.7580;
+ this.probtable[i++] = 0.7881;
+ this.probtable[i++] = 0.8159;
+ this.probtable[i++] = 0.8413;
+ this.probtable[i++] = 0.8643;
+ this.probtable[i++] = 0.8849;
+ this.probtable[i++] = 0.9032;
+ this.probtable[i++] = 0.9192;
+ this.probtable[i++] = 0.9332;
+ this.probtable[i++] = 0.9452;
+ this.probtable[i++] = 0.9554;
+ this.probtable[i++] = 0.9641;
+ this.probtable[i++] = 0.9713;
+ this.probtable[i++] = 0.9772;
+ this.probtable[i++] = 0.9821;
+ this.probtable[i++] = 0.9861;
+ this.probtable[i++] = 0.9893;
+ this.probtable[i++] = 0.9918;
+ this.probtable[i++] = 0.9938;
+ this.probtable[i++] = 0.9953;
+ this.probtable[i++] = 0.9965;
+ this.probtable[i++] = 0.9974;
+ this.probtable[i++] = 0.9981;
+ this.probtable[i++] = 0.9987;
+ this.partial = false;
+ }
+ public boolean estimate(int time, double variance2, boolean fake) {
+ if(!fake) {
+ this.time += time;
+ this.variance += variance2;
+ } else {
+ this.partial = true;
+ }
+ --this.stages;
+ if(this.stages == 0) {
+ //System.out.print("variance2: " + (int)(this.variance*100) + "(/100); ");
+ this.variance = Math.sqrt(this.variance);
+ //System.out.println("variance: " + (int)(this.variance*100) + "(/100)");
+ return true;
+ }
+ return false;
+ }
+ public double getProbability(int x, int y) {
+ int l = x;
+ int r = y;
+ if(x > y) {
+ l = y;
+ r = x;
+ }
+ double prob = prob(r) - prob(l);
+ return prob;
+ }
+ private double prob(int s) {
+ int tmp = (int)((s - this.time) * 10 / this.variance);
+ //System.out.println(tmp);
+ int abs = (int)Math.abs(tmp);
+ double prob = 0;
+ if(abs > this.probtable.length - 1) {
+ prob = 1;
+ } else {
+ prob = this.probtable[abs];
+ }
+ if(tmp < 0) {
+ return 1.0 - prob;
+ } else {
+ return prob;
+ }
+ }
+ public int getTime() {
+ return this.time;
+ }
+ public double getVariance() {
+ return this.variance;
+ }
+ public boolean isPartial() {
+ return this.partial;
+ }
+import java.io.FileInputStream;
+public class PERT {
+ int stageNum;
+ Stage[] stages;
+ Estimator estimator;
+ public PERT() {
+ this.stageNum = -1;
+ //this.stages = null;
+ //this.estimator = null;
+ }
+ public Estimator getEstimator() {
+ return estimator;
+ }
+ public void setEstimator(Estimator estimator) {
+ this.estimator = estimator;
+ }
+ public void setStageNum(int stageNum) {
+ this.stageNum = stageNum;
+ }
+ public void createStages() {
+ this.stages = new Stage[this.stageNum];
+ for(int i = 0; i < stageNum; ++i) {
+ this.stages[i] = new Stage(i);
+ }
+ }
+ public void sampling() {
+ for(int i = 0; i < this.stageNum; ++i) {
+ this.stages[i].sampling();
+ }
+ }
+ public void estimate() {
+ for(int i = 0; i < this.stageNum; ++i) {
+ this.stages[i].estimate();
+ }
+ }
+ public void merge() {
+ for(int i = 0; i < this.stageNum; ++i) {
+ Stage tmp = this.stages[i];
+ this.estimator.estimate(tmp.getAntTime(), tmp.getAntVariance2(), false);
+ }
+ }
+ public static void main(String args[]) {
+// try{
+ PERT pert = new PERT();
+ String path = new String("/home/jzhou/pert/conf.txt");
+ FileInputStream iStream = new FileInputStream(path);
+ byte[] b = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System./*out.println*/printString("Error! Can not read from configure file: " + path + "\n");
+ System.exit(-1);
+ }
+ iStream.close();
+ String content = new String(b, 0, length);
+ int index = content.indexOf('\n');
+ int stage = Integer.parseInt(content.substring(0, index));
+ Estimator estimator = new Estimator(stage);
+ pert.setStageNum(stage);
+ pert.setEstimator(estimator);
+ pert.createStages();
+ pert.sampling();
+ pert.estimate();
+ pert.merge();
+ path = new String("/home/jzhou/pert/prob.txt");
+ iStream = new FileInputStream(path);
+ byte c[] = new byte[1024];
+ length = iStream.read(c);
+ if(length < 0) {
+ System./*out.println*/printString("Error! Can not read from input file: " + path + "\n");
+ System.exit(-1);
+ }
+ iStream.close();
+ content = new String(c, 0, length);
+ index = content.indexOf('\n');
+ int x = Integer.parseInt(content.substring(0, index));
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ int y = Integer.parseInt(content.substring(0, index));
+ //System.out.println("x: " + x + "; y: " + y);
+ System./*out.println*/printString("The anticipate days need to finish this project is: " + pert.getEstimator().getTime() + "\n");
+ System./*out.println*/printString("And the anticipate variance is: " + (int)(pert.getEstimator().getVariance()*100) + "(/100)\n");
+ double prob = pert.getEstimator().getProbability(x, y);
+ System./*out.println*/printString("The probability of this project to be finished in " + x + " to " + y + " days is: " + (int)(prob*100) + "(/100)\n");
+/* } catch(Exception e) {
+ e.printStackTrace();
+ System.exit(-1);
+ }*/
+ }
+import java.util.Random;
+public class Stage {
+ int ID;
+ int[] samplings;
+ int optime;
+ int nortime;
+ int petime;
+ int time;
+ double variance2;
+ public Stage(int id) {
+ this.ID = id;
+ this.samplings = new int[10];
+ for(int i = 0; i < this.samplings.length; ++i) {
+ this.samplings[i] = 0;
+ }
+ this.optime = 0;
+ this.nortime = 0;
+ this.petime = 0;
+ this.time = 0;
+ this.variance2 = 0;
+ }
+ public void sampling() {
+ /*if(ID % 2 == 1) {
+ int tmp = samplings[samplings.length];
+ }*/
+ Random r = new Random();
+ int tint = 0;
+ for(int i = 0; i < this.samplings.length; ++i) {
+ do {
+ tint = r.nextInt()%50;
+ } while(tint <= 0);
+ this.samplings[i] = tint;
+ System./*out.print*/printString(tint + "; ");
+ }
+ System.printString("\n");//out.println();
+ }
+ public void estimate() {
+ /*if(ID % 2 == 1) {
+ int tmp = samplings[samplings.length];
+ }*/
+ int highest = this.samplings[0];
+ int lowest = this.samplings[0];
+ int sum = this.samplings[0];
+ for(int i = 1; i < this.samplings.length; ++i) {
+ int temp = this.samplings[i];
+ if(temp > highest) {
+ highest = temp;
+ } else if(temp < lowest) {
+ lowest = temp;
+ }
+ sum += temp;
+ }
+ sum = sum - highest - lowest;
+ int ordinary = sum / (this.samplings.length - 2);
+ this.optime = lowest;;
+ this.petime = highest;
+ this.nortime = ordinary;
+ this.time = (this.optime + 4 * this.nortime + this.petime) / 6;
+ this.variance2 = (double)(this.optime - this.petime) * (double)(this.optime - this.petime) / 36.0;
+ //System.out.println("Op time: " + this.optime + "; Nor time: " + this.nortime + "; Pe time: " + this.petime + "; variance2: " + (int)(this.variance2*100) + "(/100)");
+ }
+ public int getAntTime() {
+ return this.time;
+ }
+ public double getAntVariance2() {
+ return this.variance2;
+ }
+public class Estimator {
+ flag estimate;
+ flag prob;
+ int stages;
+ int time;
+ double variance;
+ double[] probtable;
+ boolean partial;
+ public Estimator(int stages) {
+ this.stages = stages;
+ this.time = 0;
+ this.variance = 0;
+ this.probtable = new double[31];
+ int i = 0;
+ this.probtable[i++] = 0.5000;
+ this.probtable[i++] = 0.5398;
+ this.probtable[i++] = 0.5793;
+ this.probtable[i++] = 0.6179;
+ this.probtable[i++] = 0.6554;
+ this.probtable[i++] = 0.6915;
+ this.probtable[i++] = 0.7257;
+ this.probtable[i++] = 0.7580;
+ this.probtable[i++] = 0.7881;
+ this.probtable[i++] = 0.8159;
+ this.probtable[i++] = 0.8413;
+ this.probtable[i++] = 0.8643;
+ this.probtable[i++] = 0.8849;
+ this.probtable[i++] = 0.9032;
+ this.probtable[i++] = 0.9192;
+ this.probtable[i++] = 0.9332;
+ this.probtable[i++] = 0.9452;
+ this.probtable[i++] = 0.9554;
+ this.probtable[i++] = 0.9641;
+ this.probtable[i++] = 0.9713;
+ this.probtable[i++] = 0.9772;
+ this.probtable[i++] = 0.9821;
+ this.probtable[i++] = 0.9861;
+ this.probtable[i++] = 0.9893;
+ this.probtable[i++] = 0.9918;
+ this.probtable[i++] = 0.9938;
+ this.probtable[i++] = 0.9953;
+ this.probtable[i++] = 0.9965;
+ this.probtable[i++] = 0.9974;
+ this.probtable[i++] = 0.9981;
+ this.probtable[i++] = 0.9987;
+ this.partial = false;
+ }
+ public boolean estimate(int time, double variance2, boolean fake) {
+ if(!fake) {
+ this.time += time;
+ this.variance += variance2;
+ } else {
+ this.partial = true;
+ }
+ --this.stages;
+ if(this.stages == 0) {
+ //System.printString("variance2: " + (int)(this.variance*100) + "(/100); ");
+ this.variance = Math.sqrt(this.variance);
+ //System.printString("variance: " + (int)(this.variance*100) + "(/100)\n");
+ return true;
+ }
+ return false;
+ }
+ public double getProbability(int x, int y) {
+ int l = x;
+ int r = y;
+ if(x > y) {
+ l = y;
+ r = x;
+ }
+ double prob = prob(r) - prob(l);
+ return prob;
+ }
+ private double prob(int s) {
+ int tmp = (int)((s - this.time) * 10 / this.variance);
+ //System.printString(tmp + "\n");
+ int abs = (int)Math.abs(tmp);
+ double prob = 0;
+ if(abs > this.probtable.length - 1) {
+ prob = 1;
+ } else {
+ prob = this.probtable[abs];
+ }
+ if(tmp < 0) {
+ return 1.0 - prob;
+ } else {
+ return prob;
+ }
+ }
+ public int getTime() {
+ return this.time;
+ }
+ public double getVariance() {
+ return this.variance;
+ }
+ public boolean isPartial() {
+ return this.partial;
+ }
+task startup(StartupObject s{initialstate}) {
+ // read in configuration parameters
+ //System.printString("Top of task startup\n");
+ String path = new String("/home/jzhou/pert/conf.txt");
+ FileInputStream iStream = new FileInputStream(path);
+ byte[] b = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from configure file: " + path + "\n");
+ System.exit(-1);
+ }
+ iStream.close();
+ String content = new String(b, 0, length);
+ int index = content.indexOf('\n');
+ int stages = Integer.parseInt(content.subString(0, index));
+ Estimator estimator = new Estimator(stages){estimate};
+ for(int i = 0; i < stages; ++i) {
+ Stage stage = new Stage(i){sampling};
+ }
+ taskexit(s{!initialstate});
+task sampling(Stage s{sampling}) {
+ //System.printString("Top of task sampling\n");
+ s.sampling();
+ taskexit(s{!sampling, estimate});
+task estimateStage(Stage s{estimate}) {
+ //System.printString("Top of task estimateStage\n");
+ s.estimate();
+ taskexit(s{!estimate, merge});
+task estimate(Estimator e{estimate}, optional Stage s{merge}) {
+ //System.printString("Top of task estimate\n");
+ boolean fake = false;
+ if(!isavailable(s)) {
+ fake = true;
+ }
+ boolean finish = e.estimate(s.getAntTime(), s.getAntVariance2(), fake);
+ if(finish) {
+ taskexit(e{!estimate, prob}, s{!merge});
+ } else {
+ taskexit(s{!merge});
+ }
+task prob(Estimator e{prob}) {
+ //System.printString("Top of task prob\n");
+ if(e.isPartial()) {
+ System.printString("There are some sampling data unavailable. The anticipate probability may be greater than it should be!\n");
+ }
+ String path = new String("/home/jzhou/pert/prob.txt");
+ FileInputStream iStream = new FileInputStream(path);
+ byte b[] = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from input file: " + path + "\n");
+ System.exit(-1);
+ }
+ iStream.close();
+ String content = new String(b, 0, length);
+ int index = content.indexOf('\n');
+ int x = Integer.parseInt(content.subString(0, index));
+ content = content.subString(index + 1);
+ index = content.indexOf('\n');
+ int y = Integer.parseInt(content.subString(0, index));
+ //System.printString("x: " + x + "; y: " + y + "\n");
+ System.printString("The anticipate days need to finish this project is: " + e.getTime() + "\n");
+ System.printString("And the anticipate variance is: " + (int)(e.getVariance()*100) + "(/100)\n");
+ double prob = e.getProbability(x, y);
+ System.printString("The probability of this project to be finished in " + x + " to " + y + " days is: " + (int)(prob*100) + "%\n");
+ taskexit(e{!prob});
+public class Stage {
+ flag sampling;
+ flag estimate;
+ flag merge;
+ int ID;
+ int[] samplings;
+ int optime;
+ int nortime;
+ int petime;
+ int time;
+ double variance2;
+ public Stage(int id) {
+ this.ID = id;
+ this.samplings = new int[10];
+ for(int i = 0; i < this.samplings.length; ++i) {
+ this.samplings[i] = 0;
+ }
+ this.optime = 0;
+ this.nortime = 0;
+ this.petime = 0;
+ this.time = 0;
+ this.variance2 = 0;
+ }
+ public void sampling() {
+ /*if(ID % 2 == 1) {
+ int tmp = samplings[samplings.length];
+ }*/
+ Random r = new Random();
+ int tint = 0;
+ for(int i = 0; i < this.samplings.length; ++i) {
+ do {
+ tint = r.nextInt()%50;
+ } while(tint <= 0);
+ this.samplings[i] = tint;
+ //System.printString(tint + "; ");
+ }
+ //System.printString("\n");
+ }
+ public void estimate() {
+ /*if(ID % 2 == 1) {
+ int tmp = samplings[samplings.length];
+ }*/
+ int highest = this.samplings[0];
+ int lowest = this.samplings[0];
+ int sum = this.samplings[0];
+ for(int i = 1; i < this.samplings.length; ++i) {
+ int temp = this.samplings[i];
+ if(temp > highest) {
+ highest = temp;
+ } else if(temp < lowest) {
+ lowest = temp;
+ }
+ sum += temp;
+ }
+ sum = sum - highest - lowest;
+ int ordinary = sum / (this.samplings.length - 2);
+ this.optime = lowest;;
+ this.petime = highest;
+ this.nortime = ordinary;
+ this.time = (this.optime + 4 * this.nortime + this.petime) / 6;
+ this.variance2 = (double)(this.optime - this.petime) * (double)(this.optime - this.petime) / 36.0;
+ //System.printString("Op time: " + this.optime + "; Nor time: " + this.nortime + "; Pe time: " + this.petime + "; variance2: " + (int)(this.variance2*100) + "(/100)\n");
+ }
+ public int getAntTime() {
+ return this.time;
+ }
+ public double getAntVariance2() {
+ return this.variance2;
+ }