locations = new String[r];
for(int i = 0; i < r; ++i) {
- StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+ StringBuffer temp = new StringBuffer("/scratch/mapreduce_java/output-intermediate-map-");
temp.append(String.valueOf(ID));
temp.append("-of-");
temp.append(String.valueOf(r));
}
this.splitter = splitter;
- this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+ this.outputfile = new String("/scratch/mapreduce_java/output.dat");
this.partial = false;
}
}
public void addInterOutput(String interoutput) {
- int start = interoutput.indexOf('_');
+ int start = interoutput.lastindexOf('_');
int end = interoutput.indexOf('.');
- int index = Integer.parseInt(interoutput.substring(start + 1, end));
+ int index = Integer.parseInt(interoutput.subString(start + 1, end));
//System.printString(interoutput.subString(start + 1, end) + "\n");
if(interoutputs[index] == null) {
interoutputs[index] = new Vector();
//try{
FileInputStream iStream = new FileInputStream(file);
FileOutputStream oStream = new FileOutputStream(outputfile, true);
- byte[] b = new byte[1024 * 100];
+ byte[] b = new byte[1024 * 10];
int length = iStream.read(b);
if(length < 0) {
System./*out.println*/printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
//this.sorts = null;
this.output = new OutputCollector();
- this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+ this.outputfile = "/scratch/mapreduce_java/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
}
public MapReduceBase getMapreducer() {
//try{
for(int i = 0; i < interoutputs.size(); ++i) {
FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
- byte[] b = new byte[1024 * 100];
+ byte[] b = new byte[1024 * 10];
int length = iStream.read(b);
if(length < 0) {
System./*out.println*/printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
int temp = a[1];
}*/
- if(this.interoutputs == null) {
- return;
- }
+ if(this.interoutputs != null) {
+ //return;
+ //}
for(int i = 0; i < this.sorts.length; ++i) {
String key = (String)this.keys.elementAt(this.sorts[i]);
Vector values = (Vector)this.values.get(key);
this.mapreducer.reduce(key, values, output);
}
+ }
//try{
// output all the result into some local file
//System.printString("Top of Splitter's constructor\n");
filename = path;
FileInputStream iStream = new FileInputStream(filename);
- byte[] b = new byte[1024 * 10];
+ byte[] b = new byte[1024 * 1024];
length = iStream.read(b);
if(length < 0) {
System./*out.println*/printString("Error! Can not read from input file: " + filename + "\n");
}
boolean isspace(char c) {
- if((c == ' ') ||
- (c == '.') ||
- (c == '!') ||
- (c == '?') ||
- (c == '"') ||
- (c == '\n')) {
- return true;
- }
- return false;
+ if((c == ' ') ||
+ (c == ',') ||
+ (c == '.') ||
+ (c == '!') ||
+ (c == '?') ||
+ (c == '"') ||
+ (c == '(') ||
+ (c == ')') ||
+ (c == '[') ||
+ (c == ']') ||
+ (c == '{') ||
+ (c == '}') ||
+ (c == '\n')) {
+ return true;
+ }
+ return false;
}
}
--- /dev/null
+task startup(StartupObject s{initialstate}) {
+    // Read the job configuration. The parsing below expects four
+    // newline-terminated lines: the input file path, M (number of input
+    // pieces / map workers), R (number of reduce partitions) and a line
+    // whose first character is the separator used to cut the input.
+    String path = new String("/scratch/mapreduce_nor/conf.txt");
+    FileInputStream iStream = new FileInputStream(path);
+    byte[] b = new byte[1024];
+    int length = iStream.read(b);
+    // Only this single read is needed, so release the file right away
+    // instead of keeping it open for the rest of the run.
+    iStream.close();
+    if(length < 0 ) {
+	System.printString("Error! Can not read from configure file: " + path + "\n");
+	System.exit(-1);
+    }
+    String content = new String(b, 0, length);
+    //System.printString(content + "\n");
+
+    // line 1: path of the input file
+    int index = content.indexOf('\n');
+    String inputfile = content.subString(0, index);
+    content = content.subString(index + 1);
+
+    // line 2: M
+    index = content.indexOf('\n');
+    int m = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+
+    // line 3: R
+    index = content.indexOf('\n');
+    int r = Integer.parseInt(content.subString(0, index));
+    content = content.subString(index + 1);
+
+    // line 4: only its first character is used, as the separator
+    index = content.indexOf('\n');
+    String temp = content.subString(0, index);
+    char seperator = temp.charAt(0);
+    //System.printString(inputfile + "; " + String.valueOf(m) + "; " + String.valueOf(r) + "\n");
+
+    // The Splitter reads the input file; the Master is created with its
+    // 'split' flag set so that the split task can pick it up.
+    Splitter splitter = new Splitter(inputfile, m, seperator);
+    Master master = new Master(m, r, splitter){split};
+
+    taskexit(s{!initialstate});
+}
+
+//Split the input file into M pieces: runs on a Master whose 'split' flag is set and delegates the work to Master.split()
+task split(Master master{split}) {
+    System.printString("Top of task split\n"); // trace message marking task entry
+    master.split();
+
+    taskexit(master{!split, assignMap}); // exit with 'split' cleared and 'assignMap' set, which the assignMap task waits for
+}
+
+//Hand the pieces of the input file to map workers: Master.assignMap() creates one MapWorker (flagged 'map') per slice
+task assignMap(Master master{assignMap}) {
+    System.printString("Top of task assignMap\n"); // trace message marking task entry
+    master.assignMap();
+
+    taskexit(master{!assignMap, mapoutput}); // exit with 'assignMap' cleared and 'mapoutput' set, so mapOutput can pair this master with workers
+}
+
+//Run the 'map' function of a MapWorker on its piece of the input file
+task map(MapWorker mworker{map}) {
+    System.printString("Top of task map\n"); // trace message marking task entry
+    mworker.map();
+
+    taskexit(mworker{!map, partition}); // exit with 'map' cleared and 'partition' set, moving the worker on to the partition task
+}
+
+//Partition the intermediate key/value pairs generated by the map step
+//into R intermediate local files (MapWorker.partition() picks the file from the key's hash)
+task partition(MapWorker mworker{partition}) {
+    System.printString("Top of task partition\n"); // trace message marking task entry
+    mworker.partition();
+
+    taskexit(mworker{!partition, mapoutput}); // exit with 'partition' cleared and 'mapoutput' set, so mapOutput can register this worker's files
+}
+
+//Register the intermediate output files of one map worker with the master; fires when both objects have 'mapoutput' set
+task mapOutput(Master master{mapoutput}, /*optional*/ MapWorker mworker{mapoutput}) {
+    System.printString("Top of task mapOutput\n"); // trace message marking task entry
+    //if(isavailable(mworker)) {
+	int total = master.getR(); // one potential output file per reduce partition
+	for(int i = 0; i < total; ++i) {
+	    String temp = mworker.outputFile(i); // null when the worker wrote nothing for partition i
+	    if(temp != null) {
+		master.addInterOutput(temp);
+	    }
+	}
+	master.setMapFinish(mworker.getID());
+    /*} else {
+	master.setMapFail(mworker.getID());
+	master.setPartial(true);
+    }*/
+    if(master.isMapFinish()) { // no map worker is still marked as processing
+	taskexit(master{!mapoutput, mapfinished, assignReduce}, mworker{!mapoutput}); // move the master on to assignReduce and retire this worker
+    }
+
+    taskexit(mworker{!mapoutput}); // other workers are still running: retire only this worker, the master keeps 'mapoutput'
+}
+
+//Assign each partition's list of intermediate output files to
+//a reduce worker (Master.assignReduce() creates one ReduceWorker per partition, flagged 'sortgroup')
+task assignReduce(Master master{assignReduce}) {
+    System.printString("Top of task assignReduce\n"); // trace message marking task entry
+    master.assignReduce();
+
+    taskexit(master{!assignReduce, reduceoutput}); // exit with 'assignReduce' cleared and 'reduceoutput' set, so reduceOutput can collect results
+}
+
+//First sort and group the intermediate key/value pairs assigned
+//to the reduce worker
+task sortgroup(ReduceWorker rworker{sortgroup}) {
+    System.printString("Top of task sortgroup\n"); // trace message marking task entry
+    rworker.sortgroup();
+
+    taskexit(rworker{!sortgroup, reduce}); // exit with 'sortgroup' cleared and 'reduce' set, moving the worker on to the reduce task
+}
+
+//Run the 'reduce' function of a ReduceWorker over its grouped keys
+task reduce(ReduceWorker rworker{reduce}) {
+    System.printString("Top of task reduce\n"); // trace message marking task entry
+    rworker.reduce();
+
+    taskexit(rworker{!reduce, reduceoutput}); // exit with 'reduce' cleared and 'reduceoutput' set, so reduceOutput can collect this worker's file
+}
+
+//Collect the output of one reduce worker into the master's final output file; fires when both objects have 'reduceoutput' set
+task reduceOutput(Master master{reduceoutput}, /*optional*/ ReduceWorker rworker{reduceoutput}) {
+    System.printString("Top of task reduceOutput\n"); // trace message marking task entry
+    //if(isavailable(rworker)) {
+	master.collectROutput(rworker.getOutputFile()); // append this worker's file to the final output
+	master.setReduceFinish(rworker.getID());
+   /* } else {
+	master.setReduceFail(rworker.getID());
+	master.setPartial(true);
+    }*/
+    if(master.isReduceFinish()) { // no reduce worker is still marked as processing
+	//System.printString("reduce finish\n");
+	taskexit(master{!reduceoutput, reducefinished, output}, rworker{!reduceoutput}); // move the master on to the output task and retire this worker
+    }
+
+    taskexit(rworker{!reduceoutput}); // other workers are still running: retire only this worker, the master keeps 'reduceoutput'
+}
+
+task output(Master master{output}) { // final task: reports where the results are once the master's 'output' flag is set
+    System.printString("Top of task output\n"); // trace message marking task entry
+    if(master.isPartial()) { // warn when the master's 'partial' marker is set
+	System.printString("Partial! The result may not be right due to some failure!\n");
+    }
+    System.printString("Finish! Results are in the output file: " + master.getOutputFile() + "\n");
+    taskexit(master{!output}); // clear 'output'; this task sets no further flag
+}
--- /dev/null
+public class MapReduceBase {
+
+ public static void map(String key, String value, OutputCollector output) {
+ int n = value.length();
+ for (int i = 0; i < n; ) {
+ // Skip past leading whitespace
+ while ((i < n) && isspace(value.charAt(i))) {
+ ++i;
+ }
+
+ // Find word end
+ int start = i;
+ while ((i < n) && !isspace(value.charAt(i))) {
+ i++;
+ }
+
+ if (start < i) {
+ output.emit(value.subString(start, i), "1");
+ //System.printString(value.subString(start,i) + "\n");
+ }
+ }
+ }
+
+ public static void reduce(String key, Vector values, OutputCollector output) {
+ // Iterate over all entries with the
+ // // same key and add the values
+ int value = 0;
+ for(int i = 0; i < values.size(); ++i) {
+ value += Integer.parseInt((String)values.elementAt(i));
+ }
+
+ // Emit sum for input->key()
+ output.emit(key, String.valueOf(value));
+ }
+
+ static boolean isspace(char c) {
+ if((c == ' ') ||
+ (c == ',') ||
+ (c == '.') ||
+ (c == '!') ||
+ (c == '?') ||
+ (c == '"') ||
+ (c == '(') ||
+ (c == ')') ||
+ (c == '[') ||
+ (c == ']') ||
+ (c == '{') ||
+ (c == '}') ||
+ (c == '\n')) {
+ return true;
+ }
+ return false;
+ }
+}
+
--- /dev/null
+/**
+ * One map worker: runs the map function over a slice of the input and
+ * spreads the resulting key/value pairs over up to r intermediate files.
+ */
+public class MapWorker {
+    flag map;
+    flag partition;
+    flag mapoutput;
+
+    int ID;                      // worker index, part of every output file name
+
+    int r;                       // number of partitions (intermediate files) per worker
+    String key;                  // key handed to the map function
+    String value;                // text handed to the map function
+    OutputCollector output;      // pairs emitted by the map function
+
+    String[] locations;          // path of the intermediate file of each partition
+    FileOutputStream[] outputs;  // stream per partition; stays null while the partition is unused
+
+    /**
+     * @param key   key passed through to the map function
+     * @param value text to run the map function on
+     * @param r     number of partitions
+     * @param id    index of this worker
+     */
+    public MapWorker(String key, String value, int r, int id) {
+	this.ID = id;
+	this.r = r;
+
+	this.key = key;
+	this.value = value;
+	this.output = new OutputCollector();
+
+	// File names look like output-intermediate-map-<ID>-of-<r>_<i>.dat
+	locations = new String[r];
+	for(int i = 0; i < r; ++i) {
+	    StringBuffer temp = new StringBuffer("/scratch/mapreduce_nor/output-intermediate-map-");
+	    temp.append(String.valueOf(ID));
+	    temp.append("-of-");
+	    temp.append(String.valueOf(r));
+	    temp.append("_");
+	    temp.append(String.valueOf(i));
+	    temp.append(".dat");
+	    locations[i] = new String(temp);
+	}
+
+	// no file is opened until a pair actually lands in its partition
+	outputs = new FileOutputStream[r];
+	for(int i = 0; i < r; ++i) {
+	    outputs[i] = null;
+	}
+    }
+
+    /** Runs the map function on this worker's key and value, collecting the pairs in output. */
+    public void map() {
+	/*if(ID % 2 == 1) {
+	  String temp = locations[locations.length];
+	  }*/
+
+	MapReduceBase.map(key, value, output);
+    }
+
+    /**
+     * Appends every collected pair, as a "key value\n" line, to the
+     * intermediate file of the partition selected by the key's hash code,
+     * then closes all files that were opened.
+     */
+    public void partition() {
+	/*if(ID % 2 == 1) {
+	  String temp = locations[locations.length];
+	  }*/
+
+	int size = this.output.size();
+	for(int i = 0; i < size; ++i) {
+	    String key = this.output.getKey(i);
+	    String value = this.output.getValue(i);
+	    // Use the hash code of the key to decide which intermediate
+	    // output this pair belongs to. Reducing modulo r first and then
+	    // flipping the sign keeps the index inside [0, r) for every
+	    // hash code; taking the absolute value first can leave the most
+	    // negative int negative and so yield a negative index.
+	    int index = key.hashCode() % this.r;
+	    if(index < 0) {
+		index = -index;
+	    }
+	    FileOutputStream oStream = outputs[index];
+	    if(oStream == null) {
+		// open the file lazily on first use
+		String filepath = locations[index];
+		oStream = new FileOutputStream(filepath, true); // append
+		outputs[index] = oStream;
+	    }
+	    // format: key value\n
+	    oStream.write(key.getBytes());
+	    oStream.write(' ');
+	    oStream.write(value.getBytes());
+	    oStream.write('\n');
+	    oStream.flush();
+	}
+
+	// close the output files; the array entries stay non-null so that
+	// outputFile() can still tell which partitions were written
+	for(int i = 0; i < this.outputs.length; ++i) {
+	    FileOutputStream temp = this.outputs[i];
+	    if(temp != null) {
+		temp.close();
+	    }
+	}
+    }
+
+    /**
+     * @return the path of the intermediate file for partition i, or null
+     *         if no stream was ever opened for that partition
+     */
+    public String outputFile(int i) {
+	if(outputs[i] != null) {
+	    return locations[i];
+	} else {
+	    return null;
+	}
+    }
+
+    /** @return the index of this worker */
+    public int getID() {
+	return this.ID;
+    }
+
+    /** @return the number of partitions */
+    public int getR() {
+	return this.r;
+    }
+
+}
--- /dev/null
+/**
+ * Coordinator of a job: owns the splitter, creates the map and reduce
+ * workers, tracks their states and assembles the final output file.
+ */
+public class Master {
+    flag split;
+    flag assignMap;
+    flag mapoutput;
+    flag mapfinished;
+    flag assignReduce;
+    flag reduceoutput;
+    flag reducefinished;
+    flag output;
+
+    int m;                   // number of map workers
+    int r;                   // number of reduce workers
+    int[] mworkerStates;     // array of map worker's state
+                             // 0: idle 1: process 2: finished 3: fail
+    int[] rworkerStates;     // array of reduce worker's state (same encoding)
+    Vector[] interoutputs;   // array of string vector containing
+                             // paths of intermediate outputs from
+                             // map worker, one vector per partition
+
+    Splitter splitter;
+
+    String outputfile;       // path of final output file
+
+    boolean partial;         // set once some failure has been reported
+
+    /**
+     * @param m        number of map workers
+     * @param r        number of reduce workers
+     * @param splitter splitter that provides the input slices
+     */
+    public Master(int m, int r, Splitter splitter) {
+	this.m = m;
+	this.r = r;
+
+	// every worker starts out idle
+	mworkerStates = new int[m];
+	rworkerStates = new int[r];
+	for(int i = 0; i < m; ++i) {
+	    mworkerStates[i] = 0;
+	}
+	for(int i = 0; i < r; ++i) {
+	    rworkerStates[i] = 0;
+	}
+
+	// a partition's vector is created on the first registered file
+	interoutputs = new Vector[r];
+	for(int i = 0; i < r; ++i) {
+	    interoutputs[i] = null;
+	}
+
+	this.splitter = splitter;
+	this.outputfile = new String("/scratch/mapreduce_nor/output.dat");
+
+	this.partial = false;
+    }
+
+    /** @return the number of reduce workers */
+    public int getR() {
+	return this.r;
+    }
+
+    /** @return the path of the final output file */
+    public String getOutputFile() {
+	return this.outputfile;
+    }
+
+    /** @return true once a failure has been recorded through setPartial */
+    public boolean isPartial() {
+	return this.partial;
+    }
+
+    /** Records a failure; the marker is sticky and is never reset to false. */
+    public void setPartial(boolean partial) {
+	this.partial = partial || this.partial;
+    }
+
+    /** Asks the splitter to cut the input into slices. */
+    public void split() {
+	splitter.split();
+    }
+
+    /** Creates one map worker (flagged 'map') per slice and marks it as processing. */
+    public void assignMap() {
+	String[] contentsplits = splitter.getSlices();
+	for(int i = 0; i < contentsplits.length; ++i) {
+	    //System.printString("*************************\n");
+	    //System.printString(contentsplits[i] + "\n");
+	    //System.printString("*************************\n");
+	    MapWorker mworker = new MapWorker(splitter.getFilename(), contentsplits[i], r, i){map};
+	    mworkerStates[i] = 1;
+	}
+    }
+
+    /** Marks map worker i as finished. */
+    public void setMapFinish(int i) {
+	mworkerStates[i] = 2;
+    }
+
+    /** Marks map worker i as failed. */
+    public void setMapFail(int i) {
+	mworkerStates[i] = 3;
+    }
+
+    /** @return true when no map worker is in the processing state */
+    public boolean isMapFinish() {
+	for(int i = 0; i < mworkerStates.length; ++i) {
+	    if(mworkerStates[i] == 1) {
+		return false;
+	    }
+	}
+
+	return true;
+    }
+
+    /**
+     * Registers an intermediate file written by a map worker. The partition
+     * index is the number between the last '_' and the extension dot of the
+     * path, e.g. ..._3.dat belongs to partition 3.
+     */
+    public void addInterOutput(String interoutput) {
+	int start = interoutput.lastindexOf('_');
+	// Search for the extension dot from the end as well, so that a '.'
+	// earlier in the path cannot end up in front of the '_'.
+	int end = interoutput.lastindexOf('.');
+	int index = Integer.parseInt(interoutput.subString(start + 1, end));
+	//System.printString(interoutput.subString(start + 1, end) + "\n");
+	if(interoutputs[index] == null) {
+	    interoutputs[index] = new Vector();
+	}
+	interoutputs[index].addElement(interoutput);
+    }
+
+    /**
+     * Creates one reduce worker (flagged 'sortgroup') per partition and
+     * marks it as processing. A partition without files hands the worker a
+     * null vector.
+     */
+    public void assignReduce() {
+	for(int i = 0; i < interoutputs.length; ++i) {
+	    ReduceWorker rworker = new ReduceWorker(interoutputs[i], i){sortgroup};
+	    rworkerStates[i] = 1;
+	}
+    }
+
+    /** Marks reduce worker i as finished. */
+    public void setReduceFinish(int i) {
+	rworkerStates[i] = 2;
+    }
+
+    /** Marks reduce worker i as failed. */
+    public void setReduceFail(int i) {
+	rworkerStates[i] = 3;
+    }
+
+    /** @return true when no reduce worker is in the processing state */
+    public boolean isReduceFinish() {
+	for(int i = 0; i < rworkerStates.length; ++i) {
+	    if(rworkerStates[i] == 1) {
+		return false;
+	    }
+	}
+
+	return true;
+    }
+
+    /**
+     * Appends the whole content of a reduce worker's output file to the
+     * final output file. Terminates the program when the first read
+     * reports a negative length.
+     */
+    public void collectROutput(String file) {
+	FileInputStream iStream = new FileInputStream(file);
+	FileOutputStream oStream = new FileOutputStream(outputfile, true);
+	byte[] b = new byte[1024 * 10];
+	int length = iStream.read(b);
+	if(length < 0) {
+	    System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
+	    System.exit(-1);
+	}
+	//System.printString(new String(b, 0, length) + "\n");
+	// A single read delivers at most one buffer, so keep copying until
+	// the stream has nothing left; otherwise everything beyond the
+	// first 10 KB of the file would be missing from the result.
+	while(length > 0) {
+	    oStream.write(b, 0, length);
+	    length = iStream.read(b);
+	}
+	iStream.close();
+	oStream.close();
+    }
+}
--- /dev/null
+public class OutputCollector {
+
+ Vector keys;
+ Vector values;
+
+ public OutputCollector() {
+ this.keys = new Vector();
+ this.values = new Vector();
+ }
+
+ public void emit(String key, String value) {
+ this.keys.addElement(key);
+ this.values.addElement(value);
+ }
+
+ public int size() {
+ return this.keys.size();
+ }
+
+ public String getKey(int i) {
+ return (String)this.keys.elementAt(i);
+ }
+
+ public String getValue(int i) {
+ return (String)this.values.elementAt(i);
+ }
+}
--- /dev/null
+public class ReduceWorker {
+ flag sortgroup;
+ flag reduce;
+ flag reduceoutput;
+
+ int ID;
+ Vector interoutputs; // string vector containing paths
+ // of intermediate outputs from map worker
+ Vector keys;
+ HashMap values; // hashmap map key to vector of string vector
+ int[] sorts; // array record the sort of keys
+ OutputCollector output;
+ String outputfile; // path of the intermediate output file
+
+ public ReduceWorker(Vector interoutputs, int id) {
+ this.ID = id;
+ this.interoutputs = interoutputs;
+
+ this.keys = new Vector();
+ this.values = new HashMap();
+ //this.sorts = null;
+
+ this.output = new OutputCollector();
+ this.outputfile = "/scratch/mapreduce_nor/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+ }
+
+ public void sortgroup() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+
+ // group values associated to the same key
+ //System.printString("================================\n");
+ if(interoutputs == null) {
+ return;
+ }
+ for(int i = 0; i < interoutputs.size(); ++i) {
+ FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
+ byte[] b = new byte[1024 * 10];
+ int length = iStream.read(b);
+ if(length < 0) {
+ System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
+ System.exit(-1);
+ }
+ String content = new String(b, 0, length);
+ //System.printString(content + "\n");
+ int index = content.indexOf('\n');
+ while(index != -1) {
+ String line = content.subString(0, index);
+ content = content.subString(index + 1);
+ //System.printString(line + "\n");
+ int tmpindex = line.indexOf(' ');
+ String key = line.subString(0, tmpindex);
+ String value = line.subString(tmpindex + 1);
+ //System.printString(key + "; " + value + "\n");
+ if(!this.values.containsKey(key)) {
+ this.values.put(key, new Vector());
+ this.keys.addElement(key);
+ }
+ ((Vector)this.values.get(key)).addElement(value);
+ index = content.indexOf('\n');
+ }
+ iStream.close();
+ }
+ //System.printString("================================\n");
+
+ /*for(int i = 0; i < this.keys.size(); ++i) {
+ System.printString((String)this.keys.elementAt(i) + ", " + ((String)this.keys.elementAt(i)).hashCode() + "; ");
+ }
+ System.printString("\n");*/
+
+ // sort all the keys inside interoutputs
+ this.sorts = new int[this.keys.size()];
+ // insert sorting
+ this.sorts[0] = 0;
+ int tosort = 1;
+ for(; tosort < this.keys.size(); ++tosort) {
+ int tosortkey = ((String)this.keys.elementAt(tosort)).hashCode();
+ int index = tosort;
+ for(int i = tosort; i > 0; --i) {
+ if(((String)this.keys.elementAt(this.sorts[i - 1])).hashCode() > tosortkey) {
+ this.sorts[i] = this.sorts[i-1];
+ index = i - 1;
+ } else {
+ //System.printString(i + "; " + tosort + "\n");
+ index = i;
+ i = 0;
+ }
+ }
+ this.sorts[index] = tosort;
+ }
+ /*for(int i = 0; i < this.sorts.length; ++i) {
+ System.printString(this.sorts[i] + "; ");
+ }
+ System.printString("\n");*/
+ }
+
+ public void reduce() {
+ /*if(ID % 2 == 1) {
+ int a[] = new int[1];
+ int temp = a[1];
+ }*/
+
+ if(this.interoutputs != null) {
+ // return;
+ //}
+ for(int i = 0; i < this.sorts.length; ++i) {
+ String key = (String)this.keys.elementAt(this.sorts[i]);
+ Vector values = (Vector)this.values.get(key);
+ MapReduceBase.reduce(key, values, output);
+ }
+ }
+
+ // output all the result into some local file
+ int size = this.output.size();
+ FileOutputStream oStream = new FileOutputStream(outputfile, true); // append
+ for(int i = 0; i < size; ++i) {
+ String key = this.output.getKey(i);
+ String value = this.output.getValue(i);
+ // format: key value\n
+ oStream.write(key.getBytes());
+ oStream.write(' ');
+ oStream.write(value.getBytes());
+ oStream.write('\n');
+ oStream.flush();
+ }
+ oStream.close();
+ }
+
+ public String getOutputFile() {
+ return this.outputfile;
+ }
+
+ public int getID() {
+ return this.ID;
+ }
+}
--- /dev/null
+/**
+ * Reads the input file and cuts its content into a fixed number of slices
+ * at separator characters.
+ */
+public class Splitter {
+    String filename;   // path of the input file
+    String content;    // text read from the input file
+    int length;        // number of bytes returned by the read
+    int[] splits;      // cut positions; unset when there is a single slice
+    String[] slices;   // resulting slices, filled in by split()
+
+    /**
+     * Reads the input (one read of at most 1 MB) and computes the cut
+     * positions. Terminates the program when the read reports a negative
+     * length.
+     *
+     * @param path      path of the input file
+     * @param splitNum  number of slices to produce
+     * @param seperator character at which the content may be cut
+     */
+    public Splitter(String path, int splitNum, char seperator) {
+	//System.printString("Top of Splitter's constructor\n");
+	filename = path;
+	FileInputStream iStream = new FileInputStream(filename);
+	byte[] b = new byte[1024 * 1024];
+	length = iStream.read(b);
+	if(length < 0) {
+	    System.printString("Error! Can not read from input file: " + filename + "\n");
+	    System.exit(-1);
+	}
+	content = new String(b, 0, length);
+	//System.printString(content + "\n");
+	iStream.close();
+
+	if(splitNum == 1) {
+	    // a single slice is simply the whole content
+	    slices = new String[1];
+	    slices[0] = content;
+	} else {
+	    splits = new int[splitNum - 1];
+	    int index = 0;
+	    int span = length / splitNum;
+	    int temp = 0;
+	    for(int i = 0; i < splitNum - 1; ++i) {
+		// aim at the next multiple of span, then move forward to
+		// the following separator so that no word is cut in two
+		temp += span;
+		if(temp > index) {
+		    index = temp;
+		    while((content.charAt(index) != seperator) && (index != length - 1)) {
+			++index;
+		    }
+		    if(content.charAt(index) != seperator) {
+			// The scan stopped on the last character without
+			// meeting a separator. split() drops the character
+			// at the cut position, so cut just behind the end
+			// to keep that last character in the slice.
+			index = length;
+		    }
+		}
+		splits[i] = index;
+	    }
+
+	    this.slices = new String[splits.length + 1];
+	    for(int i = 0; i < this.slices.length; ++i) {
+		this.slices[i] = null;
+	    }
+	}
+    }
+
+    /**
+     * Fills slices from the cut positions. The character at a cut position
+     * is left out. A slice for which no text remains becomes the empty
+     * string instead of null, so that every slice can be handed to a map
+     * worker.
+     */
+    public void split() {
+	if(slices.length == 1) {
+	    return;
+	}
+	int start = 0;
+	int end = 0;
+	for(int i = 0; i < splits.length; ++i) {
+	    end = splits[i];
+	    if(end < start) {
+		// same cut position as the previous slice: nothing left
+		slices[i] = "";
+	    } else {
+		slices[i] = content.subString(start, end);
+	    }
+	    start = end + 1;
+	}
+	// the last slice takes whatever follows the final cut, if anything
+	if(start < content.length()) {
+	    slices[slices.length - 1] = content.subString(start);
+	} else {
+	    slices[slices.length - 1] = "";
+	}
+    }
+
+    /** @return the path of the input file */
+    public String getFilename() {
+	return filename;
+    }
+
+    /** @return the slices; with more than one slice the entries are null until split() has run */
+    public String[] getSlices() {
+	return this.slices;
+    }
+}
task startup(StartupObject s{initialstate}) {
// read in configuration parameters
// System.printString("Top of task startup\n");
- String path = new String("/home/jzhou/mapreduce/conf.txt");
+ String path = new String("/scratch/mapreduce_opt/conf.txt");
FileInputStream iStream = new FileInputStream(path);
byte[] b = new byte[1024];
int length = iStream.read(b);
locations = new String[r];
for(int i = 0; i < r; ++i) {
- StringBuffer temp = new StringBuffer("/home/jzhou/mapreduce/output-intermediate-map-");
+ StringBuffer temp = new StringBuffer("/scratch/mapreduce_opt/output-intermediate-map-");
temp.append(String.valueOf(ID));
temp.append("-of-");
temp.append(String.valueOf(r));
}
this.splitter = splitter;
- this.outputfile = new String("/home/jzhou/mapreduce/output.dat");
+ this.outputfile = new String("/scratch/mapreduce_opt/output.dat");
this.partial = false;
}
}
public void addInterOutput(String interoutput) {
- int start = interoutput.indexOf('_');
+ int start = interoutput.lastindexOf('_');
int end = interoutput.indexOf('.');
int index = Integer.parseInt(interoutput.subString(start + 1, end));
//System.printString(interoutput.subString(start + 1, end) + "\n");
public void collectROutput(String file) {
FileInputStream iStream = new FileInputStream(file);
FileOutputStream oStream = new FileOutputStream(outputfile, true);
- byte[] b = new byte[1024 * 100];
+ byte[] b = new byte[1024 * 10];
int length = iStream.read(b);
if(length < 0) {
System.printString("Error! Can not read from intermediate output file from reduce worker: " + file + "\n");
//this.sorts = null;
this.output = new OutputCollector();
- this.outputfile = "/home/jzhou/mapreduce/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
+ this.outputfile = "/scratch/mapreduce_opt/output-intermediate-reduce-" + String.valueOf(id) + ".dat";
}
public void sortgroup() {
}
for(int i = 0; i < interoutputs.size(); ++i) {
FileInputStream iStream = new FileInputStream((String)interoutputs.elementAt(i));
- byte[] b = new byte[1024 * 100];
+ byte[] b = new byte[1024 * 10];
int length = iStream.read(b);
if(length < 0) {
System.printString("Error! Can not read from intermediate ouput file of map worker: " + (String)interoutputs.elementAt(i) + "\n");
//System.printString("Top of Splitter's constructor\n");
filename = path;
FileInputStream iStream = new FileInputStream(filename);
- byte[] b = new byte[1024 * 10];
+ byte[] b = new byte[1024 * 1024];
length = iStream.read(b);
if(length < 0) {
System.printString("Error! Can not read from input file: " + filename + "\n");