// Task startup: runs on the StartupObject guarded by {initialstate}. It reads
// the job configuration from a fixed path, parses it, and creates the Splitter
// and Master objects that the later tasks operate on.
// NOTE(review): the embedded listing numbers skip (7 -> 9 -> 12, 28 -> 32), so
// some original lines of this task are not visible here.
1 task startup(StartupObject s{initialstate}) {
2 // read in configuration parameters
3 // System.printString("Top of task startup\n");
4 String path = new String("/scratch/mapreduce_opt/conf.txt");
5 FileInputStream iStream = new FileInputStream(path);
// A single read() into a 1024-byte buffer: at most 1024 bytes of the
// configuration file are ever looked at.
6 byte[] b = new byte[1024];
7 int length = iStream.read(b);
// NOTE(review): the condition guarding this error message (listing line 8) is
// not visible in this listing.
9 System.printString("Error! Can not read from configure file: " + path + "\n");
// Only the `length` bytes actually read are turned into text.
12 String content = new String(b, 0, length);
13 //System.printString(content + "\n");
// The text is consumed one '\n'-terminated line at a time: take everything
// before the next '\n', then drop that line from `content`.
// NOTE(review): none of the steps below checks for indexOf returning -1
// (a configuration with fewer than four '\n'-terminated lines) -- confirm the
// file format guarantees this.
// Line 1 -> inputfile.
14 int index = content.indexOf('\n');
15 String inputfile = content.subString(0, index);
16 content = content.subString(index + 1);
// Line 2 -> m, parsed as an integer.
17 index = content.indexOf('\n');
18 int m = Integer.parseInt(content.subString(0, index));
19 content = content.subString(index + 1);
// Line 3 -> r, parsed as an integer.
20 index = content.indexOf('\n');
21 int r = Integer.parseInt(content.subString(0, index));
22 content = content.subString(index + 1);
// Line 4 -> only its first character is used, as the separator.
23 index = content.indexOf('\n');
24 String temp = content.subString(0, index);
25 char seperator = temp.charAt(0);
26 //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27 Splitter splitter = new Splitter(inputfile, m, seperator);
// The Master is created tagged {mapoutput}; the {split} tag is commented out.
28 Master master = new Master(m, r, splitter){mapoutput};//{split};
// Exit the task, applying !initialstate to s.
32 taskexit(s{!initialstate});
35 //Split the input file into M pieces
36 /*task split(Master master{split}) {
37 //System.printString("Top of task split\n");
40 taskexit(master{!split, assignMap});
43 //Select a map worker to handle one of the pieces of input file
// Task assignMap: declared on a Master guarded by {assignMap}.
// NOTE(review): the listing numbers jump 45 -> 48, so the statements that do
// the assignment are not visible here. Also, the block comment opened on
// listing line 36 has no visible closing marker before this task -- confirm
// against the original file whether this task is active.
44 task assignMap(Master master{assignMap}) {
45 //System.printString("Top of task assignMap\n");
48 //taskexit(master{!assignMap, mapoutput});
// Active exit: applies !split and mapoutput to master (the variant that
// clears assignMap is the commented-out line above).
49 taskexit(master{!split, mapoutput});
52 //MapWorker do 'map' function on a input file piece
// Task map: declared on a MapWorker guarded by {map}.
// NOTE(review): the listing numbers jump 54 -> 57, so the statements that
// perform the map work are not visible here.
53 task map(MapWorker mworker{map}) {
54 //System.printString("Top of task map\n");
// The block comment that starts on the next line disables both the exit to
// {partition} and the header of a separate partition task, so this task goes
// straight to the {mapoutput} exit below.
57 /*taskexit(mworker{!map, partition});
60 //Partition the intermediate key/value pair generated
61 //into R intermediate local files
62 task partition(MapWorker mworker{partition}) {
63 //System.printString("Top of task partition\n");*/
// Active exit: applies !map and mapoutput to mworker.
66 taskexit(mworker{!map, mapoutput});
67 //taskexit(mworker{!partition, mapoutput});
70 //Register the intermediate output from map worker to master
// Task mapOutput: takes a Master guarded by {mapoutput} and an optional
// MapWorker guarded by {mapoutput}.
// NOTE(review): the listing numbers skip inside this task (77 -> 79 -> 82 ->
// 84, 85 -> 87, 88 -> 91), so closing braces and branch keywords are not
// visible here.
71 task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
72 //System.printString("Top of task mapOutput\n");
// When the optional worker is available, each of its output files for
// indices 0 .. master.getR()-1 is added to the master.
73 if(isavailable(mworker)) {
74 int total = master.getR();
75 for(int i = 0; i < total; ++i) {
76 //System.printString("mapOutput\n");
77 String temp = mworker.outputFile(i);
79 master.addInterOutput(temp);
// The worker's ID is then reported to the master via setMapFinish.
82 master.setMapFinish(mworker.getID());
// NOTE(review): listing line 83 is missing; the next two calls are presumably
// the branch taken when the worker is NOT available -- confirm.
84 master.setMapFail(mworker.getID());
85 master.setPartial(true);
// When master.isMapFinish() returns true, the master leaves {mapoutput} and
// gains mapfinished and assignReduce; the worker's mapoutput is cleared too.
87 if(master.isMapFinish()) {
88 taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
// NOTE(review): presumably the exit used when isMapFinish() is false (the
// lines in between are not visible): only the worker's mapoutput is cleared.
91 taskexit(mworker{!mapoutput});
94 //Assign the list of intermediate output associated to one key to
// NOTE(review): the continuation of the comment above (listing line 95) is not
// visible in this listing.
// Task assignReduce: declared on a Master guarded by {assignReduce}. It calls
// master.assignReduce() and then exits, applying !assignReduce and
// reduceoutput to master.
96 task assignReduce(Master master{assignReduce}) {
97 //System.printString("Top of task assignReduce\n");
98 master.assignReduce();
100 taskexit(master{!assignReduce, reduceoutput});
103 //First do sort and group on the intermediate key/value pairs assigned
// NOTE(review): the continuation of the comment above (listing line 104) is
// not visible in this listing.
// Task sortgroup: declared on a ReduceWorker guarded by {sortgroup}.
// NOTE(review): the listing numbers jump 106 -> 109 and 114 -> 117, so the
// statements that do the sorting/grouping work are not visible here.
105 task sortgroup(ReduceWorker rworker{sortgroup}) {
106 //System.printString("Top of task sortgroup\n");
// The block comment that starts on the next line disables both the exit to
// {reduce} and the header of a separate reduce task, so this task goes
// straight to the {reduceoutput} exit below.
109 /*taskexit(rworker{!sortgroup, reduce});
112 //Do 'reduce' function
113 task reduce(ReduceWorker rworker{reduce}) {
114 //System.printString("Top of task reduce\n");*/
117 //taskexit(rworker{!reduce, reduceoutput});
// Active exit: applies !sortgroup and reduceoutput to rworker.
118 taskexit(rworker{!sortgroup, reduceoutput});
121 //Collect the output into master
// Task reduceOutput: takes a Master guarded by {reduceoutput} and an optional
// ReduceWorker guarded by {reduceoutput}.
// NOTE(review): the listing numbers skip inside this task (126 -> 128,
// 129 -> 131, 133 -> 136), so closing braces and branch keywords are not
// visible here.
122 task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
123 //System.printString("Top of task reduceOutput\n");
// When the optional worker is available, its output file is handed to the
// master and its ID is reported via setReduceFinish.
124 if(isavailable(rworker)) {
125 master.collectROutput(rworker.getOutputFile());
126 master.setReduceFinish(rworker.getID());
// NOTE(review): listing line 127 is missing; the next two calls are presumably
// the branch taken when the worker is NOT available -- confirm.
128 master.setReduceFail(rworker.getID());
129 master.setPartial(true);
// When master.isReduceFinish() returns true, the master leaves {reduceoutput}
// and gains reducefinished and output; the worker's reduceoutput is cleared.
131 if(master.isReduceFinish()) {
132 //System.printString("reduce finish\n");
133 taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
// NOTE(review): presumably the exit used when isReduceFinish() is false (the
// lines in between are not visible): only the worker's reduceoutput is cleared.
136 taskexit(rworker{!reduceoutput});
// Task output: declared on a Master guarded by {output}. The status messages
// (partial-result warning and output file name) are inside a block comment,
// so the only active statement visible here is the exit that applies !output
// to master.
139 task output(Master master{output}) {
140 //System.printString("Top of task output\n");
141 /*if(master.isPartial()) {
142 System.printString("Partial! The result may not be right due to some failure!\n");
144 System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
145 taskexit(master{!output});