adding a test case
[IRC.git] / Robust / src / Benchmarks / MapReduce / Nor / MapReduce.java
1 task startup(StartupObject s{initialstate}) {
2     // read in configuration parameters
3     //System.printString("Top of task startup\n");
4     String path = new String("/scratch/mapreduce_nor/conf.txt");
5     FileInputStream iStream = new FileInputStream(path);
6     byte[] b = new byte[1024];
7     int length = iStream.read(b);
8     if(length < 0 ) {
9         System.printString("Error! Can not read from configure file: " + path + "\n");
10         System.exit(-1);
11     }
12     String content = new String(b, 0, length);
13     //System.printString(content + "\n");
14     int index = content.indexOf('\n');
15     String inputfile = content.subString(0, index);
16     content = content.subString(index + 1);
17     index = content.indexOf('\n');
18     int m = Integer.parseInt(content.subString(0, index));
19     content = content.subString(index + 1);
20     index = content.indexOf('\n');
21     int r = Integer.parseInt(content.subString(0, index));
22     content = content.subString(index + 1);
23     index = content.indexOf('\n');
24     String temp = content.subString(0, index);
25     char seperator = temp.charAt(0);
26     //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27     Splitter splitter = new Splitter(inputfile, m, seperator);
28     Master master = new Master(m, r, splitter){mapoutput};//{split};
29     
30     master.assignMap();
31
32     taskexit(s{!initialstate});
33 }
34
35 //Split the input file into M pieces
36 /*task split(Master master{split}) {
37     //System.printString("Top of task split\n");
38     master.split();
39
40     taskexit(master{!split, assignMap});
41 }
42
43 //Select a map worker to handle one of the pieces of input file
44 task assignMap(Master master{assignMap}) {
45     //System.printString("Top of task assignMap\n");
46     master.assignMap();
47
48     taskexit(master{!assignMap, mapoutput});
49 }*/
50
51 //MapWorker do 'map' function on a input file piece
52 task map(MapWorker mworker{map}) {
53     //System.printString("Top of task map\n");
54     mworker.map();
55
56     /*taskexit(mworker{!map, partition});
57 }
58
59 //Partition the intermediate key/value pair generated
60 //into R intermediate local files
61 task partition(MapWorker mworker{partition}) {
62     //System.printString("Top of task partition\n");*/
63     mworker.partition();
64
65     //taskexit(mworker{!partition, mapoutput});
66     taskexit(mworker{!map, mapoutput});
67 }
68
69 //Register the intermediate ouput from map worker to master
70 task mapOutput(Master master{mapoutput}, /*optional*/ MapWorker mworker{mapoutput}) {
71     //System.printString("Top of task mapOutput\n");
72     //if(isavailable(mworker)) {
73         int total = master.getR();
74         for(int i = 0; i < total; ++i) {
75             String temp = mworker.outputFile(i);
76             if(temp != null) {
77                 master.addInterOutput(temp); 
78             }
79         }
80         master.setMapFinish(mworker.getID());
81     /*} else {
82         master.setMapFail(mworker.getID());
83         master.setPartial(true);
84     }*/
85     if(master.isMapFinish()) {
86         taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
87     }
88
89     taskexit(mworker{!mapoutput});
90 }
91
92 //Assign the list of intermediate output associated to one key to
93 //a reduce worker 
94 task assignReduce(Master master{assignReduce}) {
95     //System.printString("Top of task assignReduce\n");
96     master.assignReduce();
97
98     taskexit(master{!assignReduce, reduceoutput});
99 }
100
101 //First do sort and group on the intermediate key/value pairs assigned
102 //to reduce worker
103 task sortgroup(ReduceWorker rworker{sortgroup}) {
104     //System.printString("Top of task sortgroup\n");
105     rworker.sortgroup();
106
107     /*taskexit(rworker{!sortgroup, reduce});
108 }
109
110 //Do 'reduce' function
111 task reduce(ReduceWorker rworker{reduce}) {
112     //System.printString("Top of task reduce\n");*/
113     rworker.reduce();
114
115     //taskexit(rworker{!reduce, reduceoutput});
116     taskexit(rworker{!sortgroup, reduceoutput});
117 }
118
119 //Collect the output into master
120 task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker{reduceoutput}) {
121     //System.printString("Top of task reduceOutput\n");
122     //if(isavailable(rworker)) {
123         master.collectROutput(rworker.getOutputFile());
124         master.setReduceFinish(rworker.getID());
125    /* } else {
126         master.setReduceFail(rworker.getID());
127         master.setPartial(true);
128     }*/
129     if(master.isReduceFinish()) {
130         //System.printString("reduce finish\n");
131         taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
132     }
133
134     taskexit(rworker{!reduceoutput});
135 }
136
137 task output(Master master{output}) {
138     //System.printString("Top of task output\n");
139     /*if(master.isPartial()) {
140         System.printString("Partial! The result may not be right due to some failure!\n");
141     }
142     System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
143     taskexit(master{!output});
144 }