// Task startup: guarded by the `initialstate` flag on the StartupObject.
// Reads a configuration file from a fixed path, parses four fields from it
// (input file name, m, r, and a separator character), builds a Splitter and
// a Master from those values, and exits with `initialstate` cleared.
// NOTE(review): the embedded listing numbers skip (8, 10-11, 29-31, 33-34), so
// some statements and the closing brace of this task are not visible here.
1 task startup(StartupObject s{initialstate}) {
2 // read in configuration parameters
3 //System.printString("Top of task startup\n");
4 String path = new String("/scratch/mapreduce_nor/conf.txt");
5 FileInputStream iStream = new FileInputStream(path);
// A single read into a 1024-byte buffer is the only read visible.
// NOTE(review): presumably a longer configuration file would be cut off at
// 1024 bytes - confirm. No close of iStream is visible either.
6 byte[] b = new byte[1024];
7 int length = iStream.read(b);
// NOTE(review): the condition guarding this error message (listing line 8)
// and whatever follows it (lines 10-11) are not visible.
9 System.printString("Error! Can not read from configure file: " + path + "\n");
// Only the first `length` bytes of the buffer are turned into text.
12 String content = new String(b, 0, length);
13 //System.printString(content + "\n");
// Field 1: everything before the first '\n' is the input file name.
// After each field, `content` is advanced past the '\n' that ended it.
// NOTE(review): no check on the indexOf result is visible; each field is
// assumed to be terminated by '\n' - confirm.
14 int index = content.indexOf('\n');
15 String inputfile = content.subString(0, index);
16 content = content.subString(index + 1);
// Field 2: parsed as the integer m.
17 index = content.indexOf('\n');
18 int m = Integer.parseInt(content.subString(0, index));
19 content = content.subString(index + 1);
// Field 3: parsed as the integer r.
20 index = content.indexOf('\n');
21 int r = Integer.parseInt(content.subString(0, index));
22 content = content.subString(index + 1);
// Field 4: only its first character is used, as the separator.
23 index = content.indexOf('\n');
24 String temp = content.subString(0, index);
25 char seperator = temp.charAt(0);
26 //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27 Splitter splitter = new Splitter(inputfile, m, seperator);
// The Master is allocated already tagged `mapoutput`; the trailing `//{split}`
// is an earlier initial tag that has been commented out.
28 Master master = new Master(m, r, splitter){mapoutput};//{split};
// Clear `initialstate` on the startup object when the task exits.
32 taskexit(s{!initialstate});
35 //Split the input file into M pieces
36 /*task split(Master master{split}) {
37 //System.printString("Top of task split\n");
40 taskexit(master{!split, assignMap});
43 //Select a map worker to handle one of the pieces of input file
// Task assignMap: guarded by the `assignMap` flag on the Master; its visible
// exit clears `assignMap` and sets `mapoutput`.
// NOTE(review): the `/*` opened in front of `task split` above has no visible
// terminator before this task (listing lines 38-42 and 46-50 are missing), and
// startup allocates the Master directly in `mapoutput`, so this task may be
// commented-out code - confirm. The task body (listing lines 46-47) is not visible.
44 task assignMap(Master master{assignMap}) {
45 //System.printString("Top of task assignMap\n");
48 taskexit(master{!assignMap, mapoutput});
51 //MapWorker does the 'map' function on an input file piece
// Task map: guarded by the `map` flag on a MapWorker.
// The block comment further down removes the earlier exit to a `partition`
// flag together with the header of a separate `partition` task, so everything
// up to the final taskexit belongs to this one task. It exits with `map`
// cleared and `mapoutput` set on the worker.
// NOTE(review): the worker calls made by this task (listing lines 54-55 and
// 63-64) are not visible here.
52 task map(MapWorker mworker{map}) {
53 //System.printString("Top of task map\n");
56 /*taskexit(mworker{!map, partition});
59 //Partition the intermediate key/value pair generated
60 //into R intermediate local files
61 task partition(MapWorker mworker{partition}) {
62 //System.printString("Top of task partition\n");*/
65 //taskexit(mworker{!partition, mapoutput});
// Active exit: clears `map` rather than `partition`, matching the merged task.
66 taskexit(mworker{!map, mapoutput});
69 //Register the intermediate output from map worker to master
// Task mapOutput: guarded by `mapoutput` on both the Master and a MapWorker.
// Hands the worker's output files to the master, records the worker's status
// by ID, and clears the worker's `mapoutput` flag on exit. When
// master.isMapFinish() holds, the master also leaves `mapoutput` and gains
// `mapfinished` and `assignReduce`.
// The `optional` marker on the worker parameter and the matching
// isavailable(mworker) check are both commented out.
70 task mapOutput(Master master{mapoutput}, /*optional*/ MapWorker mworker{mapoutput}) {
71 //System.printString("Top of task mapOutput\n");
72 //if(isavailable(mworker)) {
// Ask the worker for output file i, for i in [0, master.getR()), and pass
// each one to the master.
73 int total = master.getR();
74 for(int i = 0; i < total; ++i) {
75 String temp = mworker.outputFile(i);
// NOTE(review): listing line 76 is not visible; a guard on `temp` may sit there.
77 master.addInterOutput(temp);
80 master.setMapFinish(mworker.getID());
// NOTE(review): setMapFail/setPartial follow setMapFinish with no visible
// branch between them. Listing lines 81 and 84 are missing, and the
// isavailable check above is commented out - confirm whether this failure
// path is live or commented out.
82 master.setMapFail(mworker.getID());
83 master.setPartial(true);
// All-maps-done exit: advance the master and release the worker.
85 if(master.isMapFinish()) {
86 taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
// Otherwise only the worker's flag changes; the master's flags are not touched.
89 taskexit(mworker{!mapoutput});
92 //Assign the list of intermediate output associated to one key to
// Task assignReduce: guarded by the `assignReduce` flag on the Master.
// Delegates to master.assignReduce(), then exits with `assignReduce` cleared
// and `reduceoutput` set on the master.
94 task assignReduce(Master master{assignReduce}) {
95 //System.printString("Top of task assignReduce\n");
96 master.assignReduce();
98 taskexit(master{!assignReduce, reduceoutput});
101 //First do sort and group on the intermediate key/value pairs assigned
// Task sortgroup: guarded by the `sortgroup` flag on a ReduceWorker.
// The block comment further down removes the earlier exit to a `reduce` flag
// together with the header of a separate `reduce` task, so everything up to
// the final taskexit belongs to this one task. It exits with `sortgroup`
// cleared and `reduceoutput` set on the worker.
// NOTE(review): the worker calls made by this task (listing lines 105-106 and
// 113-114) are not visible here.
103 task sortgroup(ReduceWorker rworker{sortgroup}) {
104 //System.printString("Top of task sortgroup\n");
107 /*taskexit(rworker{!sortgroup, reduce});
110 //Do 'reduce' function
111 task reduce(ReduceWorker rworker{reduce}) {
112 //System.printString("Top of task reduce\n");*/
115 //taskexit(rworker{!reduce, reduceoutput});
// Active exit: clears `sortgroup` rather than `reduce`, matching the merged task.
116 taskexit(rworker{!sortgroup, reduceoutput});
119 //Collect the output into master
// Task reduceOutput: guarded by `reduceoutput` on both the Master and a
// ReduceWorker. Passes the worker's output file to the master, records the
// worker's status by ID, and clears the worker's `reduceoutput` flag on exit.
// When master.isReduceFinish() holds, the master also leaves `reduceoutput`
// and gains `reducefinished` and `output`.
// The `optional` marker on the worker parameter and the matching
// isavailable(rworker) check are both commented out.
120 task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker{reduceoutput}) {
121 //System.printString("Top of task reduceOutput\n");
122 //if(isavailable(rworker)) {
123 master.collectROutput(rworker.getOutputFile());
124 master.setReduceFinish(rworker.getID());
// NOTE(review): setReduceFail/setPartial follow setReduceFinish with no
// visible branch between them. Listing lines 125 and 128 are missing, and the
// isavailable check above is commented out - confirm whether this failure
// path is live or commented out.
126 master.setReduceFail(rworker.getID());
127 master.setPartial(true);
// All-reduces-done exit: advance the master and release the worker.
129 if(master.isReduceFinish()) {
130 //System.printString("reduce finish\n");
131 taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
// Otherwise only the worker's flag changes; the master's flags are not touched.
134 taskexit(rworker{!reduceoutput});
// Task output: guarded by the `output` flag on the Master.
// The reporting code (a "Partial!" warning tied to master.isPartial() and a
// "Finish!" message naming master.getOutputFile()) sits inside a block
// comment, so the only active statement is the exit that clears `output`.
137 task output(Master master{output}) {
138 //System.printString("Top of task output\n");
139 /*if(master.isPartial()) {
140 System.printString("Partial! The result may not be right due to some failure!\n");
142 System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
143 taskexit(master{!output});