adding a test case
[IRC.git] / Robust / src / Benchmarks / MapReduce / Tag / MapReduce.java
1 task startup(StartupObject s{initialstate}) {
2     // read in configuration parameters
3     // System.printString("Top of task startup\n");
4     String path = new String("/scratch/mapreduce_opt/conf.txt");
5     FileInputStream iStream = new FileInputStream(path);
6     byte[] b = new byte[1024];
7     int length = iStream.read(b);
8     if(length < 0 ) {
9         System.printString("Error! Can not read from configure file: " + path + "\n");
10         System.exit(-1);
11     }
12     String content = new String(b, 0, length);
13     //System.printString(content + "\n");
14     int index = content.indexOf('\n');
15     String inputfile = content.subString(0, index);
16     content = content.subString(index + 1);
17     index = content.indexOf('\n');
18     int m = Integer.parseInt(content.subString(0, index));
19     content = content.subString(index + 1);
20     index = content.indexOf('\n');
21     int r = Integer.parseInt(content.subString(0, index));
22     content = content.subString(index + 1);
23     index = content.indexOf('\n');
24     String temp = content.subString(0, index);
25     char seperator = temp.charAt(0);
26     //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
27     Splitter splitter = new Splitter(inputfile, m, seperator);
28     Master master = new Master(m, r, splitter){mapoutput};//{split};
29     
30     master.assignMap();
31
32     taskexit(s{!initialstate});
33 }
34
35 //Split the input file into M pieces
36 /*task split(Master master{split}) {
37     //System.printString("Top of task split\n");
38     //master.split();
39
40     taskexit(master{!split, assignMap});
41 }
42
43 //Select a map worker to handle one of the pieces of input file
44 task assignMap(Master master{assignMap}) {
45     //System.printString("Top of task assignMap\n");
46     master.assignMap();
47
48     //taskexit(master{!assignMap, mapoutput});
49     taskexit(master{!split, mapoutput});
50 }*/
51
52 //MapWorker do 'map' function on a input file piece
53 task map(MapWorker mworker{map}) {
54     //System.printString("Top of task map\n");
55     mworker.map();
56
57     /*taskexit(mworker{!map, partition});
58 }
59
60 //Partition the intermediate key/value pair generated
61 //into R intermediate local files
62 task partition(MapWorker mworker{partition}) {
63     //System.printString("Top of task partition\n");*/
64     mworker.partition();
65
66     taskexit(mworker{!map, mapoutput});
67     //taskexit(mworker{!partition, mapoutput});
68 }
69
70 //Register the intermediate ouput from map worker to master
71 task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) {
72     //System.printString("Top of task mapOutput\n");
73     if(isavailable(mworker)) {
74         int total = master.getR();
75         for(int i = 0; i < total; ++i) {
76             //System.printString("mapOutput\n");
77             String temp = mworker.outputFile(i);
78             if(temp != null) {
79                 master.addInterOutput(temp); 
80             }
81         }
82         master.setMapFinish(mworker.getID());
83     } else {
84         master.setMapFail(mworker.getID());
85         master.setPartial(true);
86     }
87     if(master.isMapFinish()) {
88         taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput});
89     }
90
91     taskexit(mworker{!mapoutput});
92 }
93
94 //Assign the list of intermediate output associated to one key to
95 //a reduce worker 
96 task assignReduce(Master master{assignReduce}) {
97     //System.printString("Top of task assignReduce\n");
98     master.assignReduce();
99
100     taskexit(master{!assignReduce, reduceoutput});
101 }
102
103 //First do sort and group on the intermediate key/value pairs assigned
104 //to reduce worker
105 task sortgroup(ReduceWorker rworker{sortgroup}) {
106     //System.printString("Top of task sortgroup\n");
107     rworker.sortgroup();
108
109     /*taskexit(rworker{!sortgroup, reduce});
110 }
111
112 //Do 'reduce' function
113 task reduce(ReduceWorker rworker{reduce}) {
114     //System.printString("Top of task reduce\n");*/
115     rworker.reduce();
116
117     //taskexit(rworker{!reduce, reduceoutput});
118     taskexit(rworker{!sortgroup, reduceoutput});
119 }
120
121 //Collect the output into master
122 task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{reduceoutput}) {
123     //System.printString("Top of task reduceOutput\n");
124     if(isavailable(rworker)) {
125         master.collectROutput(rworker.getOutputFile());
126         master.setReduceFinish(rworker.getID());
127     } else {
128         master.setReduceFail(rworker.getID());
129         master.setPartial(true);
130     }
131     if(master.isReduceFinish()) {
132         //System.printString("reduce finish\n");
133         taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput});
134     }
135
136     taskexit(rworker{!reduceoutput});
137 }
138
139 task output(Master master{output}) {
140     //System.printString("Top of task output\n");
141     /*if(master.isPartial()) {
142         System.printString("Partial! The result may not be right due to some failure!\n");
143     }
144     System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
145     taskexit(master{!output});
146 }