// Task t1: startup task, declared on a StartupObject `s` that carries the
// `initialstate` flag. In the visible lines it prints "task t1", builds a
// Splitter over the hard-coded input text, creates a Master tagged with the
// `split` flag, and exits with s{!initialstate}.
// NOTE(review): the embedded line numbers skip 3, 5-7, 10 and 12-13, so the
// declarations of `m`, `r` and `seperator` and the closing brace are not
// visible in this view -- confirm against the full file.
1 task t1(StartupObject s{initialstate}) {
2 System.printString("task t1\n");
// Despite its name, `inputfile` holds the literal text to be processed.
4 String inputfile = "Manila International Airport Authority spokesman Octavio Lina said there were no injuries, but some of the 345 passengers vomited after disembarking, AP reported. Video of the incident shows passengers applauding as the plane landed safely.";
// presumably m = number of map pieces and r = number of reduce partitions -- TODO confirm
8 Splitter splitter = new Splitter(inputfile, m, seperator);
// The new Master is created with the `split` flag, which is the flag task t2 is declared on.
9 Master master = new Master(m, r, splitter){split};
11 taskexit(s{!initialstate});
14 //Split the input file into M pieces.
// Task t2: declared on a Master carrying the `split` flag. The visible lines
// print "task t2" and exit with master{!split, assignMap}.
// NOTE(review): embedded lines 17-19 are not visible here; the actual split
// call is presumably among them -- confirm against the full file.
15 task t2(Master master{split}) {
16 System.printString("task t2\n");
20 taskexit(master{!split, assignMap});
23 //Select a map worker to handle one of the pieces of the input file.
// Task t3: declared on a Master carrying the `assignMap` flag. Fetches the
// Splitter from the master, reads its slices, and creates one MapWorker per
// slice, each tagged with the `map` flag. Exits with
// master{!assignMap, mapoutput}.
// NOTE(review): the loop's closing brace (embedded lines 32-34) is not
// visible in this view.
24 task t3(Master master{assignMap}) {
25 System.printString("task t3\n");
28 Splitter splitter = master.getSplitter();
29 String[] contentsplits = splitter.getSlices();
30 for(int i = 0; i < contentsplits.length; ++i) {
// Each worker receives the splitter's filename, its slice, master.getR(), and the slice index i.
31 MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], master.getR(), i){map};
35 taskexit(master{!assignMap, mapoutput});
38 //MapWorker performs the 'map' function on one piece of the input file.
// Task t4: declared on a MapWorker carrying the `map` flag. The visible lines
// print "task t4" and exit with mworker{!map, partition}.
// NOTE(review): embedded lines 41-43 are not visible here; the map call is
// presumably among them -- confirm against the full file.
39 task t4(MapWorker mworker{map}) {
40 System.printString("task t4\n");
44 taskexit(mworker{!map, partition});
47 //Partition the intermediate key/value pairs generated
48 //into R intermediate local files.
// Task t5: declared on a MapWorker carrying the `partition` flag. The visible
// lines print "task t5" and exit with mworker{!partition, mapoutput}.
// NOTE(review): embedded lines 51-53 are not visible here; the partition call
// is presumably among them -- confirm against the full file.
49 task t5(MapWorker mworker{partition}) {
50 System.printString("task t5\n");
54 taskexit(mworker{!partition, mapoutput});
57 //Register the intermediate output from a map worker with the master.
// Task t6: declared on a Master and a MapWorker that both carry the
// `mapoutput` flag. For each index in [0, master.getR()) it takes the worker's
// output via mworker.outputFile(i) and passes it to master.addInterOutput,
// then records the worker through master.setMapFinish(mworker.getID()).
// When master.isMapFinish() holds, the exit is
// master{!mapoutput, mapfinished, assignReduce} with mworker{!mapoutput};
// the second taskexit changes only the worker (mworker{!mapoutput}).
// NOTE(review): several embedded lines (60, 64, 66-67, 69, 72-73, 75-76) are
// not visible, so closing braces and any guard around addInterOutput cannot
// be seen here -- confirm against the full file.
58 task t6(Master master{mapoutput}, MapWorker mworker{mapoutput}) {
59 System.printString("task t6\n");
61 int total = master.getR();
62 for(int i = 0; i < total; ++i) {
63 OutputCollector temp = mworker.outputFile(i);
65 master.addInterOutput(temp, i);
68 master.setMapFinish(mworker.getID());
70 if(master.isMapFinish()) {
71 taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
// Reached when the taskexit above was not taken; presumably taskexit ends the task -- confirm.
74 taskexit(mworker{!mapoutput});
77 //Assign the list of intermediate outputs associated with one key to a reduce worker.
// NOTE(review): the original comment continued on embedded line 78, which is
// not visible here; the wording above is completed from the code below.
// Task t7: declared on a Master carrying the `assignReduce` flag. Reads the
// master's intermediate outputs and, for each entry, creates a ReduceWorker
// tagged with the `sortgroup` flag and calls master.setReduceInit(i).
// Exits with master{!assignReduce, reduceoutput}.
// NOTE(review): the loop's closing brace (embedded lines 87-88) is not
// visible in this view.
79 task t7(Master master{assignReduce}) {
80 System.printString("task t7\n");
82 //master.assignReduce();
83 Vector[] interoutputs = master.getInteroutputs();
84 for(int i = 0; i < interoutputs.length; ++i) {
85 ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
86 master.setReduceInit(i);
89 taskexit(master{!assignReduce, reduceoutput});
92 //First sort and group the intermediate key/value pairs assigned to this reduce worker.
// NOTE(review): the original comment continued on embedded line 93, which is
// not visible here; the ending above is inferred from the task's parameter.
// Task t8: declared on a ReduceWorker carrying the `sortgroup` flag. The
// visible lines print "task t8" and exit with rworker{!sortgroup, reduce}.
// NOTE(review): embedded lines 96-98 are not visible here; the sort/group
// call is presumably among them -- confirm against the full file.
94 task t8(ReduceWorker rworker{sortgroup}) {
95 System.printString("task t8\n");
99 taskexit(rworker{!sortgroup, reduce});
102 //Perform the 'reduce' function.
// Task t9: declared on a ReduceWorker carrying the `reduce` flag. The visible
// lines print "task t9" and exit with rworker{!reduce, reduceoutput}.
// NOTE(review): embedded lines 105-107 are not visible here; the reduce call
// is presumably among them -- confirm against the full file.
103 task t9(ReduceWorker rworker{reduce}) {
104 System.printString("task t9\n");
108 taskexit(rworker{!reduce, reduceoutput});
111 //Collect the reduce output into the master.
// Task t10: declared on a Master and a ReduceWorker that both carry the
// `reduceoutput` flag. Passes rworker.getOutput() to master.collectROutput and
// records the worker through master.setReduceFinish(rworker.getID()).
// When master.isReduceFinish() holds, the exit is
// master{!reduceoutput, reducefinished, output} with rworker{!reduceoutput};
// the second taskexit changes only the worker (rworker{!reduceoutput}).
// NOTE(review): embedded lines 114, 117, 120-121 and 123-124 are not visible,
// so the closing braces cannot be seen here.
112 task t10(Master master{reduceoutput}, ReduceWorker rworker{reduceoutput}) {
113 System.printString("task t10\n");
115 master.collectROutput(rworker.getOutput());
116 master.setReduceFinish(rworker.getID());
118 if(master.isReduceFinish()) {
119 taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
// Reached when the taskexit above was not taken; presumably taskexit ends the task -- confirm.
122 taskexit(rworker{!reduceoutput});
// Task t11: declared on a Master carrying the `output` flag. Prints
// "task t11" and "Finish!", emits the marker value 0xdddd through
// System.printI, and exits with master{!output}.
// The master.isPartial() warning is commented out.
// NOTE(review): the terminator of the block comment opened at the isPartial
// check sits on an embedded line (130) that is not visible in this view, so
// no comments are added below that point -- confirm against the full file.
125 task t11(Master master{output}) {
126 System.printString("task t11\n");
128 /*if(master.isPartial()) {
129 System.printString("Partial! The result may not be right due to some failure!\n");
131 System.printString("Finish!\n");// Results are in the output file: " + master.getOutputFile() + "\n");
132 System.printI(0xdddd);
133 taskexit(master{!output});