-//package mapreduce;
-
public class Configuration {
MapReduceBase mapreducer;
-//package mapreduce;
-
public class Configured {
Configuration conf;
-//package mapreduce;
-
public class JobClient{
public JobClient() {}
// split input file
//System.printString("Split\n");
- master.split();
+ //master.split();
// do 'map'
//System.printString("Map\n");
}
//assert(master.isReduceFinish());
- System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+ //System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
}
}
-//package mapreduce;
-
-//import java.util.Vector;
-
-public /*abstract*/ class MapReduceBase {
+public class MapReduceBase {
public MapReduceBase() {}
- public /*abstract*/ void map(String key, String value, OutputCollector output);
+ public void map(String key, String value, OutputCollector output);
- public /*abstract*/ void reduce(String key, Vector values, OutputCollector output);
+ public void reduce(String key, Vector values, OutputCollector output);
}
-//package mapreduce;
-
-//import java.io.FileOutputStream;
-
public class MapWorker {
int ID;
MapReduceBase mapreducer;
-
int r;
String key;
String value;
OutputCollector output;
-
- String[] locations;
- FileOutputStream[] outputs;
+ String locationPrefix;
+ boolean[] outputsexit;
public MapWorker(String key, String value, int r, int id) {
this.ID = id;
this.key = key;
this.value = value;
this.output = new OutputCollector();
-
- locations = new String[r];
- for(int i = 0; i < r; ++i) {
- StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
- temp.append(String.valueOf(ID));
- temp.append("-of-");
- temp.append(String.valueOf(r));
- temp.append("_");
- temp.append(String.valueOf(i));
- temp.append(".dat");
- locations[i] = new String(temp);
- }
-
- outputs = new FileOutputStream[r];
- for(int i = 0; i < r; ++i) {
- outputs[i] = null;
- }
+ this.locationPrefix = "/scratch/mapreduce_java/output-intermediate-map-";
+
+ this.outputsexit = new boolean[r];
}
public MapReduceBase getMapreducer() {
}
public void map() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
-
this.mapreducer.map(key, value, output);
+ this.key = null;
+ this.value = null;
}
/**
 * Distributes the key/value pairs collected in this.output over up to r
 * intermediate files and then releases the collector.
 *
 * For every pair, the target slot is derived from the key's hash code. The
 * file for a slot is opened lazily in append mode under the name
 * locationPrefix + ID + "-of-" + r + "_" + slot + ".dat", and
 * outputsexit[slot] is set so that outputFile(slot) can later report it.
 * Each pair is written as "key value\n" and flushed immediately. All opened
 * streams are closed before returning and this.output is set to null.
 */
public void partition() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
+ FileOutputStream[] outputs = new FileOutputStream[r];
+ for(int i = 0; i < r; ++i) {
+ outputs[i] = null;
+ }
- //try{
- int size = this.output.size();
- for(int i = 0; i < size; ++i) {
- String key = this.output.getKey(i);
- String value = this.output.getValue(i);
- // use the hashcode of key to decide which intermediate output
- // this pair should be in
- //int hash = key.hashCode();
- int index = (int)Math.abs(key.hashCode()) % this.r;
- FileOutputStream oStream = outputs[index];
- if(oStream == null) {
- // open the file
- String filepath = locations[index];
- oStream = new FileOutputStream(filepath, true); // append
- outputs[index] = oStream;
- }
- // format: key value\n
- oStream.write(key.getBytes());
- oStream.write(' ');
- oStream.write(value.getBytes());
- oStream.write('\n');
- oStream.flush();
+ int size = this.output.size();
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // use the hashcode of key to decide which intermediate output
+ // this pair should be in
+ //int hash = key.hashCode();
+ // Reduce first, then strip the sign: negating Integer.MIN_VALUE overflows,
+ // so abs()-then-modulo could produce a negative slot. For every other
+ // hash value |h % r| is the same slot that |h| % r selected before.
+ int index = key.hashCode() % this.r;
+ if(index < 0) {
+ index = -index;
+ }
+ FileOutputStream oStream = outputs[index];
+ if(oStream == null) {
+ // open the file
+ String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
+ oStream = new FileOutputStream(filepath, true); // append
+ outputs[index] = oStream;
+ this.outputsexit[index] = true;
}
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
- // close the output files
- for(int i = 0; i < this.outputs.length; ++i) {
- FileOutputStream temp = this.outputs[i];
- if(temp != null) {
- temp.close();
- }
+ // close the output files
+ for(int i = 0; i < outputs.length; ++i) {
+ FileOutputStream temp = outputs[i];
+ if(temp != null) {
+ temp.close();
+ outputs[i] = null;
}
- /*} catch(Exception e) {
- e.printStackTrace();
- System.exit(-1);
- }*/
+ }
+
+ this.output = null;
}
public String outputFile(int i) {
- if(outputs[i] != null) {
- return locations[i];
+ if(outputsexit[i]) {
+ StringBuffer temp = new StringBuffer(this.locationPrefix);
+ temp.append(String.valueOf(ID));
+ temp.append("-of-");
+ temp.append(String.valueOf(r));
+ temp.append("_");
+ temp.append(String.valueOf(i));
+ temp.append(".dat");
+ return new String(temp);
} else {
return null;
}
-//package mapreduce;
-
-/*import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.util.Vector;*/
-
public class Master {
int m;
int r;
int[] mworkerStates; // array of map worker's state
- // 0: idle 1: process 2: finished 3: fail
+ // 0: idle 1: process 2: finished 3: fail
int[] rworkerStates; // array of reduce worker's state
Vector[] interoutputs; // array of string vector containing
- // paths of intermediate outputs from
- // map worker
-
+ // paths of intermediate outputs from
+ // map worker
Splitter splitter;
-
String outputfile; // path of final output file
-
boolean partial;
public Master(int m, int r, Splitter splitter) {
this.m = m;
this.r = r;
-
- mworkerStates = new int[m];
- rworkerStates = new int[r];
- for(int i = 0; i < m; ++i) {
- mworkerStates[i] = 0;
- }
- for(int i = 0; i < r; ++i) {
- rworkerStates[i] = 0;
- }
-
- interoutputs = new Vector[r];
- for(int i = 0; i < r; ++i) {
- interoutputs[i] = null;
- }
-
+ this.mworkerStates = new int[m];
+ this.rworkerStates = new int[r];
+ this.interoutputs = new Vector[r];
this.splitter = splitter;
this.outputfile = new String("/scratch/mapreduce_java/output.dat");
-
this.partial = false;
}
this.partial = partial || this.partial;
}
- public void split() {
+ /*public void split() {
splitter.split();
- }
+ }*/
public MapWorker[] assignMap() {
- String[] contentsplits = splitter.getSlices();
+ String[] contentsplits = splitter.split();//splitter.getSlices();
MapWorker[] mworkers = new MapWorker[contentsplits.length];
for(int i = 0; i < contentsplits.length; ++i) {
//System.printString("*************************\n");
mworkerStates[i] = 1;
mworkers[i] = mworker;
}
+ this.splitter = null;
return mworkers;
}
ReduceWorker rworker = new ReduceWorker(interoutputs[i], i);
rworkerStates[i] = 1;
rworkers[i] = rworker;
+ this.interoutputs[i] = null;
}
+ this.interoutputs.clear();
return rworkers;
}
}
/**
 * Appends the content of one intermediate output file to the final output
 * file (this.outputfile, opened in append mode).
 *
 * The input is read with a single read() into a 10 KB buffer. A negative
 * read length prints an error naming the file and exits with status -1.
 *
 * NOTE(review): because only one read() is issued, at most 10240 bytes of
 * `file` are copied — confirm reduce outputs are expected to stay below that.
 *
 * @param file path of the intermediate output file to append
 */
public void collectROutput(String file) {
- //try{
- FileInputStream iStream = new FileInputStream(file);
- FileOutputStream oStream = new FileOutputStream(outputfile, true);
- byte[] b = new byte[1024 * 10];
- int length = iStream.read(b);
- if(length < 0) {
- System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
- System.exit(-1);
- }
- //System.printString(new String(b, 0, length) + "\n");
- oStream.write(b, 0, length);
- iStream.close();
- oStream.close();
- /*} catch(Exception e) {
- e.printStackTrace();
+ FileInputStream iStream = new FileInputStream(file);
+ FileOutputStream oStream = new FileOutputStream(outputfile, true);
+ byte[] b = new byte[1024 * 10];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
System.exit(-1);
- }*/
+ }
+ //System.printString(new String(b, 0, length) + "\n");
+ oStream.write(b, 0, length);
+ iStream.close();
+ oStream.close();
}
}
-//package mapreduce;
-
-//import java.util.Vector;
-
public class OutputCollector {
Vector keys;
-//package mapreduce;
-
-/*import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Vector;*/
-
-import mapreduce.MapReduceBase;
-
public class ReduceWorker {
int ID;
public ReduceWorker(Vector interoutputs, int id) {
this.ID = id;
this.mapreducer = null;
-
this.interoutputs = interoutputs;
-
this.keys = new Vector();
this.values = new HashMap();
- //this.sorts = null;
-
this.output = new OutputCollector();
this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
}
}
public void sortgroup() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
// group values associated to the same key
//System.printString("================================\n");
if(interoutputs == null) {
return;
}
- //try{
- for(int i = 0; i < interoutputs.size(); ++i) {
- FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
- byte[] b = new byte[1024 * 10];
- int length = iStream.read(b);
- if(length < 0) {
- System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
- System.exit(-1);
- }
- String content = new String(b, 0, length);
- //System.printString(content + "\n");
- int index = content.indexOf('\n');
- while(index != -1) {
- String line = content.substring(0, index);
- content = content.substring(index + 1);
- //System.printString(line + "\n");
- int tmpindex = line.indexOf(' ');
- String key = line.substring(0, tmpindex);
- String value = line.substring(tmpindex + 1);
- //System.printString(key + "; " + value + "\n");
- if(!this.values.containsKey(key)) {
- this.values.put(key, new Vector());
- this.keys.addElement(key);
- }
- ((Vector)this.values.get(key)).addElement(value);
- index = content.indexOf('\n');
- }
- iStream.close();
+ for(int i = 0; i < interoutputs.size(); ++i) {
+ FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+ byte[] b = new byte[1024 * 10];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+ System.exit(-1);
}
- //System.printString("================================\n");
-
- /*for(int i = 0; i < this.keys.size(); ++i) {
- System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ String content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ int index = content.indexOf('\n');
+ while(index != -1) {
+ String line = content.subString(0, index);
+ content = content.subString(index + 1);
+ //System.printString(line + "\n");
+ int tmpindex = line.indexOf(' ');
+ String key = line.subString(0, tmpindex);
+ String value = line.subString(tmpindex + 1);
+ //System.printString(key + "; " + value + "\n");
+ if(!this.values.containsKey(key)) {
+ this.values.put(key, new Vector());
+ this.keys.addElement(key);
}
- System.printString("\n");*/
+ ((Vector)this.values.get(key)).addElement(value);
+ index = content.indexOf('\n');
+ }
+ iStream.close();
+ }
+ //System.printString("================================\n");
- // sort all the keys inside interoutputs
- this.sorts = new int[this.keys.size()];
- // insert sorting
- this.sorts[0] = 0;
- int tosort = 1;
- for(; tosort < this.keys.size(); ++tosort) {
- int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
- int index = tosort;
- for(int i = tosort; i > 0; --i) {
- if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
- this.sorts[i] = this.sorts[i-1];
- index = i - 1;
- } else {
- //System.printString(i + "; " + tosort + "\n");
- index = i;
- i = 0;
- }
+ /*for(int i = 0; i < this.keys.size(); ++i) {
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
+
+ // sort all the keys inside interoutputs
+ this.sorts = new int[this.keys.size()];
+ // insert sorting
+ this.sorts[0] = 0;
+ int tosort = 1;
+ for(; tosort < this.keys.size(); ++tosort) {
+ int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+ int index = tosort;
+ for(int i = tosort; i > 0; --i) {
+ if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+ this.sorts[i] = this.sorts[i-1];
+ index = i - 1;
+ } else {
+ //System.printString(i + "; " + tosort + "\n");
+ index = i;
+ i = 0;
}
- this.sorts[index] = tosort;
}
- /*for(int i = 0; i < this.sorts.length; ++i) {
- System.printString(this.sorts[i] + "; ");
- }
- System.printString("\n");*/
- /*} catch(IOException e) {
- e.printStackTrace();
- System.exit(-1);
- }*/
+ this.sorts[index] = tosort;
+ }
+ /*for(int i = 0; i < this.sorts.length; ++i) {
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
}
public void reduce() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
if(this.interoutputs != null) {
- //return;
- //}
- for(int i = 0; i < this.sorts.length; ++i) {
- String key = (String)this.keys.elementAt(this.sorts[i]);
- Vector values = (Vector)this.values.get(key);
- this.mapreducer.reduce(key, values, output);
- }
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ this.mapreducer.reduce(key, values, output);
+ }
}
- //try{
- // output all the result into some local file
- int size = this.output.size();
- FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
- for(int i = 0; i < size; ++i) {
- String key = this.output.getKey(i);
- String value = this.output.getValue(i);
- // format: key value\n
- oStream.write(key.getBytes());
- oStream.write(' ');
- oStream.write(value.getBytes());
- oStream.write('\n');
- oStream.flush();
- }
- oStream.close();
- /*} catch(Exception e) {
- e.printStackTrace();
- System.exit(-1);
- }*/
+ // output all the result into some local file
+ int size = this.output.size();
+ FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ oStream.close();
+ this.keys = null;
+ this.values = null;
}
public String getOutputFile() {
-//package mapreduce;
-
-//import java.io.FileInputStream;
-//import java.io.IOException;
-
/**
 * Cuts the content of an input file into splitNum slices, preferring to cut
 * at occurrences of a separator character.
 *
 * On the '+' side of this change the constructor only records its arguments;
 * reading the file and computing the slices both happen in split(), which
 * returns the slices directly instead of storing them in fields.
 */
public class Splitter {
String filename;
String content;
int length;
- int[] splits;
- String[] slices;
+ int splitNum;
+ char seperator;
// Stores the path, slice count and separator; length starts at -1 and is
// only assigned a real value once split() has read the file.
public Splitter(String path, int splitNum, char seperator) {
- //try{
- //System.printString("Top of Splitter's constructor\n");
- filename = path;
- FileInputStream iStream = new FileInputStream(filename);
- byte[] b = new byte[1024 * 1024];
- length = iStream.read(b);
- if(length < 0) {
- System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
- System.exit(-1);
- }
- content = new String(b, 0, length);
- //System.printString(content + "\n");
- iStream.close();
+ //System.printString("Top of Splitter's constructor\n");
+ filename = path;
+ this.length = -1;
+ this.splitNum = splitNum;
+ this.seperator = seperator;
+ }
- if(splitNum == 1) {
- slices = new String[1];
- slices[0] = content;
- } else {
- splits = new int[splitNum - 1];
- int index = 0;
- int span = length / splitNum;
- int temp = 0;
- for(int i = 0; i < splitNum - 1; ++i) {
- temp += span;
- if(temp > index) {
- index = temp;
- while((content.charAt(index) != seperator) && (index != length - 1)) {
- ++index;
- }
- }
- splits[i] = index;
- }
// Reads the file with one read() into a 1 MB buffer, then returns the slices.
// A negative read length prints an error and exits with status -1.
// NOTE(review): input beyond the first read() (at most 1 MB) is not seen —
// confirm inputs are expected to fit.
+ public String[] split() {
+ int[] splits;
+ String[] slices;
+
+ FileInputStream iStream = new FileInputStream(filename);
+ byte[] b = new byte[1024 * 1024];
+ length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from input file: " + filename + "\n");
+ System.exit(-1);
+ }
+ content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ iStream.close();
- this.slices = new String[splits.length + 1];
- for(int i = 0; i < this.slices.length; ++i) {
- this.slices[i] = null;
// With a single slice the whole content is returned as-is. Otherwise
// splitNum - 1 cut positions are computed: the nominal position advances by
// length / splitNum per cut, and when it lies beyond the previous cut the
// cut is pushed forward to the next separator (or to the last character).
+ if(splitNum == 1) {
+ slices = new String[1];
+ slices[0] = content;
+ this.content = null;
+ } else {
+ splits = new int[splitNum - 1];
+ int index = 0;
+ int span = length / splitNum;
+ int temp = 0;
+ for(int i = 0; i < splitNum - 1; ++i) {
+ temp += span;
+ if(temp > index) {
+ index = temp;
+ while((content.charAt(index) != seperator) && (index != length - 1)) {
+ ++index;
+ }
}
+ splits[i] = index;
}
- /*} catch(IOException e) {
- e.printStackTrace();
- System.exit(-1);
- }*/
- }
- public void split() {
- if(slices.length == 1) {
- return;
- }
- int start = 0;
- int end = 0;
- for(int i = 0; i < splits.length; ++i) {
- end = splits[i];
- if(end < start) {
- slices[i] = null;
- } else {
- slices[i] = content.substring(start, end);
// Slice i covers [start, cut i); the character at the cut position itself is
// skipped because the next slice starts at end + 1. A cut that falls before
// the current start leaves a null entry in the result. The last slice takes
// everything from the final start onward, and content is released afterwards.
+ slices = new String[splitNum];
+ int start = 0;
+ int end = 0;
+ for(int i = 0; i < splits.length; ++i) {
+ end = splits[i];
+ if(end < start) {
+ slices[i] = null;
+ } else {
+ slices[i] = content.subString(start, end);
+ }
+ start = end + 1;
}
- start = end + 1;
+ slices[slices.length - 1] = content.subString(start);
+ this.content = null;
}
- slices[slices.length - 1] = content.substring(start);
+ return slices;
}
// Returns the path that was passed to the constructor.
public String getFilename() {
return filename;
}
-
- public String[] getSlices() {
- return this.slices;
- }
}
-//package mapreduce;
-
-public /*interface*/ class Tool {
+public class Tool {
public int run(String[] args);
}
-//package mapreduce;
-
public class ToolRunner {
public static int run(Tool tool, String[] args) {
-
-//import java.io.FileInputStream;
-//import java.util.Vector;
-
-/*import mapreduce.Configuration;
-import mapreduce.Configured;
-import mapreduce.JobClient;
-import mapreduce.MapReduceBase;
-import mapreduce.OutputCollector;
-import mapreduce.Tool;
-import mapreduce.ToolRunner;*/
-
/**
* Counts the words in each line.
* For each line of input, break the line into words and emit them as
}
}
-public class WordCounter /*implements*/extends Tool {
+public class WordCounter extends Tool {
- public WordCounter() {}
+ public WordCounter() {}
static int printUsage() {
- System./*out.println*/printString("<conffile>\n");
+ System.printString("<conffile>\n");
return -1;
}
* job tracker.
*/
public int run(String[] args) {
// Loads the job configuration from the file named by args[0] and hands it to
// JobClient.runJob(). A single read() of at most 1024 bytes is parsed as four
// newline-terminated lines, in order: input file path, m, r, and a line whose
// first character becomes the separator. Always returns 0; a negative read
// length prints an error and exits with -1.
// NOTE(review): assumes args[0] is present and the file has all four lines —
// indexOf('\n') returning -1 is not checked before substring(); confirm
// callers guarantee a well-formed file.
- //try {
- MapReduceClass mapreducer = new MapReduceClass();
-
- FileInputStream iStream = new FileInputStream(args[0]);
- byte[] b = new byte[1024];
- int length = iStream.read(b);
- if(length < 0 ) {
- System./*out.println*/printString("Error! Can not read from configure file: " + args[0] + "\n");
- System.exit(-1);
- }
- String content = new String(b, 0, length);
- //System.out.println(content);
- int index = content.indexOf('\n');
- String inputfile = content.substring(0, index);
- content = content.substring(index + 1);
- index = content.indexOf('\n');
- int m = Integer.parseInt(content.substring(0, index));
- content = content.substring(index + 1);
- index = content.indexOf('\n');
- int r = Integer.parseInt(content.substring(0, index));
- content = content.substring(index + 1);
- index = content.indexOf('\n');
- String temp = content.substring(0, index);
- char seperator = temp.charAt(0);
- //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
-
- Configuration conf = new Configuration();
- conf.setMapReduceClass(mapreducer);
- conf.setInputfile(inputfile);
- conf.setM(m);
- conf.setR(r);
- conf.setSeperator(seperator);
-
- JobClient.runJob(conf);
- /*} catch (Exception e) {
- e.printStackTrace();
+ MapReduceClass mapreducer = new MapReduceClass();
+
+ FileInputStream iStream = new FileInputStream(args[0]);
+ byte[] b = new byte[1024];
+ int length = iStream.read(b);
+ if(length < 0 ) {
+ System.printString("Error! Can not read from configure file: " + args[0] + "\n");
System.exit(-1);
- }*/
+ }
// Peel one line off the front of `content` per field: path, m, r, separator.
+ String content = new String(b, 0, length);
+ //System.out.println(content);
+ int index = content.indexOf('\n');
+ String inputfile = content.substring(0, index);
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ int m = Integer.parseInt(content.substring(0, index));
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ int r = Integer.parseInt(content.substring(0, index));
+ content = content.substring(index + 1);
+ index = content.indexOf('\n');
+ String temp = content.substring(0, index);
+ char seperator = temp.charAt(0);
+ //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r));
+
// Copy the parsed values into a Configuration and start the job.
+ Configuration conf = new Configuration();
+ conf.setMapReduceClass(mapreducer);
+ conf.setInputfile(inputfile);
+ conf.setM(m);
+ conf.setR(r);
+ conf.setSeperator(seperator);
+
+ JobClient.runJob(conf);
return 0;
}
- public static void main(String[] args) /*throws Exception*/ {
+ public static void main(String[] args) {
int res = ToolRunner.run(new WordCounter(), args);
System.exit(res);
}
task startup(StartupObject s{initialstate}) {
// read in configuration parameters
- // System.printString("Top of task startup\n");
+ //System.printString("Top of task startup\n");
String path = new String("/scratch/mapreduce_nor/conf.txt");
FileInputStream iStream = new FileInputStream(path);
byte[] b = new byte[1024];
char seperator = temp.charAt(0);
//System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
Splitter splitter = new Splitter(inputfile, m, seperator);
- Master master = new Master(m, r, splitter){split};
+ Master master = new Master(m, r, splitter){mapoutput};//{split};
+
+ master.assignMap();
taskexit(s{!initialstate});
}
//Split the input file into M pieces
-task split(Master master{split}) {
+/*task split(Master master{split}) {
//System.printString("Top of task split\n");
master.split();
master.assignMap();
taskexit(master{!assignMap, mapoutput});
-}
+}*/
//MapWorker runs the 'map' function on an input file piece
task map(MapWorker mworker{map}) {
//System.printString("Top of task map\n");
mworker.map();
- taskexit(mworker{!map, partition});
+ /*taskexit(mworker{!map, partition});
}
//Partition the intermediate key/value pair generated
//into R intermediate local files
task partition(MapWorker mworker{partition}) {
- //System.printString("Top of task partition\n");
+ //System.printString("Top of task partition\n");*/
mworker.partition();
- taskexit(mworker{!partition, mapoutput});
+ //taskexit(mworker{!partition, mapoutput});
+ taskexit(mworker{!map, mapoutput});
}
//Register the intermediate output from map worker to master
//System.printString("Top of task sortgroup\n");
rworker.sortgroup();
- taskexit(rworker{!sortgroup, reduce});
+ /*taskexit(rworker{!sortgroup, reduce});
}
//Do 'reduce' function
task reduce(ReduceWorker rworker{reduce}) {
- //System.printString("Top of task reduce\n");
+ //System.printString("Top of task reduce\n");*/
rworker.reduce();
- taskexit(rworker{!reduce, reduceoutput});
+ //taskexit(rworker{!reduce, reduceoutput});
+ taskexit(rworker{!sortgroup, reduceoutput});
}
//Collect the output into master
task output(Master master{output}) {
//System.printString("Top of task output\n");
- if(master.isPartial()) {
+ /*if(master.isPartial()) {
System.printString("Partial! The result may not be right due to some failure!\n");
}
- System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+ System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
taskexit(master{!output});
}
flag mapoutput;
int ID;
-
int r;
String key;
String value;
OutputCollector output;
-
- String[] locations;
- FileOutputStream[] outputs;
+ String locationPrefix;
+ boolean[] outputsexit;
public MapWorker(String key, String value, int r, int id) {
this.ID = id;
this.key = key;
this.value = value;
this.output = new OutputCollector();
-
- locations = new String[r];
- for(int i = 0; i < r; ++i) {
- StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-");
- temp.append(String.valueOf(ID));
- temp.append("-of-");
- temp.append(String.valueOf(r));
- temp.append("_");
- temp.append(String.valueOf(i));
- temp.append(".dat");
- locations[i] = new String(temp);
- }
-
- outputs = new FileOutputStream[r];
- for(int i = 0; i < r; ++i) {
- outputs[i] = null;
- }
+ this.locationPrefix = "/scratch/mapreduce_nor/output-intermediate-map-";
+
+ this.outputsexit = new boolean[r];
}
public void map() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
-
MapReduceBase.map(key, value, output);
+ this.key = null;
+ this.value = null;
}
public void partition() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
-
+ FileOutputStream[] outputs = new FileOutputStream[r];
+ for(int i = 0; i < r; ++i) {
+ outputs[i] = null;
+ }
+
int size = this.output.size();
for(int i = 0; i < size; ++i) {
String key = this.output.getKey(i);
FileOutputStream oStream = outputs[index];
if(oStream == null) {
// open the file
- String filepath = locations[index];
+ String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
+ //System.printString("partition: " + filepath + "\n");
oStream = new FileOutputStream(filepath, true); // append
outputs[index] = oStream;
+ this.outputsexit[index] = true;
}
// format: key value\n
oStream.write(key.getBytes());
}
// close the output files
- for(int i = 0; i < this.outputs.length; ++i) {
- FileOutputStream temp = this.outputs[i];
+ for(int i = 0; i < outputs.length; ++i) {
+ FileOutputStream temp = outputs[i];
if(temp != null) {
temp.close();
+ outputs[i] = null;
}
}
+
+ this.output = null;
}
public String outputFile(int i) {
- if(outputs[i] != null) {
- return locations[i];
+ if(outputsexit[i]) {
+ StringBuffer temp = new StringBuffer(this.locationPrefix);
+ temp.append(String.valueOf(ID));
+ temp.append("-of-");
+ temp.append(String.valueOf(r));
+ temp.append("_");
+ temp.append(String.valueOf(i));
+ temp.append(".dat");
+ return new String(temp);
} else {
return null;
}
int m;
int r;
int[] mworkerStates; // array of map worker's state
- // 0: idle 1: process 2: finished 3: fail
+ // 0: idle 1: process 2: finished 3: fail
int[] rworkerStates; // array of reduce worker's state
Vector[] interoutputs; // array of string vector containing
- // paths of intermediate outputs from
- // map worker
-
+ // paths of intermediate outputs from
+ // map worker
Splitter splitter;
-
String outputfile; // path of final output file
-
boolean partial;
public Master(int m, int r, Splitter splitter) {
this.m = m;
this.r = r;
-
- mworkerStates = new int[m];
- rworkerStates = new int[r];
- for(int i = 0; i < m; ++i) {
- mworkerStates[i] = 0;
- }
- for(int i = 0; i < r; ++i) {
- rworkerStates[i] = 0;
- }
-
- interoutputs = new Vector[r];
- for(int i = 0; i < r; ++i) {
- interoutputs[i] = null;
- }
-
+ this.mworkerStates = new int[m];
+ this.rworkerStates = new int[r];
+ this.interoutputs = new Vector[r];
this.splitter = splitter;
this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
-
this.partial = false;
}
this.partial = partial || this.partial;
}
- public void split() {
+ /*public void split() {
splitter.split();
- }
+ }*/
public void assignMap() {
- String[] contentsplits = splitter.getSlices();
+ String[] contentsplits = splitter.split();//splitter.getSlices();
for(int i = 0; i < contentsplits.length; ++i) {
//System.printString("*************************\n");
//System.printString(contentsplits[i] + "\n");
MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
mworkerStates[i] = 1;
}
+
+ this.splitter = null;
}
public void setMapFinish(int i) {
for(int i = 0; i < interoutputs.length; ++i) {
ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
rworkerStates[i] = 1;
+ this.interoutputs[i] = null;
}
+ this.interoutputs.clear();
}
public void setReduceFinish(int i) {
int ID;
Vector interoutputs; // string vector containing paths
- // of intermediate outputs from map worker
+ // of intermediate outputs from map worker
Vector keys;
HashMap values; // hashmap map key to vector of string vector
int[] sorts; // array record the sort of keys
public ReduceWorker(Vector interoutputs, int id) {
this.ID = id;
this.interoutputs = interoutputs;
-
this.keys = new Vector();
this.values = new HashMap();
- //this.sorts = null;
-
this.output = new OutputCollector();
this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
}
public void sortgroup() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
// group values associated to the same key
//System.printString("================================\n");
if(interoutputs == null) {
//System.printString("================================\n");
/*for(int i = 0; i < this.keys.size(); ++i) {
- System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
- }
- System.printString("\n");*/
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
// sort all the keys inside interoutputs
this.sorts = new int[this.keys.size()];
this.sorts[index] = tosort;
}
/*for(int i = 0; i < this.sorts.length; ++i) {
- System.printString(this.sorts[i] + "; ");
- }
- System.printString("\n");*/
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
}
public void reduce() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
if(this.interoutputs != null) {
- // return;
- //}
- for(int i = 0; i < this.sorts.length; ++i) {
- String key = (String)this.keys.elementAt(this.sorts[i]);
- Vector values = (Vector)this.values.get(key);
- MapReduceBase.reduce(key, values, output);
- }
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ MapReduceBase.reduce(key, values, output);
+ }
}
// output all the result into some local file
oStream.flush();
}
oStream.close();
+ this.keys = null;
+ this.values = null;
}
public String getOutputFile() {
String filename;
String content;
int length;
- int[] splits;
- String[] slices;
+ int splitNum;
+ char seperator;
public Splitter(String path, int splitNum, char seperator) {
//System.printString("Top of Splitter's constructor\n");
filename = path;
+ this.length = -1;
+ this.splitNum = splitNum;
+ this.seperator = seperator;
+ }
+
+ public String[] split() {
+ int[] splits;
+ String[] slices;
+
FileInputStream iStream = new FileInputStream(filename);
byte[] b = new byte[1024 * 1024];
length = iStream.read(b);
if(splitNum == 1) {
slices = new String[1];
slices[0] = content;
+ this.content = null;
} else {
splits = new int[splitNum - 1];
int index = 0;
splits[i] = index;
}
- this.slices = new String[splits.length + 1];
- for(int i = 0; i < this.slices.length; ++i) {
- this.slices[i] = null;
- }
- }
- }
-
- public void split() {
- if(slices.length == 1) {
- return;
- }
- int start = 0;
- int end = 0;
- for(int i = 0; i < splits.length; ++i) {
- end = splits[i];
- if(end < start) {
- slices[i] = null;
- } else {
- slices[i] = content.subString(start, end);
+ slices = new String[splitNum];
+ int start = 0;
+ int end = 0;
+ for(int i = 0; i < splits.length; ++i) {
+ end = splits[i];
+ if(end < start) {
+ slices[i] = null;
+ } else {
+ slices[i] = content.subString(start, end);
+ }
+ start = end + 1;
}
- start = end + 1;
+ slices[slices.length - 1] = content.subString(start);
+ this.content = null;
}
- slices[slices.length - 1] = content.subString(start);
+ return slices;
}
public String getFilename() {
return filename;
}
-
- public String[] getSlices() {
- return this.slices;
- }
}
char seperator = temp.charAt(0);
//System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
Splitter splitter = new Splitter(inputfile, m, seperator);
- Master master = new Master(m, r, splitter){split};
+ Master master = new Master(m, r, splitter){mapoutput};//{split};
+
+ master.assignMap();
taskexit(s{!initialstate});
}
//Split the input file into M pieces
-task split(Master master{split}) {
+/*task split(Master master{split}) {
//System.printString("Top of task split\n");
- master.split();
+ //master.split();
taskexit(master{!split, assignMap});
}
//System.printString("Top of task assignMap\n");
master.assignMap();
- taskexit(master{!assignMap, mapoutput});
-}
+ //taskexit(master{!assignMap, mapoutput});
+ taskexit(master{!split, mapoutput});
+}*/
//MapWorker does the 'map' function on an input file piece
task map(MapWorker mworker{map}) {
// Reading aid for this diff: the added lines open a block comment on the old
// taskexit and close it on the old "Top of task partition" debug line, which
// comments out the old 'task partition' header. As a result mworker.map() and
// mworker.partition() now run back-to-back inside this single task, and the
// task exits with {!map, mapoutput} instead of going through {partition}.
//System.printString("Top of task map\n");
mworker.map();
- taskexit(mworker{!map, partition});
+ /*taskexit(mworker{!map, partition});
}
//Partition the intermediate key/value pair generated
//into R intermediate local files
task partition(MapWorker mworker{partition}) {
- //System.printString("Top of task partition\n");
+ //System.printString("Top of task partition\n");*/
mworker.partition();
- taskexit(mworker{!partition, mapoutput});
+ taskexit(mworker{!map, mapoutput});
+ //taskexit(mworker{!partition, mapoutput});
}
//Register the intermediate output from map worker to master
if(isavailable(mworker)) {
int total = master.getR();
for(int i = 0; i < total; ++i) {
+ //System.printString("mapOutput\n");
String temp = mworker.outputFile(i);
if(temp != null) {
master.addInterOutput(temp);
//System.printString("Top of task sortgroup\n");
rworker.sortgroup();
- taskexit(rworker{!sortgroup, reduce});
+ /*taskexit(rworker{!sortgroup, reduce});
}
//Do 'reduce' function
task reduce(ReduceWorker rworker{reduce}) {
- //System.printString("Top of task reduce\n");
+ //System.printString("Top of task reduce\n");*/
rworker.reduce();
- taskexit(rworker{!reduce, reduceoutput});
+ //taskexit(rworker{!reduce, reduceoutput});
+ taskexit(rworker{!sortgroup, reduceoutput});
}
//Collect the output into master
task output(Master master{output}) {
// Reading aid for this diff: the added lines wrap both the "Partial!" warning
// and the "Finish!" message in one block comment, so after the change this
// task prints nothing and only performs taskexit(master{!output}).
//System.printString("Top of task output\n");
- if(master.isPartial()) {
+ /*if(master.isPartial()) {
System.printString("Partial! The result may not be right due to some failure!\n");
}
- System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+ System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/
taskexit(master{!output});
}
flag mapoutput;
int ID;
-
int r;
String key;
String value;
OutputCollector output;
-
- String[] locations;
- FileOutputStream[] outputs;
+ String locationPrefix;
+ boolean[] outputsexit;
// MapWorker constructor. Change summary: the r intermediate file paths are no
// longer precomputed into locations[], and the FileOutputStream array is no
// longer allocated here. The worker now keeps only the shared path prefix and
// one boolean per partition (outputsexit); partition() sets an entry to true
// when it opens that partition's file, and outputFile(i) reads it.
// NOTE(review): unchanged lines between diff hunks may be elided in this view.
public MapWorker(String key, String value, int r, int id) {
this.ID = id;
this.key = key;
this.value = value;
this.output = new OutputCollector();
-
- locations = new String[r];
- for(int i = 0; i < r; ++i) {
- StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
- temp.append(String.valueOf(ID));
- temp.append("-of-");
- temp.append(String.valueOf(r));
- temp.append("_");
- temp.append(String.valueOf(i));
- temp.append(".dat");
- locations[i] = new String(temp);
- }
-
- outputs = new FileOutputStream[r];
- for(int i = 0; i < r; ++i) {
- outputs[i] = null;
- }
+ this.locationPrefix = "/scratch/mapreduce_opt/output-intermediate-map-";
+
+ this.outputsexit = new boolean[r];
}
// Runs MapReduceBase.map on this worker's key/value pair, collecting results
// into 'output'. Change summary: the commented-out block that indexed
// locations[locations.length] is removed (presumably a failure-injection
// stub -- confirm), and the key/value references are set to null once map
// has returned.
public void map() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
-
MapReduceBase.map(key, value, output);
+ this.key = null;
+ this.value = null;
}
// Writes the collected map output into per-partition intermediate files.
// Change summary: the FileOutputStream array becomes a local of this method
// instead of a field, each file path is built on demand from locationPrefix,
// ID, r and the partition index, and outputsexit[index] records which
// partitions actually had a file opened. The collector reference is dropped
// at the end.
// NOTE(review): the diff elides unchanged lines inside the main loop (the
// computation of 'index' is not visible here).
public void partition() {
- /*if(ID % 2 == 1) {
- String temp = locations[locations.length];
- }*/
+ FileOutputStream[] outputs = new FileOutputStream[r];
+ for(int i = 0; i < r; ++i) {
+ outputs[i] = null;
+ }
int size = this.output.size();
for(int i = 0; i < size; ++i) {
FileOutputStream oStream = outputs[index];
if(oStream == null) {
// open the file
- String filepath = locations[index];
// Same name layout as outputFile(i) builds: <prefix><ID>-of-<r>_<index>.dat
+ String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat";
oStream = new FileOutputStream(filepath, true); // append
outputs[index] = oStream;
+ this.outputsexit[index] = true;
}
// format: key value\n
oStream.write(key.getBytes());
}
// close the output files
- for(int i = 0; i < this.outputs.length; ++i) {
- FileOutputStream temp = this.outputs[i];
+ for(int i = 0; i < outputs.length; ++i) {
+ FileOutputStream temp = outputs[i];
if(temp != null) {
temp.close();
+ outputs[i] = null;
}
}
+
+ this.output = null;
}
public String outputFile(int i) {
- if(outputs[i] != null) {
- return locations[i];
+ if(outputsexit[i]) {
+ StringBuffer temp = new StringBuffer(this.locationPrefix);
+ temp.append(String.valueOf(ID));
+ temp.append("-of-");
+ temp.append(String.valueOf(r));
+ temp.append("_");
+ temp.append(String.valueOf(i));
+ temp.append(".dat");
+ return new String(temp);
} else {
return null;
}
int m;
int r;
int[] mworkerStates; // array of map worker's state
- // 0: idle 1: process 2: finished 3: fail
+ // 0: idle 1: process 2: finished 3: fail
int[] rworkerStates; // array of reduce worker's state
Vector[] interoutputs; // array of string vector containing
- // paths of intermediate outputs from
- // map worker
-
+ // paths of intermediate outputs from
+ // map worker
Splitter splitter;
-
String outputfile; // path of final output file
-
boolean partial;
// Master constructor: stores m and r, allocates the map-worker state array
// (size m), the reduce-worker state array (size r) and the per-reducer array
// of intermediate-output vectors (size r), keeps the splitter, and sets the
// final output path and the 'partial' flag.
// Change summary: the explicit loops that wrote 0 / null into the freshly
// allocated arrays are removed.
// NOTE(review): this relies on new arrays being zero/null-initialised, as in
// Java -- confirm the same holds for this compiler/runtime.
public Master(int m, int r, Splitter splitter) {
this.m = m;
this.r = r;
-
- mworkerStates = new int[m];
- rworkerStates = new int[r];
- for(int i = 0; i < m; ++i) {
- mworkerStates[i] = 0;
- }
- for(int i = 0; i < r; ++i) {
- rworkerStates[i] = 0;
- }
-
- interoutputs = new Vector[r];
- for(int i = 0; i < r; ++i) {
- interoutputs[i] = null;
- }
-
+ this.mworkerStates = new int[m];
+ this.rworkerStates = new int[r];
+ this.interoutputs = new Vector[r];
this.splitter = splitter;
this.outputfile = new String("/scratch/mapreduce_opt/output.dat");
-
this.partial = false;
}
this.partial = partial || this.partial;
}
- public void split() {
+ /*public void split() {
splitter.split();
- }
+ }*/
// Creates one MapWorker (tagged {map}) per input slice and marks the matching
// entry of mworkerStates as 1 ("process" in the state legend on the field).
// Change summary: the slices now come directly from splitter.split() rather
// than from the removed getSlices(), and the splitter reference is dropped
// once the workers exist.
// NOTE(review): with this.splitter set to null, a second call would
// dereference null -- confirm assignMap() is only ever invoked once.
public void assignMap() {
- String[] contentsplits = splitter.getSlices();
+ String[] contentsplits = splitter.split();//splitter.getSlices();
for(int i = 0; i < contentsplits.length; ++i) {
//System.printString("*************************\n");
//System.printString(contentsplits[i] + "\n");
MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
mworkerStates[i] = 1;
}
+
+ this.splitter = null;
}
public void setMapFinish(int i) {
// Hands each per-reducer list of intermediate output paths to a new
// ReduceWorker (tagged {sortgroup}), marks that reducer's state as 1, and
// releases the master's own reference to the list.
for(int i = 0; i < interoutputs.length; ++i) {
ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
rworkerStates[i] = 1;
+ this.interoutputs[i] = null;
}
+ // No clear() call here: interoutputs is declared as Vector[] and arrays have
+ // no clear() member. Every slot has already been set to null in the loop.
}
public void setReduceFinish(int i) {
int ID;
Vector interoutputs; // string vector containing paths
- // of intermediate outputs from map worker
+ // of intermediate outputs from map worker
Vector keys;
HashMap values; // hashmap map key to vector of string vector
int[] sorts; // array record the sort of keys
// ReduceWorker constructor: stores the worker id and the vector of
// intermediate output paths it was given (sortgroup() checks this for null),
// creates an empty key vector, an empty key->values map and an output
// collector, and derives this worker's output file path from id.
// Change summary: only blank lines and a commented-out line are removed.
public ReduceWorker(Vector interoutputs, int id) {
this.ID = id;
this.interoutputs = interoutputs;
-
this.keys = new Vector();
this.values = new HashMap();
- //this.sorts = null;
-
this.output = new OutputCollector();
this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
}
public void sortgroup() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
// group values associated to the same key
//System.printString("================================\n");
if(interoutputs == null) {
//System.printString("================================\n");
/*for(int i = 0; i < this.keys.size(); ++i) {
- System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
- }
- System.printString("\n");*/
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
// sort all the keys inside interoutputs
this.sorts = new int[this.keys.size()];
this.sorts[index] = tosort;
}
/*for(int i = 0; i < this.sorts.length; ++i) {
- System.printString(this.sorts[i] + "; ");
- }
- System.printString("\n");*/
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
}
public void reduce() {
- /*if(ID % 2 == 1) {
- int a[] = new int[1];
- int temp = a[1];
- }*/
-
if(this.interoutputs != null) {
- // return;
- //}
- for(int i = 0; i < this.sorts.length; ++i) {
- String key = (String)this.keys.elementAt(this.sorts[i]);
- Vector values = (Vector)this.values.get(key);
- MapReduceBase.reduce(key, values, output);
- }
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ MapReduceBase.reduce(key, values, output);
+ }
}
// output all the result into some local file
oStream.flush();
}
oStream.close();
+ this.keys = null;
+ this.values = null;
}
public String getOutputFile() {
String filename;
String content;
int length;
- int[] splits;
- String[] slices;
+ int splitNum;
+ char seperator;
// Splitter constructor as changed by this diff: it records the input path,
// the requested number of splits and the separator character, and sets
// length to -1. The file-reading code that used to follow directly in the
// constructor now sits in split(), so construction no longer touches the file.
// NOTE(review): -1 presumably means "not read yet" -- confirm.
public Splitter(String path, int splitNum, char seperator) {
//System.printString("Top of Splitter's constructor\n");
filename = path;
+ this.length = -1;
+ this.splitNum = splitNum;
+ this.seperator = seperator;
+ }
+
+ public String[] split() {
+ int[] splits;
+ String[] slices;
+
FileInputStream iStream = new FileInputStream(filename);
byte[] b = new byte[1024 * 1024];
length = iStream.read(b);
if(splitNum == 1) {
slices = new String[1];
slices[0] = content;
+ this.content = null;
} else {
splits = new int[splitNum - 1];
int index = 0;
splits[i] = index;
}
- this.slices = new String[splits.length + 1];
- for(int i = 0; i < this.slices.length; ++i) {
- this.slices[i] = null;
- }
- }
- }
-
- public void split() {
- if(slices.length == 1) {
- return;
- }
- int start = 0;
- int end = 0;
- for(int i = 0; i < splits.length; ++i) {
- end = splits[i];
- if(end < start) {
- slices[i] = null;
- } else {
- slices[i] = content.subString(start, end);
+ slices = new String[splitNum];
+ int start = 0;
+ int end = 0;
+ for(int i = 0; i < splits.length; ++i) {
+ end = splits[i];
+ if(end < start) {
+ slices[i] = null;
+ } else {
+ slices[i] = content.subString(start, end);
+ }
+ start = end + 1;
}
- start = end + 1;
+ slices[slices.length - 1] = content.subString(start);
+ this.content = null;
}
- slices[slices.length - 1] = content.subString(start);
+ return slices;
}
// Returns the input file path that was passed to the constructor.
public String getFilename() {
return filename;
}
-
- public String[] getSlices() {
- return this.slices;
- }
}