From a8d0e636d83f9b239c93faa6301c14ade13532cf Mon Sep 17 00:00:00 2001 From: jzhou Date: Fri, 5 Sep 2008 22:52:03 +0000 Subject: [PATCH] optimize for performance --- .../MapReduce/Java/Configuration.java | 2 - .../Benchmarks/MapReduce/Java/Configured.java | 2 - .../Benchmarks/MapReduce/Java/JobClient.java | 6 +- .../MapReduce/Java/MapReduceBase.java | 10 +- .../Benchmarks/MapReduce/Java/MapWorker.java | 117 +++++------ .../src/Benchmarks/MapReduce/Java/Master.java | 70 +++---- .../MapReduce/Java/OutputCollector.java | 4 - .../MapReduce/Java/ReduceWorker.java | 184 +++++++----------- .../Benchmarks/MapReduce/Java/Splitter.java | 117 +++++------ .../src/Benchmarks/MapReduce/Java/Tool.java | 4 +- .../Benchmarks/MapReduce/Java/ToolRunner.java | 2 - .../MapReduce/Java/WordCounter.java | 89 ++++----- .../Benchmarks/MapReduce/Nor/MapReduce.java | 28 +-- .../Benchmarks/MapReduce/Nor/MapWorker.java | 63 +++--- .../src/Benchmarks/MapReduce/Nor/Master.java | 38 ++-- .../MapReduce/Nor/ReduceWorker.java | 41 ++-- .../Benchmarks/MapReduce/Nor/Splitter.java | 53 +++-- .../Benchmarks/MapReduce/Tag/MapReduce.java | 32 +-- .../Benchmarks/MapReduce/Tag/MapWorker.java | 60 +++--- .../src/Benchmarks/MapReduce/Tag/Master.java | 38 ++-- .../MapReduce/Tag/ReduceWorker.java | 41 ++-- .../Benchmarks/MapReduce/Tag/Splitter.java | 53 +++-- 22 files changed, 441 insertions(+), 613 deletions(-) diff --git a/Robust/src/Benchmarks/MapReduce/Java/Configuration.java b/Robust/src/Benchmarks/MapReduce/Java/Configuration.java index 9a245f51..98332679 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Configuration.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Configuration.java @@ -1,5 +1,3 @@ -//package mapreduce; - public class Configuration { MapReduceBase mapreducer; diff --git a/Robust/src/Benchmarks/MapReduce/Java/Configured.java b/Robust/src/Benchmarks/MapReduce/Java/Configured.java index 5cce432b..8de0f09c 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Configured.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Configured.java @@ -1,5 +1,3 @@ -//package mapreduce; - public class Configured { Configuration conf; diff --git a/Robust/src/Benchmarks/MapReduce/Java/JobClient.java b/Robust/src/Benchmarks/MapReduce/Java/JobClient.java index cab79af5..ce92f96b 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/JobClient.java +++ b/Robust/src/Benchmarks/MapReduce/Java/JobClient.java @@ -1,5 +1,3 @@ -//package mapreduce; - public class JobClient{ public JobClient() {} @@ -10,7 +8,7 @@ public class JobClient{ // split input file //System.printString("Split\n"); - master.split(); + //master.split(); // do 'map' //System.printString("Map\n"); @@ -53,7 +51,7 @@ public class JobClient{ } //assert(master.isReduceFinish()); - System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n"); + //System./*out.println*/printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n"); } } diff --git a/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java b/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java index 21b806e6..c73d4a65 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java +++ b/Robust/src/Benchmarks/MapReduce/Java/MapReduceBase.java @@ -1,13 +1,9 @@ -//package mapreduce; - -//import java.util.Vector; - -public /*abstract*/ class MapReduceBase { +public class MapReduceBase { public MapReduceBase() {} - public /*abstract*/ void map(String key, String value, OutputCollector output); + public void map(String key, String value, OutputCollector output); - public /*abstract*/ void reduce(String key, Vector values, OutputCollector output); + public void reduce(String key, Vector values, OutputCollector output); } diff --git a/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java index f5b59117..7c56eae8 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Java/MapWorker.java @@ -1,19 +1,13 @@ -//package mapreduce; - -//import java.io.FileOutputStream; - public class MapWorker { int ID; MapReduceBase mapreducer; - int r; String key; String value; OutputCollector output; - - String[] locations; - FileOutputStream[] outputs; + String locationPrefix; + boolean[] outputsexit; public MapWorker(String key, String value, int r, int id) { this.ID = id; @@ -23,23 +17,9 @@ public class MapWorker { this.key = key; this.value = value; this.output = new OutputCollector(); - - locations = new String[r]; - for(int i = 0; i < r; ++i) { - StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-"); - temp.append(String.valueOf(ID)); - temp.append("-of-"); - temp.append(String.valueOf(r)); - temp.append("_"); - temp.append(String.valueOf(i)); - temp.append(".dat"); - locations[i] = new String(temp); - } - - outputs = new FileOutputStream[r]; - for(int i = 0; i < r; ++i) { - outputs[i] = null; - } + this.locationPrefix = "/scratch/mapreduce_java/output-intermediate-map-"; + + this.outputsexit = new boolean[r]; } public MapReduceBase getMapreducer() { @@ -51,58 +31,63 @@ public class MapWorker { } public void map() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ - this.mapreducer.map(key, value, output); + this.key = null; + this.value = null; } public void partition() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ + FileOutputStream[] outputs = new FileOutputStream[r]; + for(int i = 0; i < r; ++i) { + outputs[i] = null; + } - //try{ - int size = this.output.size(); - for(int i = 0; i < size; ++i) { - String key = this.output.getKey(i); - String value = this.output.getValue(i); - // use the hashcode of key to decide which intermediate output - // this pair should be in - //int hash = key.hashCode(); - int index = (int)Math.abs(key.hashCode()) % this.r; - FileOutputStream oStream = outputs[index]; - if(oStream == null) { - // open the file - String filepath = locations[index]; - oStream = new FileOutputStream(filepath, true); // append - outputs[index] = oStream; - } - // format: key value\n - oStream.write(key.getBytes()); - oStream.write(' '); - oStream.write(value.getBytes()); - oStream.write('\n'); - oStream.flush(); + int size = this.output.size(); + for(int i = 0; i < size; ++i) { + String key = this.output.getKey(i); + String value = this.output.getValue(i); + // use the hashcode of key to decide which intermediate output + // this pair should be in + //int hash = key.hashCode(); + int index = (int)Math.abs(key.hashCode()) % this.r; + FileOutputStream oStream = outputs[index]; + if(oStream == null) { + // open the file + String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat"; + oStream = new FileOutputStream(filepath, true); // append + outputs[index] = oStream; + this.outputsexit[index] = true; } + // format: key value\n + oStream.write(key.getBytes()); + oStream.write(' '); + oStream.write(value.getBytes()); + oStream.write('\n'); + oStream.flush(); + } - // close the output files - for(int i = 0; i < this.outputs.length; ++i) { - FileOutputStream temp = this.outputs[i]; - if(temp != null) { - temp.close(); - } + // close the output files + for(int i = 0; i < outputs.length; ++i) { + FileOutputStream temp = outputs[i]; + if(temp != null) { + temp.close(); + outputs[i] = null; } - /*} catch(Exception e) { - e.printStackTrace(); - System.exit(-1); - }*/ + } + + this.output = null; } public String outputFile(int i) { - if(outputs[i] != null) { - return locations[i]; + if(outputsexit[i]) { + StringBuffer temp = new StringBuffer(this.locationPrefix); + temp.append(String.valueOf(ID)); + temp.append("-of-"); + temp.append(String.valueOf(r)); + temp.append("_"); + temp.append(String.valueOf(i)); + temp.append(".dat"); + return new String(temp); } else { return null; } diff --git a/Robust/src/Benchmarks/MapReduce/Java/Master.java b/Robust/src/Benchmarks/MapReduce/Java/Master.java index 39ac5cc4..f1f197d9 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Master.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Master.java @@ -1,47 +1,25 @@ -//package mapreduce; - -/*import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.util.Vector;*/ - public class Master { int m; int r; int[] mworkerStates; // array of map worker's state - // 0: idle 1: process 2: finished 3: fail + // 0: idle 1: process 2: finished 3: fail int[] rworkerStates; // array of reduce worker's state Vector[] interoutputs; // array of string vector containing - // paths of intermediate outputs from - // map worker - + // paths of intermediate outputs from + // map worker Splitter splitter; - String outputfile; // path of final output file - boolean partial; public Master(int m, int r, Splitter splitter) { this.m = m; this.r = r; - - mworkerStates = new int[m]; - rworkerStates = new int[r]; - for(int i = 0; i < m; ++i) { - mworkerStates[i] = 0; - } - for(int i = 0; i < r; ++i) { - rworkerStates[i] = 0; - } - - interoutputs = new Vector[r]; - for(int i = 0; i < r; ++i) { - interoutputs[i] = null; - } - + this.mworkerStates = new int[m]; + this.rworkerStates = new int[r]; + this.interoutputs = new Vector[r]; this.splitter = splitter; this.outputfile = new String("/scratch/mapreduce_java/output.dat"); - this.partial = false; } @@ -61,12 +39,12 @@ public class Master { this.partial = partial || this.partial; } - public void split() { + /*public void split() { splitter.split(); - } + }*/ public MapWorker[] assignMap() { - String[] contentsplits = splitter.getSlices(); + String[] contentsplits = splitter.split();//splitter.getSlices(); MapWorker[] mworkers = new MapWorker[contentsplits.length]; for(int i = 0; i < contentsplits.length; ++i) { //System.printString("*************************\n"); @@ -76,6 +54,7 @@ public class Master { mworkerStates[i] = 1; mworkers[i] = mworker; } + this.splitter = null; return mworkers; } @@ -114,7 +93,9 @@ public class Master { ReduceWorker rworker = new ReduceWorker(interoutputs[i], i); rworkerStates[i] = 1; rworkers[i] = rworker; + this.interoutputs[i] = null; } + this.interoutputs.clear(); return rworkers; } @@ -137,22 +118,17 @@ public class Master { } public void collectROutput(String file) { - //try{ - FileInputStream iStream = new FileInputStream(file); - FileOutputStream oStream = new FileOutputStream(outputfile, true); - byte[] b = new byte[1024 * 10]; - int length = iStream.read(b); - if(length < 0) { - System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n"); - System.exit(-1); - } - //System.printString(new String(b, 0, length) + "\n"); - oStream.write(b, 0, length); - iStream.close(); - oStream.close(); - /*} catch(Exception e) { - e.printStackTrace(); + FileInputStream iStream = new FileInputStream(file); + FileOutputStream oStream = new FileOutputStream(outputfile, true); + byte[] b = new byte[1024 * 10]; + int length = iStream.read(b); + if(length < 0) { + System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n"); System.exit(-1); - }*/ + } + //System.printString(new String(b, 0, length) + "\n"); + oStream.write(b, 0, length); + iStream.close(); + oStream.close(); } } diff --git a/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java b/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java index e66a6758..3c6d852c 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java +++ b/Robust/src/Benchmarks/MapReduce/Java/OutputCollector.java @@ -1,7 +1,3 @@ -//package mapreduce; - -//import java.util.Vector; - public class OutputCollector { Vector keys; diff --git a/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java index 4559882b..3df1f0b9 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Java/ReduceWorker.java @@ -1,13 +1,3 @@ -//package mapreduce; - -/*import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.util.HashMap; -import java.util.Vector;*/ - -import mapreduce.MapReduceBase; - public class ReduceWorker { int ID; @@ -24,13 +14,9 @@ public class ReduceWorker { public ReduceWorker(Vector interoutputs, int id) { this.ID = id; this.mapreducer = null; - this.interoutputs = interoutputs; - this.keys = new Vector(); this.values = new HashMap(); - //this.sorts = null; - this.output = new OutputCollector(); this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; } @@ -44,117 +30,97 @@ public class ReduceWorker { } public void sortgroup() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - // group values associated to the same key //System.printString("================================\n"); if(interoutputs == null) { return; } - //try{ - for(int i = 0; i < interoutputs.size(); ++i) { - FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i)); - byte[] b = new byte[1024 * 10]; - int length = iStream.read(b); - if(length < 0) { - System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n"); - System.exit(-1); - } - String content = new String(b, 0, length); - //System.printString(content + "\n"); - int index = content.indexOf('\n'); - while(index != -1) { - String line = content.substring(0, index); - content = content.substring(index + 1); - //System.printString(line + "\n"); - int tmpindex = line.indexOf(' '); - String key = line.substring(0, tmpindex); - String value = line.substring(tmpindex + 1); - //System.printString(key + "; " + value + "\n"); - if(!this.values.containsKey(key)) { - this.values.put(key, new Vector()); - this.keys.addElement(key); - } - ((Vector)this.values.get(key)).addElement(value); - index = content.indexOf('\n'); - } - iStream.close(); + for(int i = 0; i < interoutputs.size(); ++i) { + FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i)); + byte[] b = new byte[1024 * 10]; + int length = iStream.read(b); + if(length < 0) { + System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n"); + System.exit(-1); } - //System.printString("================================\n"); - - /*for(int i = 0; i < this.keys.size(); ++i) { - System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); + String content = new String(b, 0, length); + //System.printString(content + "\n"); + int index = content.indexOf('\n'); + while(index != -1) { + String line = content.subString(0, index); + content = content.subString(index + 1); + //System.printString(line + "\n"); + int tmpindex = line.indexOf(' '); + String key = line.subString(0, tmpindex); + String value = line.subString(tmpindex + 1); + //System.printString(key + "; " + value + "\n"); + if(!this.values.containsKey(key)) { + this.values.put(key, new Vector()); + this.keys.addElement(key); } - System.printString("\n");*/ + ((Vector)this.values.get(key)).addElement(value); + index = content.indexOf('\n'); + } + iStream.close(); + } + //System.printString("================================\n"); - // sort all the keys inside interoutputs - this.sorts = new int[this.keys.size()]; - // insert sorting - this.sorts[0] = 0; - int tosort = 1; - for(; tosort < this.keys.size(); ++tosort) { - int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode(); - int index = tosort; - for(int i = tosort; i > 0; --i) { - if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) { - this.sorts[i] = this.sorts[i-1]; - index = i - 1; - } else { - //System.printString(i + "; " + tosort + "\n"); - index = i; - i = 0; - } + /*for(int i = 0; i < this.keys.size(); ++i) { + System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); + } + System.printString("\n");*/ + + // sort all the keys inside interoutputs + this.sorts = new int[this.keys.size()]; + // insert sorting + this.sorts[0] = 0; + int tosort = 1; + for(; tosort < this.keys.size(); ++tosort) { + int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode(); + int index = tosort; + for(int i = tosort; i > 0; --i) { + if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) { + this.sorts[i] = this.sorts[i-1]; + index = i - 1; + } else { + //System.printString(i + "; " + tosort + "\n"); + index = i; + i = 0; } - this.sorts[index] = tosort; } - /*for(int i = 0; i < this.sorts.length; ++i) { - System.printString(this.sorts[i] + "; "); - } - System.printString("\n");*/ - /*} catch(IOException e) { - e.printStackTrace(); - System.exit(-1); - }*/ + this.sorts[index] = tosort; + } + /*for(int i = 0; i < this.sorts.length; ++i) { + System.printString(this.sorts[i] + "; "); + } + System.printString("\n");*/ } public void reduce() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - if(this.interoutputs != null) { - //return; - //} - for(int i = 0; i < this.sorts.length; ++i) { - String key = (String)this.keys.elementAt(this.sorts[i]); - Vector values = (Vector)this.values.get(key); - this.mapreducer.reduce(key, values, output); - } + for(int i = 0; i < this.sorts.length; ++i) { + String key = (String)this.keys.elementAt(this.sorts[i]); + Vector values = (Vector)this.values.get(key); + this.mapreducer.reduce(key, values, output); + } } - //try{ - // output all the result into some local file - int size = this.output.size(); - FileOutputStream oStream = new FileOutputStream(outputfile, true); // append - for(int i = 0; i < size; ++i) { - String key = this.output.getKey(i); - String value = this.output.getValue(i); - // format: key value\n - oStream.write(key.getBytes()); - oStream.write(' '); - oStream.write(value.getBytes()); - oStream.write('\n'); - oStream.flush(); - } - oStream.close(); - /*} catch(Exception e) { - e.printStackTrace(); - System.exit(-1); - }*/ + // output all the result into some local file + int size = this.output.size(); + FileOutputStream oStream = new FileOutputStream(outputfile, true); // append + for(int i = 0; i < size; ++i) { + String key = this.output.getKey(i); + String value = this.output.getValue(i); + // format: key value\n + oStream.write(key.getBytes()); + oStream.write(' '); + oStream.write(value.getBytes()); + oStream.write('\n'); + oStream.flush(); + } + oStream.close(); + this.keys = null; + this.values = null; } public String getOutputFile() { diff --git a/Robust/src/Benchmarks/MapReduce/Java/Splitter.java b/Robust/src/Benchmarks/MapReduce/Java/Splitter.java index 53cae4df..07e69e7f 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Splitter.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Splitter.java @@ -1,83 +1,72 @@ -//package mapreduce; - -//import java.io.FileInputStream; -//import java.io.IOException; - public class Splitter { String filename; String content; int length; - int[] splits; - String[] slices; + int splitNum; + char seperator; public Splitter(String path, int splitNum, char seperator) { - //try{ - //System.printString("Top of Splitter's constructor\n"); - filename = path; - FileInputStream iStream = new FileInputStream(filename); - byte[] b = new byte[1024 * 1024]; - length = iStream.read(b); - if(length < 0) { - System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n"); - System.exit(-1); - } - content = new String(b, 0, length); - //System.printString(content + "\n"); - iStream.close(); + //System.printString("Top of Splitter's constructor\n"); + filename = path; + this.length = -1; + this.splitNum = splitNum; + this.seperator = seperator; + } - if(splitNum == 1) { - slices = new String[1]; - slices[0] = content; - } else { - splits = new int[splitNum - 1]; - int index = 0; - int span = length / splitNum; - int temp = 0; - for(int i = 0; i < splitNum - 1; ++i) { - temp += span; - if(temp > index) { - index = temp; - while((content.charAt(index) != seperator) && (index != length - 1)) { - ++index; - } - } - splits[i] = index; - } + public String[] split() { + int[] splits; + String[] slices; + + FileInputStream iStream = new FileInputStream(filename); + byte[] b = new byte[1024 * 1024]; + length = iStream.read(b); + if(length < 0) { + System.printString("Error! Can not read from input file: " + filename + "\n"); + System.exit(-1); + } + content = new String(b, 0, length); + //System.printString(content + "\n"); + iStream.close(); - this.slices = new String[splits.length + 1]; - for(int i = 0; i < this.slices.length; ++i) { - this.slices[i] = null; + if(splitNum == 1) { + slices = new String[1]; + slices[0] = content; + this.content = null; + } else { + splits = new int[splitNum - 1]; + int index = 0; + int span = length / splitNum; + int temp = 0; + for(int i = 0; i < splitNum - 1; ++i) { + temp += span; + if(temp > index) { + index = temp; + while((content.charAt(index) != seperator) && (index != length - 1)) { + ++index; + } } + splits[i] = index; } - /*} catch(IOException e) { - e.printStackTrace(); - System.exit(-1); - }*/ - } - public void split() { - if(slices.length == 1) { - return; - } - int start = 0; - int end = 0; - for(int i = 0; i < splits.length; ++i) { - end = splits[i]; - if(end < start) { - slices[i] = null; - } else { - slices[i] = content.substring(start, end); + slices = new String[splitNum]; + int start = 0; + int end = 0; + for(int i = 0; i < splits.length; ++i) { + end = splits[i]; + if(end < start) { + slices[i] = null; + } else { + slices[i] = content.subString(start, end); + } + start = end + 1; } - start = end + 1; + slices[slices.length - 1] = content.subString(start); + this.content = null; } - slices[slices.length - 1] = content.substring(start); + return slices; } public String getFilename() { return filename; } - - public String[] getSlices() { - return this.slices; - } } diff --git a/Robust/src/Benchmarks/MapReduce/Java/Tool.java b/Robust/src/Benchmarks/MapReduce/Java/Tool.java index 5dbeb6e7..ab9dd838 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/Tool.java +++ b/Robust/src/Benchmarks/MapReduce/Java/Tool.java @@ -1,5 +1,3 @@ -//package mapreduce; - -public /*interface*/ class Tool { +public class Tool { public int run(String[] args); } diff --git a/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java b/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java index 82a8c5a4..b91290c4 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java +++ b/Robust/src/Benchmarks/MapReduce/Java/ToolRunner.java @@ -1,5 +1,3 @@ -//package mapreduce; - public class ToolRunner { public static int run(Tool tool, String[] args) { diff --git a/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java b/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java index c67a291b..16061424 100644 --- a/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java +++ b/Robust/src/Benchmarks/MapReduce/Java/WordCounter.java @@ -1,15 +1,3 @@ - -//import java.io.FileInputStream; -//import java.util.Vector; - -/*import mapreduce.Configuration; -import mapreduce.Configured; -import mapreduce.JobClient; -import mapreduce.MapReduceBase; -import mapreduce.OutputCollector; -import mapreduce.Tool; -import mapreduce.ToolRunner;*/ - /** * Counts the words in each line. * For each line of input, break the line into words and emit them as @@ -71,12 +59,12 @@ import mapreduce.ToolRunner;*/ } } -public class WordCounter /*implements*/extends Tool { +public class WordCounter extends Tool { - public WordCounter() {} + public WordCounter() {} static int printUsage() { - System./*out.println*/printString("\n"); + System.printString("\n"); return -1; } @@ -87,50 +75,45 @@ public class WordCounter /*implements*/extends Tool { * job tracker. */ public int run(String[] args) { - //try { - MapReduceClass mapreducer = new MapReduceClass(); - - FileInputStream iStream = new FileInputStream(args[0]); - byte[] b = new byte[1024]; - int length = iStream.read(b); - if(length < 0 ) { - System./*out.println*/printString("Error! Can not read from configure file: " + args[0] + "\n"); - System.exit(-1); - } - String content = new String(b, 0, length); - //System.out.println(content); - int index = content.indexOf('\n'); - String inputfile = content.substring(0, index); - content = content.substring(index + 1); - index = content.indexOf('\n'); - int m = Integer.parseInt(content.substring(0, index)); - content = content.substring(index + 1); - index = content.indexOf('\n'); - int r = Integer.parseInt(content.substring(0, index)); - content = content.substring(index + 1); - index = content.indexOf('\n'); - String temp = content.substring(0, index); - char seperator = temp.charAt(0); - //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r)); - - Configuration conf = new Configuration(); - conf.setMapReduceClass(mapreducer); - conf.setInputfile(inputfile); - conf.setM(m); - conf.setR(r); - conf.setSeperator(seperator); - - JobClient.runJob(conf); - /*} catch (Exception e) { - e.printStackTrace(); + MapReduceClass mapreducer = new MapReduceClass(); + + FileInputStream iStream = new FileInputStream(args[0]); + byte[] b = new byte[1024]; + int length = iStream.read(b); + if(length < 0 ) { + System.printString("Error! Can not read from configure file: " + args[0] + "\n"); System.exit(-1); - }*/ + } + String content = new String(b, 0, length); + //System.out.println(content); + int index = content.indexOf('\n'); + String inputfile = content.substring(0, index); + content = content.substring(index + 1); + index = content.indexOf('\n'); + int m = Integer.parseInt(content.substring(0, index)); + content = content.substring(index + 1); + index = content.indexOf('\n'); + int r = Integer.parseInt(content.substring(0, index)); + content = content.substring(index + 1); + index = content.indexOf('\n'); + String temp = content.substring(0, index); + char seperator = temp.charAt(0); + //System.out.println(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r)); + + Configuration conf = new Configuration(); + conf.setMapReduceClass(mapreducer); + conf.setInputfile(inputfile); + conf.setM(m); + conf.setR(r); + conf.setSeperator(seperator); + + JobClient.runJob(conf); return 0; } - public static void main(String[] args) /*throws Exception*/ { + public static void main(String[] args) { int res = ToolRunner.run(new WordCounter(), args); System.exit(res); } diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java index 1a81bb61..8c126800 100644 --- a/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java +++ b/Robust/src/Benchmarks/MapReduce/Nor/MapReduce.java @@ -1,6 +1,6 @@ task startup(StartupObject s{initialstate}) { // read in configuration parameters - // System.printString("Top of task startup\n"); + //System.printString("Top of task startup\n"); String path = new String("/scratch/mapreduce_nor/conf.txt"); FileInputStream iStream = new FileInputStream(path); byte[] b = new byte[1024]; @@ -25,13 +25,15 @@ task startup(StartupObject s{initialstate}) { char seperator = temp.charAt(0); //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n"); Splitter splitter = new Splitter(inputfile, m, seperator); - Master master = new Master(m, r, splitter){split}; + Master master = new Master(m, r, splitter){mapoutput};//{split}; + + master.assignMap(); taskexit(s{!initialstate}); } //Split the input file into M pieces -task split(Master master{split}) { +/*task split(Master master{split}) { //System.printString("Top of task split\n"); master.split(); @@ -44,23 +46,24 @@ task assignMap(Master master{assignMap}) { master.assignMap(); taskexit(master{!assignMap, mapoutput}); -} +}*/ //MapWorker do 'map' function on a input file piece task map(MapWorker mworker{map}) { //System.printString("Top of task map\n"); mworker.map(); - taskexit(mworker{!map, partition}); + /*taskexit(mworker{!map, partition}); } //Partition the intermediate key/value pair generated //into R intermediate local files task partition(MapWorker mworker{partition}) { - //System.printString("Top of task partition\n"); + //System.printString("Top of task partition\n");*/ mworker.partition(); - taskexit(mworker{!partition, mapoutput}); + //taskexit(mworker{!partition, mapoutput}); + taskexit(mworker{!map, mapoutput}); } //Register the intermediate ouput from map worker to master @@ -101,15 +104,16 @@ task sortgroup(ReduceWorker rworker{sortgroup}) { //System.printString("Top of task sortgroup\n"); rworker.sortgroup(); - taskexit(rworker{!sortgroup, reduce}); + /*taskexit(rworker{!sortgroup, reduce}); } //Do 'reduce' function task reduce(ReduceWorker rworker{reduce}) { - //System.printString("Top of task reduce\n"); + //System.printString("Top of task reduce\n");*/ rworker.reduce(); - taskexit(rworker{!reduce, reduceoutput}); + //taskexit(rworker{!reduce, reduceoutput}); + taskexit(rworker{!sortgroup, reduceoutput}); } //Collect the output into master @@ -132,9 +136,9 @@ task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker task output(Master master{output}) { //System.printString("Top of task output\n"); - if(master.isPartial()) { + /*if(master.isPartial()) { System.printString("Partial! The result may not be right due to some failure!\n"); } - System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n"); + System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/ taskexit(master{!output}); } diff --git a/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java index 0ab23fa7..8c9fa3c1 100644 --- a/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Nor/MapWorker.java @@ -4,14 +4,12 @@ public class MapWorker { flag mapoutput; int ID; - int r; String key; String value; OutputCollector output; - - String[] locations; - FileOutputStream[] outputs; + String locationPrefix; + boolean[] outputsexit; public MapWorker(String key, String value, int r, int id) { this.ID = id; @@ -20,38 +18,23 @@ public class MapWorker { this.key = key; this.value = value; this.output = new OutputCollector(); - - locations = new String[r]; - for(int i = 0; i < r; ++i) { - StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-"); - temp.append(String.valueOf(ID)); - temp.append("-of-"); - temp.append(String.valueOf(r)); - temp.append("_"); - temp.append(String.valueOf(i)); - temp.append(".dat"); - locations[i] = new String(temp); - } - - outputs = new FileOutputStream[r]; - for(int i = 0; i < r; ++i) { - outputs[i] = null; - } + this.locationPrefix = "/scratch/mapreduce_nor/output-intermediate-map-"; + + this.outputsexit = new boolean[r]; } public void map() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ - MapReduceBase.map(key, value, output); + this.key = null; + this.value = null; } public void partition() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ - + FileOutputStream[] outputs = new FileOutputStream[r]; + for(int i = 0; i < r; ++i) { + outputs[i] = null; + } + int size = this.output.size(); for(int i = 0; i < size; ++i) { String key = this.output.getKey(i); @@ -62,9 +45,11 @@ public class MapWorker { FileOutputStream oStream = outputs[index]; if(oStream == null) { // open the file - String filepath = locations[index]; + String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat"; + //System.printString("partition: " + filepath + "\n"); oStream = new FileOutputStream(filepath, true); // append outputs[index] = oStream; + this.outputsexit[index] = true; } // format: key value\n oStream.write(key.getBytes()); @@ -75,17 +60,27 @@ public class MapWorker { } // close the output files - for(int i = 0; i < this.outputs.length; ++i) { - FileOutputStream temp = this.outputs[i]; + for(int i = 0; i < outputs.length; ++i) { + FileOutputStream temp = outputs[i]; if(temp != null) { temp.close(); + outputs[i] = null; } } + + this.output = null; } public String outputFile(int i) { - if(outputs[i] != null) { - return locations[i]; + if(outputsexit[i]) { + StringBuffer temp = new StringBuffer(this.locationPrefix); + temp.append(String.valueOf(ID)); + temp.append("-of-"); + temp.append(String.valueOf(r)); + temp.append("_"); + temp.append(String.valueOf(i)); + temp.append(".dat"); + return new String(temp); } else { return null; } diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Master.java b/Robust/src/Benchmarks/MapReduce/Nor/Master.java index 4e21b9c3..65738868 100644 --- a/Robust/src/Benchmarks/MapReduce/Nor/Master.java +++ b/Robust/src/Benchmarks/MapReduce/Nor/Master.java @@ -11,39 +11,23 @@ public class Master { int m; int r; int[] mworkerStates; // array of map worker's state - // 0: idle 1: process 2: finished 3: fail + // 0: idle 1: process 2: finished 3: fail int[] rworkerStates; // array of reduce worker's state Vector[] interoutputs; // array of string vector containing - // paths of intermediate outputs from - // map worker - + // paths of intermediate outputs from + // map worker Splitter splitter; - String outputfile; // path of final output file - boolean partial; public Master(int m, int r, Splitter splitter) { this.m = m; this.r = r; - - mworkerStates = new int[m]; - rworkerStates = new int[r]; - for(int i = 0; i < m; ++i) { - mworkerStates[i] = 0; - } - for(int i = 0; i < r; ++i) { - rworkerStates[i] = 0; - } - - interoutputs = new Vector[r]; - for(int i = 0; i < r; ++i) { - interoutputs[i] = null; - } - + this.mworkerStates = new int[m]; + this.rworkerStates = new int[r]; + this.interoutputs = new Vector[r]; this.splitter = splitter; this.outputfile = new String("/scratch/mapreduce_nor/output.dat"); - this.partial = false; } @@ -63,12 +47,12 @@ public class Master { this.partial = partial || this.partial; } - public void split() { + /*public void split() { splitter.split(); - } + }*/ public void assignMap() { - String[] contentsplits = splitter.getSlices(); + String[] contentsplits = splitter.split();//splitter.getSlices(); for(int i = 0; i < contentsplits.length; ++i) { //System.printString("*************************\n"); //System.printString(contentsplits[i] + "\n"); @@ -76,6 +60,8 @@ public class Master { MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map}; mworkerStates[i] = 1; } + + this.splitter = null; } public void setMapFinish(int i) { @@ -111,7 +97,9 @@ public class Master { for(int i = 0; i < interoutputs.length; ++i) { ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup}; rworkerStates[i] = 1; + this.interoutputs[i] = null; } + this.interoutputs.clear(); } public void setReduceFinish(int i) { diff --git a/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java index a0e39ffa..4eb2c69b 100644 --- a/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Nor/ReduceWorker.java @@ -5,7 +5,7 @@ public class ReduceWorker { int ID; Vector interoutputs; // string vector containing paths - // of intermediate outputs from map worker + // of intermediate outputs from map worker Vector keys; HashMap values; // hashmap map key to vector of string vector int[] sorts; // array record the sort of keys @@ -15,21 +15,13 @@ public class ReduceWorker { public ReduceWorker(Vector interoutputs, int id) { this.ID = id; this.interoutputs = interoutputs; - this.keys = new Vector(); this.values = new HashMap(); - //this.sorts = null; - this.output = new OutputCollector(); this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; } public void sortgroup() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - // group values associated to the same key //System.printString("================================\n"); if(interoutputs == null) { @@ -66,9 +58,9 @@ public class ReduceWorker { //System.printString("================================\n"); /*for(int i = 0; i < this.keys.size(); ++i) { - System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); - } - System.printString("\n");*/ + System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); + } + System.printString("\n");*/ // sort all the keys inside interoutputs this.sorts = new int[this.keys.size()]; @@ -91,25 +83,18 @@ public class ReduceWorker { this.sorts[index] = tosort; } /*for(int i = 0; i < this.sorts.length; ++i) { - System.printString(this.sorts[i] + "; "); - } - System.printString("\n");*/ + System.printString(this.sorts[i] + "; "); + } + System.printString("\n");*/ } public void reduce() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - if(this.interoutputs != null) { - // return; - //} - for(int i = 0; i < this.sorts.length; ++i) { - String key = (String)this.keys.elementAt(this.sorts[i]); - Vector values = (Vector)this.values.get(key); - MapReduceBase.reduce(key, values, output); - } + for(int i = 0; i < this.sorts.length; ++i) { + String key = (String)this.keys.elementAt(this.sorts[i]); + Vector values = (Vector)this.values.get(key); + MapReduceBase.reduce(key, values, output); + } } // output all the result into some local file @@ -126,6 +111,8 @@ public class ReduceWorker { oStream.flush(); } oStream.close(); + this.keys = null; + this.values = null; } public String getOutputFile() { diff --git a/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java b/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java index d61bf023..07e69e7f 100644 --- a/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java +++ b/Robust/src/Benchmarks/MapReduce/Nor/Splitter.java @@ -2,12 +2,21 @@ public class Splitter { String filename; String content; int length; - int[] splits; - String[] slices; + int splitNum; + char seperator; public Splitter(String path, int splitNum, char seperator) { //System.printString("Top of Splitter's constructor\n"); filename = path; + this.length = -1; + this.splitNum = splitNum; + this.seperator = seperator; + } + + public String[] split() { + int[] splits; + String[] slices; + FileInputStream iStream = new FileInputStream(filename); byte[] b = new byte[1024 * 1024]; length = iStream.read(b); @@ -22,6 +31,7 @@ public class Splitter { if(splitNum == 1) { slices = new String[1]; slices[0] = content; + this.content = null; } else { splits = new int[splitNum - 1]; int index = 0; @@ -38,36 +48,25 @@ public class Splitter { splits[i] = index; } - this.slices = new String[splits.length + 1]; - for(int i = 0; i < this.slices.length; ++i) { - this.slices[i] = null; - } - } - } - - public void split() { - if(slices.length == 1) { - return; - } - int start = 0; - int end = 0; - for(int i = 0; i < splits.length; ++i) { - end = splits[i]; - if(end < start) { - slices[i] = null; - } else { - slices[i] = content.subString(start, end); + slices = new String[splitNum]; + int start = 0; + int end = 0; + for(int i = 0; i < splits.length; ++i) { + end = splits[i]; + if(end < start) { + slices[i] = null; + } else { + slices[i] = content.subString(start, end); + } + start = end + 1; } - start = end + 1; + slices[slices.length - 1] = content.subString(start); + this.content = null; } - slices[slices.length - 1] = content.subString(start); + return slices; } public String getFilename() { return filename; } - - public String[] getSlices() { - return this.slices; - } } diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java b/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java index a411a165..3ac4cd4d 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/MapReduce.java @@ -25,15 +25,17 @@ task startup(StartupObject s{initialstate}) { char seperator = temp.charAt(0); //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n"); Splitter splitter = new Splitter(inputfile, m, seperator); - Master master = new Master(m, r, splitter){split}; + Master master = new Master(m, r, splitter){mapoutput};//{split}; + + master.assignMap(); taskexit(s{!initialstate}); } //Split the input file into M pieces -task split(Master master{split}) { +/*task split(Master master{split}) { //System.printString("Top of task split\n"); - master.split(); + //master.split(); taskexit(master{!split, assignMap}); } @@ -43,24 +45,26 @@ task assignMap(Master master{assignMap}) { //System.printString("Top of task assignMap\n"); master.assignMap(); - taskexit(master{!assignMap, mapoutput}); -} + //taskexit(master{!assignMap, mapoutput}); + taskexit(master{!split, mapoutput}); +}*/ //MapWorker do 'map' function on a input file piece task map(MapWorker mworker{map}) { //System.printString("Top of task map\n"); mworker.map(); - taskexit(mworker{!map, partition}); + /*taskexit(mworker{!map, partition}); } //Partition the intermediate key/value pair generated //into R intermediate local files task partition(MapWorker mworker{partition}) { - //System.printString("Top of task partition\n"); + //System.printString("Top of task partition\n");*/ mworker.partition(); - taskexit(mworker{!partition, mapoutput}); + taskexit(mworker{!map, mapoutput}); + //taskexit(mworker{!partition, mapoutput}); } //Register the intermediate ouput from map worker to master @@ -69,6 +73,7 @@ task mapOutput(Master master{mapoutput}, optional MapWorker mworker{mapoutput}) if(isavailable(mworker)) { int total = master.getR(); for(int i = 0; i < total; ++i) { + //System.printString("mapOutput\n"); String temp = mworker.outputFile(i); if(temp != null) { master.addInterOutput(temp); @@ -101,15 +106,16 @@ task sortgroup(ReduceWorker rworker{sortgroup}) { //System.printString("Top of task sortgroup\n"); rworker.sortgroup(); - taskexit(rworker{!sortgroup, reduce}); + /*taskexit(rworker{!sortgroup, reduce}); } //Do 'reduce' function task reduce(ReduceWorker rworker{reduce}) { - //System.printString("Top of task reduce\n"); + //System.printString("Top of task reduce\n");*/ rworker.reduce(); - taskexit(rworker{!reduce, reduceoutput}); + //taskexit(rworker{!reduce, reduceoutput}); + taskexit(rworker{!sortgroup, reduceoutput}); } //Collect the output into master @@ -132,9 +138,9 @@ task reduceOutput(Master master{reduceoutput}, optional ReduceWorker rworker{red task output(Master master{output}) { //System.printString("Top of task output\n"); - if(master.isPartial()) { + /*if(master.isPartial()) { System.printString("Partial! The result may not be right due to some failure!\n"); } - System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n"); + System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");*/ taskexit(master{!output}); } diff --git a/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java index 60407fb3..9c810dd6 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/MapWorker.java @@ -4,14 +4,12 @@ public class MapWorker { flag mapoutput; int ID; - int r; String key; String value; OutputCollector output; - - String[] locations; - FileOutputStream[] outputs; + String locationPrefix; + boolean[] outputsexit; public MapWorker(String key, String value, int r, int id) { this.ID = id; @@ -20,37 +18,22 @@ public class MapWorker { this.key = key; this.value = value; this.output = new OutputCollector(); - - locations = new String[r]; - for(int i = 0; i < r; ++i) { - StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-"); - temp.append(String.valueOf(ID)); - temp.append("-of-"); - temp.append(String.valueOf(r)); - temp.append("_"); - temp.append(String.valueOf(i)); - temp.append(".dat"); - locations[i] = new String(temp); - } - - outputs = new FileOutputStream[r]; - for(int i = 0; i < r; ++i) { - outputs[i] = null; - } + this.locationPrefix = "/scratch/mapreduce_opt/output-intermediate-map-"; + + this.outputsexit = new boolean[r]; } public void map() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ - MapReduceBase.map(key, value, output); + this.key = null; + this.value = null; } public void partition() { - /*if(ID % 2 == 1) { - String temp = locations[locations.length]; - }*/ + FileOutputStream[] outputs = new FileOutputStream[r]; + for(int i = 0; i < r; ++i) { + outputs[i] = null; + } int size = this.output.size(); for(int i = 0; i < size; ++i) { @@ -62,9 +45,10 @@ public class MapWorker { FileOutputStream oStream = outputs[index]; if(oStream == null) { // open the file - String filepath = locations[index]; + String filepath = this.locationPrefix + this.ID + "-of-" + this.r + "_" + index + ".dat"; oStream = new FileOutputStream(filepath, true); // append outputs[index] = oStream; + this.outputsexit[index] = true; } // format: key value\n oStream.write(key.getBytes()); @@ -75,17 +59,27 @@ public class MapWorker { } // close the output files - for(int i = 0; i < this.outputs.length; ++i) { - FileOutputStream temp = this.outputs[i]; + for(int i = 0; i < outputs.length; ++i) { + FileOutputStream temp = outputs[i]; if(temp != null) { temp.close(); + outputs[i] = null; } } + + this.output = null; } public String outputFile(int i) { - if(outputs[i] != null) { - return locations[i]; + if(outputsexit[i]) { + StringBuffer temp = new StringBuffer(this.locationPrefix); + temp.append(String.valueOf(ID)); + temp.append("-of-"); + temp.append(String.valueOf(r)); + temp.append("_"); + temp.append(String.valueOf(i)); + temp.append(".dat"); + return new String(temp); } else { return null; } diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Master.java b/Robust/src/Benchmarks/MapReduce/Tag/Master.java index c547fd83..ba98fb93 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/Master.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/Master.java @@ -11,39 +11,23 @@ public class Master { int m; int r; int[] mworkerStates; // array of map worker's state - // 0: idle 1: process 2: finished 3: fail + // 0: idle 1: process 2: finished 3: fail int[] rworkerStates; // array of reduce worker's state Vector[] interoutputs; // array of string vector containing - // paths of intermediate outputs from - // map worker - + // paths of intermediate outputs from + // map worker Splitter splitter; - String outputfile; // path of final output file - boolean partial; public Master(int m, int r, Splitter splitter) { this.m = m; this.r = r; - - mworkerStates = new int[m]; - rworkerStates = new int[r]; - for(int i = 0; i < m; ++i) { - mworkerStates[i] = 0; - } - for(int i = 0; i < r; ++i) { - rworkerStates[i] = 0; - } - - interoutputs = new Vector[r]; - for(int i = 0; i < r; ++i) { - interoutputs[i] = null; - } - + this.mworkerStates = new int[m]; + this.rworkerStates = new int[r]; + this.interoutputs = new Vector[r]; this.splitter = splitter; this.outputfile = new String("/scratch/mapreduce_opt/output.dat"); - this.partial = false; } @@ -63,12 +47,12 @@ public class Master { this.partial = partial || this.partial; } - public void split() { + /*public void split() { splitter.split(); - } + }*/ public void assignMap() { - String[] contentsplits = splitter.getSlices(); + String[] contentsplits = splitter.split();//splitter.getSlices(); for(int i = 0; i < contentsplits.length; ++i) { //System.printString("*************************\n"); //System.printString(contentsplits[i] + "\n"); @@ -76,6 +60,8 @@ public class Master { MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map}; mworkerStates[i] = 1; } + + this.splitter = null; } public void setMapFinish(int i) { @@ -111,7 +97,9 @@ public class Master { for(int i = 0; i < interoutputs.length; ++i) { ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup}; rworkerStates[i] = 1; + this.interoutputs[i] = null; } + this.interoutputs.clear(); } public void setReduceFinish(int i) { diff --git a/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java b/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java index 7f2cbdb4..27557c9e 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/ReduceWorker.java @@ -5,7 +5,7 @@ public class ReduceWorker { int ID; Vector interoutputs; // string vector containing paths - // of intermediate outputs from map worker + // of intermediate outputs from map worker Vector keys; HashMap values; // hashmap map key to vector of string vector int[] sorts; // array record the sort of keys @@ -15,21 +15,13 @@ public class ReduceWorker { public ReduceWorker(Vector interoutputs, int id) { this.ID = id; this.interoutputs = interoutputs; - this.keys = new Vector(); this.values = new HashMap(); - //this.sorts = null; - this.output = new OutputCollector(); this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat"; } public void sortgroup() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - // group values associated to the same key //System.printString("================================\n"); if(interoutputs == null) { @@ -66,9 +58,9 @@ public class ReduceWorker { //System.printString("================================\n"); /*for(int i = 0; i < this.keys.size(); ++i) { - System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); - } - System.printString("\n");*/ + System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; "); + } + System.printString("\n");*/ // sort all the keys inside interoutputs this.sorts = new int[this.keys.size()]; @@ -91,25 +83,18 @@ public class ReduceWorker { this.sorts[index] = tosort; } /*for(int i = 0; i < this.sorts.length; ++i) { - System.printString(this.sorts[i] + "; "); - } - System.printString("\n");*/ + System.printString(this.sorts[i] + "; "); + } + System.printString("\n");*/ } public void reduce() { - /*if(ID % 2 == 1) { - int a[] = new int[1]; - int temp = a[1]; - }*/ - if(this.interoutputs != null) { - // return; - //} - for(int i = 0; i < this.sorts.length; ++i) { - String key = (String)this.keys.elementAt(this.sorts[i]); - Vector values = (Vector)this.values.get(key); - MapReduceBase.reduce(key, values, output); - } + for(int i = 0; i < this.sorts.length; ++i) { + String key = (String)this.keys.elementAt(this.sorts[i]); + Vector values = (Vector)this.values.get(key); + MapReduceBase.reduce(key, values, output); + } } // output all the result into some local file @@ -126,6 +111,8 @@ public class ReduceWorker { oStream.flush(); } oStream.close(); + this.keys = null; + this.values = null; } public String getOutputFile() { diff --git a/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java b/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java index d61bf023..07e69e7f 100644 --- a/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java +++ b/Robust/src/Benchmarks/MapReduce/Tag/Splitter.java @@ -2,12 +2,21 @@ public class Splitter { String filename; String content; int length; - int[] splits; - String[] slices; + int splitNum; + char seperator; public Splitter(String path, int splitNum, char seperator) { //System.printString("Top of Splitter's constructor\n"); filename = path; + this.length = -1; + this.splitNum = splitNum; + this.seperator = seperator; + } + + public String[] split() { + int[] splits; + String[] slices; + FileInputStream iStream = new FileInputStream(filename); byte[] b = new byte[1024 * 1024]; length = iStream.read(b); @@ -22,6 +31,7 @@ public class Splitter { if(splitNum == 1) { slices = new String[1]; slices[0] = content; + this.content = null; } else { splits = new int[splitNum - 1]; int index = 0; @@ -38,36 +48,25 @@ public class Splitter { splits[i] = index; } - this.slices = new String[splits.length + 1]; - for(int i = 0; i < this.slices.length; ++i) { - this.slices[i] = null; - } - } - } - - public void split() { - if(slices.length == 1) { - return; - } - int start = 0; - int end = 0; - for(int i = 0; i < splits.length; ++i) { - end = splits[i]; - if(end < start) { - slices[i] = null; - } else { - slices[i] = content.subString(start, end); + slices = new String[splitNum]; + int start = 0; + int end = 0; + for(int i = 0; i < splits.length; ++i) { + end = splits[i]; + if(end < start) { + slices[i] = null; + } else { + slices[i] = content.subString(start, end); + } + start = end + 1; } - start = end + 1; + slices[slices.length - 1] = content.subString(start); + this.content = null; } - slices[slices.length - 1] = content.subString(start); + return slices; } public String getFilename() { return filename; } - - public String[] getSlices() { - return this.slices; - } } -- 2.34.1