From 3839d4880626950a705cbd40a49d35389b6e9dc8 Mon Sep 17 00:00:00 2001 From: jzhou Date: Tue, 18 Mar 2008 21:05:21 +0000 Subject: [PATCH] Make the java version and two Bristlecone versions be able to run in parallel --- .../Benchmarks/MapReduce/Java/MapWorker.java | 2 +- .../src/Benchmarks/MapReduce/Java/Master.java | 8 +- .../MapReduce/Java/ReduceWorker.java | 11 +- .../Benchmarks/MapReduce/Java/Splitter.java | 2 +- .../MapReduce/Java/WordCounter.java | 25 +-- .../Benchmarks/MapReduce/Nor/MapReduce.java | 140 ++++++++++++++++ .../MapReduce/Nor/MapReduceBase.java | 55 +++++++ .../Benchmarks/MapReduce/Nor/MapWorker.java | 102 ++++++++++++ .../src/Benchmarks/MapReduce/Nor/Master.java | 149 ++++++++++++++++++ .../MapReduce/Nor/OutputCollector.java | 27 ++++ .../MapReduce/Nor/ReduceWorker.java | 138 ++++++++++++++++ .../Benchmarks/MapReduce/Nor/Splitter.java | 73 +++++++++ .../Benchmarks/MapReduce/Tag/MapReduce.java | 2 +- .../Benchmarks/MapReduce/Tag/MapWorker.java | 2 +- .../src/Benchmarks/MapReduce/Tag/Master.java | 6 +- .../MapReduce/Tag/ReduceWorker.java | 4 +- .../Benchmarks/MapReduce/Tag/Splitter.java | 2 +- 17 files changed, 720 insertions(+), 28 deletions(-) create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/Master.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java create mode 100644 Robust/src/Benchmarks/MapReduce/Nor/Splitter.java diff --git a/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java index c31b575f..f5b59117 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java @@ -26,7 +26,7 @@ public class MapWorker { 
locations = new String[r]; for(int i = 0; i < r; ++i) { - StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-"); + StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-"); temp.append(String.valueOf(ID)); temp.append("-of-"); temp.append(String.valueOf(r)); diff --git a/Robust/src/Benchmarks/MapReduce/Java/Master.java b/Robust/src/Benchmarks/MapReduce/Java/Master.java index 92ce404d..39ac5cc4 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Master.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Master.java @@ -40,7 +40,7 @@ public class Master { } this.splitter = splitter; - this.outputfile = new String("/home/jzhou/mapreduce/output.dat"); + this.outputfile = new String("/scratch/mapreduce_java/output.dat"); this.partial = false; } @@ -98,9 +98,9 @@ public class Master { } public void addInterOutput(String interoutput) { - int start = interoutput.indexOf('_'); + int start = interoutput.lastindexOf('_'); int end = interoutput.indexOf('.'); - int index = Integer.parseInt(interoutput.substring(start + 1, end)); + int index = Integer.parseInt(interoutput.subString(start + 1, end)); //System.printString(interoutput.subString(start + 1, end) + "\n"); if(interoutputs[index] == null) { interoutputs[index] = new Vector(); @@ -140,7 +140,7 @@ public class Master { //try{ FileInputStream iStream = new FileInputStream(file); FileOutputStream oStream = new FileOutputStream(outputfile, true); - byte[] b = new byte[1024 * 100]; + byte[] b = new byte[1024 * 10]; int length = iStream.read(b); if(length < 0) { System./*out.println*/printString("Error! 
Can not read from intermediate output file from reduce worker: " + file + "\n"); diff --git a/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java index b18ce209..4559882b 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java @@ -32,7 +32,7 @@ public class ReduceWorker { //this.sorts = null; this.output = new OutputCollector(); - this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; + this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; } public MapReduceBase getMapreducer() { @@ -57,7 +57,7 @@ public class ReduceWorker { //try{ for(int i = 0; i < interoutputs.size(); ++i) { FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i)); - byte[] b = new byte[1024 * 100]; + byte[] b = new byte[1024 * 10]; int length = iStream.read(b); if(length < 0) { System./*out.println*/printString("Error! 
Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n"); @@ -126,14 +126,15 @@ public class ReduceWorker { int temp = a[1]; }*/ - if(this.interoutputs == null) { - return; - } + if(this.interoutputs != null) { + //return; + //} for(int i = 0; i < this.sorts.length; ++i) { String key = (String)this.keys.elementAt(this.sorts[i]); Vector values = (Vector)this.values.get(key); this.mapreducer.reduce(key, values, output); } + } //try{ // output all the result into some local file diff --git a/Robust/src/Benchmarks/MapReduce/Java/Splitter.java b/Robust/src/Benchmarks/MapReduce/Java/Splitter.java index 342328b6..53cae4df 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Splitter.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Splitter.java @@ -15,7 +15,7 @@ public class Splitter { //System.printString("Top of Splitter's constructor\n"); filename = path; FileInputStream iStream = new FileInputStream(filename); - byte[] b = new byte[1024 * 10]; + byte[] b = new byte[1024 * 1024]; length = iStream.read(b); if(length < 0) { System./*out.println*/printString("Error! 
Can not read from input file: " + filename + "\n"); diff --git a/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java b/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java index b8d961c9..c67a291b 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java +++ b/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java @@ -52,15 +52,22 @@ import mapreduce.ToolRunner;*/ } boolean isspace(char c) { - if((c == ' ') || - (c == '.') || - (c == '!') || - (c == '?') || - (c == '"') || - (c == '\n')) { - return true; - } - return false; + if((c == ' ') || + (c == ',') || + (c == '.') || + (c == '!') || + (c == '?') || + (c == '"') || + (c == '(') || + (c == ')') || + (c == '[') || + (c == ']') || + (c == '{') || + (c == '}') || + (c == '\n')) { + return true; + } + return false; } } diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java new file mode 100644 index 00000000..cfef9366 --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java @@ -0,0 +1,140 @@ +task startup(StartupObject s{initialstate}) { + // read in configuration parameters + // System.printString("Top of task startup\n"); + String path = new String("/scratch/mapreduce_nor/conf.txt"); + FileInputStream iStream = new FileInputStream(path); + byte[] b = new byte[1024]; + int length = iStream.read(b); + if(length < 0 ) { + System.printString("Error! 
Can not read from configure file: " + path + "\n"); + System.exit(-1); + } + String content = new String(b, 0, length); + //System.printString(content + "\n"); + int index = content.indexOf('\n'); + String inputfile = content.subString(0, index); + content = content.subString(index + 1); + index = content.indexOf('\n'); + int m = Integer.parseInt(content.subString(0, index)); + content = content.subString(index + 1); + index = content.indexOf('\n'); + int r = Integer.parseInt(content.subString(0, index)); + content = content.subString(index + 1); + index = content.indexOf('\n'); + String temp = content.subString(0, index); + char seperator = temp.charAt(0); + //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n"); + Splitter splitter = new Splitter(inputfile, m, seperator); + Master master = new Master(m, r, splitter){split}; + + taskexit(s{!initialstate}); +} + +//Split the input file into M pieces +task split(Master master{split}) { + System.printString("Top of task split\n"); + master.split(); + + taskexit(master{!split, assignMap}); +} + +//Select a map worker to handle one of the pieces of input file +task assignMap(Master master{assignMap}) { + System.printString("Top of task assignMap\n"); + master.assignMap(); + + taskexit(master{!assignMap, mapoutput}); +} + +//MapWorker do 'map' function on a input file piece +task map(MapWorker mworker{map}) { + System.printString("Top of task map\n"); + mworker.map(); + + taskexit(mworker{!map, partition}); +} + +//Partition the intermediate key/value pair generated +//into R intermediate local files +task partition(MapWorker mworker{partition}) { + System.printString("Top of task partition\n"); + mworker.partition(); + + taskexit(mworker{!partition, mapoutput}); +} + +//Register the intermediate ouput from map worker to master +task mapOutput(Master master{mapoutput}, /*optional*/ MapWorker mworker{mapoutput}) { + System.printString("Top of task mapOutput\n"); + 
//if(isavailable(mworker)) { + int total = master.getR(); + for(int i = 0; i < total; ++i) { + String temp = mworker.outputFile(i); + if(temp != null) { + master.addInterOutput(temp); + } + } + master.setMapFinish(mworker.getID()); + /*} else { + master.setMapFail(mworker.getID()); + master.setPartial(true); + }*/ + if(master.isMapFinish()) { + taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput}); + } + + taskexit(mworker{!mapoutput}); +} + +//Assign the list of intermediate output associated to one key to +//a reduce worker +task assignReduce(Master master{assignReduce}) { + System.printString("Top of task assignReduce\n"); + master.assignReduce(); + + taskexit(master{!assignReduce, reduceoutput}); +} + +//First do sort and group on the intermediate key/value pairs assigned +//to reduce worker +task sortgroup(ReduceWorker rworker{sortgroup}) { + System.printString("Top of task sortgroup\n"); + rworker.sortgroup(); + + taskexit(rworker{!sortgroup, reduce}); +} + +//Do 'reduce' function +task reduce(ReduceWorker rworker{reduce}) { + System.printString("Top of task reduce\n"); + rworker.reduce(); + + taskexit(rworker{!reduce, reduceoutput}); +} + +//Collect the output into master +task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker{reduceoutput}) { + System.printString("Top of task reduceOutput\n"); + //if(isavailable(rworker)) { + master.collectROutput(rworker.getOutputFile()); + master.setReduceFinish(rworker.getID()); + /* } else { + master.setReduceFail(rworker.getID()); + master.setPartial(true); + }*/ + if(master.isReduceFinish()) { + //System.printString("reduce finish\n"); + taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput}); + } + + taskexit(rworker{!reduceoutput}); +} + +task output(Master master{output}) { + System.printString("Top of task output\n"); + if(master.isPartial()) { + System.printString("Partial! 
The result may not be right due to some failure!\n"); + } + System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n"); + taskexit(master{!output}); +} diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java b/Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java new file mode 100644 index 00000000..3ff539d9 --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/MapReduceBase.java @@ -0,0 +1,55 @@ +public class MapReduceBase { + + public static void map(String key, String value, OutputCollector output) { + int n = value.length(); + for (int i = 0; i < n; ) { + // Skip past leading whitespace + while ((i < n) && isspace(value.charAt(i))) { + ++i; + } + + // Find word end + int start = i; + while ((i < n) && !isspace(value.charAt(i))) { + i++; + } + + if (start < i) { + output.emit(value.subString(start, i), "1"); + //System.printString(value.subString(start,i) + "\n"); + } + } + } + + public static void reduce(String key, Vector values, OutputCollector output) { + // Iterate over all entries with the + // // same key and add the values + int value = 0; + for(int i = 0; i < values.size(); ++i) { + value += Integer.parseInt((String)values.elementAt(i)); + } + + // Emit sum for input->key() + output.emit(key, String.valueOf(value)); + } + + static boolean isspace(char c) { + if((c == ' ') || + (c == ',') || + (c == '.') || + (c == '!') || + (c == '?') || + (c == '"') || + (c == '(') || + (c == ')') || + (c == '[') || + (c == ']') || + (c == '{') || + (c == '}') || + (c == '\n')) { + return true; + } + return false; + } +} + diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java new file mode 100644 index 00000000..0ab23fa7 --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java @@ -0,0 +1,102 @@ +public class MapWorker { + flag map; + flag partition; + flag mapoutput; + + int ID; + + int r; + String key; + String value; + OutputCollector 
output; + + String[] locations; + FileOutputStream[] outputs; + + public MapWorker(String key, String value, int r, int id) { + this.ID = id; + this.r = r; + + this.key = key; + this.value = value; + this.output = new OutputCollector(); + + locations = new String[r]; + for(int i = 0; i < r; ++i) { + StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-"); + temp.append(String.valueOf(ID)); + temp.append("-of-"); + temp.append(String.valueOf(r)); + temp.append("_"); + temp.append(String.valueOf(i)); + temp.append(".dat"); + locations[i] = new String(temp); + } + + outputs = new FileOutputStream[r]; + for(int i = 0; i < r; ++i) { + outputs[i] = null; + } + } + + public void map() { + /*if(ID % 2 == 1) { + String temp = locations[locations.length]; + }*/ + + MapReduceBase.map(key, value, output); + } + + public void partition() { + /*if(ID % 2 == 1) { + String temp = locations[locations.length]; + }*/ + + int size = this.output.size(); + for(int i = 0; i < size; ++i) { + String key = this.output.getKey(i); + String value = this.output.getValue(i); + // use the hashcode of key to decide which intermediate output + // this pair should be in + int index = (int)Math.abs(key.hashCode()) % this.r; + FileOutputStream oStream = outputs[index]; + if(oStream == null) { + // open the file + String filepath = locations[index]; + oStream = new FileOutputStream(filepath, true); // append + outputs[index] = oStream; + } + // format: key value\n + oStream.write(key.getBytes()); + oStream.write(' '); + oStream.write(value.getBytes()); + oStream.write('\n'); + oStream.flush(); + } + + // close the output files + for(int i = 0; i < this.outputs.length; ++i) { + FileOutputStream temp = this.outputs[i]; + if(temp != null) { + temp.close(); + } + } + } + + public String outputFile(int i) { + if(outputs[i] != null) { + return locations[i]; + } else { + return null; + } + } + + public int getID() { + return this.ID; + } + + public int getR() { + return this.r; 
+ } + +} diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Master.java b/Robust/src/Benchmarks/MapReduce/Nor/Master.java new file mode 100644 index 00000000..4e21b9c3 --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/Master.java @@ -0,0 +1,149 @@ +public class Master { + flag split; + flag assignMap; + flag mapoutput; + flag mapfinished; + flag assignReduce; + flag reduceoutput; + flag reducefinished; + flag output; + + int m; + int r; + int[] mworkerStates; // array of map worker's state + // 0: idle 1: process 2: finished 3: fail + int[] rworkerStates; // array of reduce worker's state + Vector[] interoutputs; // array of string vector containing + // paths of intermediate outputs from + // map worker + + Splitter splitter; + + String outputfile; // path of final output file + + boolean partial; + + public Master(int m, int r, Splitter splitter) { + this.m = m; + this.r = r; + + mworkerStates = new int[m]; + rworkerStates = new int[r]; + for(int i = 0; i < m; ++i) { + mworkerStates[i] = 0; + } + for(int i = 0; i < r; ++i) { + rworkerStates[i] = 0; + } + + interoutputs = new Vector[r]; + for(int i = 0; i < r; ++i) { + interoutputs[i] = null; + } + + this.splitter = splitter; + this.outputfile = new String("/scratch/mapreduce_nor/output.dat"); + + this.partial = false; + } + + public int getR() { + return this.r; + } + + public String getOutputFile() { + return this.outputfile; + } + + public boolean isPartial() { + return this.partial; + } + + public void setPartial(boolean partial) { + this.partial = partial || this.partial; + } + + public void split() { + splitter.split(); + } + + public void assignMap() { + String[] contentsplits = splitter.getSlices(); + for(int i = 0; i < contentsplits.length; ++i) { + //System.printString("*************************\n"); + //System.printString(contentsplits[i] + "\n"); + //System.printString("*************************\n"); + MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map}; + 
mworkerStates[i] = 1; + } + } + + public void setMapFinish(int i) { + mworkerStates[i] = 2; + } + + public void setMapFail(int i) { + mworkerStates[i] = 3; + } + + public boolean isMapFinish() { + for(int i = 0; i < mworkerStates.length; ++i) { + if(mworkerStates[i] == 1) { + return false; + } + } + + return true; + } + + public void addInterOutput(String interoutput) { + int start = interoutput.lastindexOf('_'); + int end = interoutput.indexOf('.'); + int index = Integer.parseInt(interoutput.subString(start + 1, end)); + //System.printString(interoutput.subString(start + 1, end) + "\n"); + if(interoutputs[index] == null) { + interoutputs[index] = new Vector(); + } + interoutputs[index].addElement(interoutput); + } + + public void assignReduce() { + for(int i = 0; i < interoutputs.length; ++i) { + ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup}; + rworkerStates[i] = 1; + } + } + + public void setReduceFinish(int i) { + rworkerStates[i] = 2; + } + + public void setReduceFail(int i) { + rworkerStates[i] = 3; + } + + public boolean isReduceFinish() { + for(int i = 0; i < rworkerStates.length; ++i) { + if(rworkerStates[i] == 1) { + return false; + } + } + + return true; + } + + public void collectROutput(String file) { + FileInputStream iStream = new FileInputStream(file); + FileOutputStream oStream = new FileOutputStream(outputfile, true); + byte[] b = new byte[1024 * 10]; + int length = iStream.read(b); + if(length < 0) { + System.printString("Error! 
Can not read from intermediate output file from reduce worker: " + file + "\n"); + System.exit(-1); + } + //System.printString(new String(b, 0, length) + "\n"); + oStream.write(b, 0, length); + iStream.close(); + oStream.close(); + } +} diff --git a/Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java b/Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java new file mode 100644 index 00000000..3c6d852c --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/OutputCollector.java @@ -0,0 +1,27 @@ +public class OutputCollector { + + Vector keys; + Vector values; + + public OutputCollector() { + this.keys = new Vector(); + this.values = new Vector(); + } + + public void emit(String key, String value) { + this.keys.addElement(key); + this.values.addElement(value); + } + + public int size() { + return this.keys.size(); + } + + public String getKey(int i) { + return (String)this.keys.elementAt(i); + } + + public String getValue(int i) { + return (String)this.values.elementAt(i); + } +} diff --git a/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java new file mode 100644 index 00000000..a0e39ffa --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java @@ -0,0 +1,138 @@ +public class ReduceWorker { + flag sortgroup; + flag reduce; + flag reduceoutput; + + int ID; + Vector interoutputs; // string vector containing paths + // of intermediate outputs from map worker + Vector keys; + HashMap values; // hashmap map key to vector of string vector + int[] sorts; // array record the sort of keys + OutputCollector output; + String outputfile; // path of the intermediate output file + + public ReduceWorker(Vector interoutputs, int id) { + this.ID = id; + this.interoutputs = interoutputs; + + this.keys = new Vector(); + this.values = new HashMap(); + //this.sorts = null; + + this.output = new OutputCollector(); + this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + 
".dat"; + } + + public void sortgroup() { + /*if(ID % 2 == 1) { + int a[] = new int[1]; + int temp = a[1]; + }*/ + + // group values associated to the same key + //System.printString("================================\n"); + if(interoutputs == null) { + return; + } + for(int i = 0; i < interoutputs.size(); ++i) { + FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i)); + byte[] b = new byte[1024 * 10]; + int length = iStream.read(b); + if(length < 0) { + System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n"); + System.exit(-1); + } + String content = new String(b, 0, length); + //System.printString(content + "\n"); + int index = content.indexOf('\n'); + while(index != -1) { + String line = content.subString(0, index); + content = content.subString(index + 1); + //System.printString(line + "\n"); + int tmpindex = line.indexOf(' '); + String key = line.subString(0, tmpindex); + String value = line.subString(tmpindex + 1); + //System.printString(key + "; " + value + "\n"); + if(!this.values.containsKey(key)) { + this.values.put(key, new Vector()); + this.keys.addElement(key); + } + ((Vector)this.values.get(key)).addElement(value); + index = content.indexOf('\n'); + } + iStream.close(); + } + //System.printString("================================\n"); + + /*for(int i = 0; i < this.keys.size(); ++i) { + System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); + } + System.printString("\n");*/ + + // sort all the keys inside interoutputs + this.sorts = new int[this.keys.size()]; + // insert sorting + this.sorts[0] = 0; + int tosort = 1; + for(; tosort < this.keys.size(); ++tosort) { + int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode(); + int index = tosort; + for(int i = tosort; i > 0; --i) { + if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) { + this.sorts[i] = 
this.sorts[i-1]; + index = i - 1; + } else { + //System.printString(i + "; " + tosort + "\n"); + index = i; + i = 0; + } + } + this.sorts[index] = tosort; + } + /*for(int i = 0; i < this.sorts.length; ++i) { + System.printString(this.sorts[i] + "; "); + } + System.printString("\n");*/ + } + + public void reduce() { + /*if(ID % 2 == 1) { + int a[] = new int[1]; + int temp = a[1]; + }*/ + + if(this.interoutputs != null) { + // return; + //} + for(int i = 0; i < this.sorts.length; ++i) { + String key = (String)this.keys.elementAt(this.sorts[i]); + Vector values = (Vector)this.values.get(key); + MapReduceBase.reduce(key, values, output); + } + } + + // output all the result into some local file + int size = this.output.size(); + FileOutputStream oStream = new FileOutputStream(outputfile, true); // append + for(int i = 0; i < size; ++i) { + String key = this.output.getKey(i); + String value = this.output.getValue(i); + // format: key value\n + oStream.write(key.getBytes()); + oStream.write(' '); + oStream.write(value.getBytes()); + oStream.write('\n'); + oStream.flush(); + } + oStream.close(); + } + + public String getOutputFile() { + return this.outputfile; + } + + public int getID() { + return this.ID; + } +} diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java b/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java new file mode 100644 index 00000000..d61bf023 --- /dev/null +++ b/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java @@ -0,0 +1,73 @@ +public class Splitter { + String filename; + String content; + int length; + int[] splits; + String[] slices; + + public Splitter(String path, int splitNum, char seperator) { + //System.printString("Top of Splitter's constructor\n"); + filename = path; + FileInputStream iStream = new FileInputStream(filename); + byte[] b = new byte[1024 * 1024]; + length = iStream.read(b); + if(length < 0) { + System.printString("Error! 
Can not read from input file: " + filename + "\n"); + System.exit(-1); + } + content = new String(b, 0, length); + //System.printString(content + "\n"); + iStream.close(); + + if(splitNum == 1) { + slices = new String[1]; + slices[0] = content; + } else { + splits = new int[splitNum - 1]; + int index = 0; + int span = length / splitNum; + int temp = 0; + for(int i = 0; i < splitNum - 1; ++i) { + temp += span; + if(temp > index) { + index = temp; + while((content.charAt(index) != seperator) && (index != length - 1)) { + ++index; + } + } + splits[i] = index; + } + + this.slices = new String[splits.length + 1]; + for(int i = 0; i < this.slices.length; ++i) { + this.slices[i] = null; + } + } + } + + public void split() { + if(slices.length == 1) { + return; + } + int start = 0; + int end = 0; + for(int i = 0; i < splits.length; ++i) { + end = splits[i]; + if(end < start) { + slices[i] = null; + } else { + slices[i] = content.subString(start, end); + } + start = end + 1; + } + slices[slices.length - 1] = content.subString(start); + } + + public String getFilename() { + return filename; + } + + public String[] getSlices() { + return this.slices; + } +} diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java index b62e76bd..7556a8fc 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java @@ -1,7 +1,7 @@ task startup(StartupObject s{initialstate}) { // read in configuration parameters // System.printString("Top of task startup\n"); - String path = new String("/home/jzhou/mapreduce/conf.txt"); + String path = new String("/scratch/mapreduce_opt/conf.txt"); FileInputStream iStream = new FileInputStream(path); byte[] b = new byte[1024]; int length = iStream.read(b); diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java index 0563556b..60407fb3 100644 --- 
a/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java @@ -23,7 +23,7 @@ public class MapWorker { locations = new String[r]; for(int i = 0; i < r; ++i) { - StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-"); + StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-"); temp.append(String.valueOf(ID)); temp.append("-of-"); temp.append(String.valueOf(r)); diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Master.java b/Robust/src/Benchmarks/MapReduce/Tag/Master.java index 49bb68fa..c547fd83 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/Master.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/Master.java @@ -42,7 +42,7 @@ public class Master { } this.splitter = splitter; - this.outputfile = new String("/home/jzhou/mapreduce/output.dat"); + this.outputfile = new String("/scratch/mapreduce_opt/output.dat"); this.partial = false; } @@ -97,7 +97,7 @@ public class Master { } public void addInterOutput(String interoutput) { - int start = interoutput.indexOf('_'); + int start = interoutput.lastindexOf('_'); int end = interoutput.indexOf('.'); int index = Integer.parseInt(interoutput.subString(start + 1, end)); //System.printString(interoutput.subString(start + 1, end) + "\n"); @@ -135,7 +135,7 @@ public class Master { public void collectROutput(String file) { FileInputStream iStream = new FileInputStream(file); FileOutputStream oStream = new FileOutputStream(outputfile, true); - byte[] b = new byte[1024 * 100]; + byte[] b = new byte[1024 * 10]; int length = iStream.read(b); if(length < 0) { System.printString("Error! 
Can not read from intermediate output file from reduce worker: " + file + "\n"); diff --git a/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java index d4787a4e..7f2cbdb4 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java @@ -21,7 +21,7 @@ public class ReduceWorker { //this.sorts = null; this.output = new OutputCollector(); - this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; + this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; } public void sortgroup() { @@ -37,7 +37,7 @@ public class ReduceWorker { } for(int i = 0; i < interoutputs.size(); ++i) { FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i)); - byte[] b = new byte[1024 * 100]; + byte[] b = new byte[1024 * 10]; int length = iStream.read(b); if(length < 0) { System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n"); diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java b/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java index f79c8d85..d61bf023 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java @@ -9,7 +9,7 @@ public class Splitter { //System.printString("Top of Splitter's constructor\n"); filename = path; FileInputStream iStream = new FileInputStream(filename); - byte[] b = new byte[1024 * 10]; + byte[] b = new byte[1024 * 1024]; length = iStream.read(b); if(length < 0) { System.printString("Error! Can not read from input file: " + filename + "\n"); -- 2.34.1