From e4485993685d8141de7b103a8afc1d8051b2464f Mon Sep 17 00:00:00 2001 From: bdemsky Date: Mon, 19 Apr 2010 20:06:29 +0000 Subject: [PATCH] changes --- .../Recovery/Spider/recovery/QueryTask.java | 4 ++ .../Recovery/Spider/recovery/Spider.java | 7 +- .../Recovery/Spider/recovery/Task.java | 7 +- .../Recovery/Spider/recovery/TaskSet.java | 4 +- .../Recovery/Spider/recovery/Worker.java | 67 ++++++++++++------- 5 files changed, 59 insertions(+), 30 deletions(-) diff --git a/Robust/src/Benchmarks/Recovery/Spider/recovery/QueryTask.java b/Robust/src/Benchmarks/Recovery/Spider/recovery/QueryTask.java index 4992deee..79011c4d 100644 --- a/Robust/src/Benchmarks/Recovery/Spider/recovery/QueryTask.java +++ b/Robust/src/Benchmarks/Recovery/Spider/recovery/QueryTask.java @@ -27,6 +27,7 @@ public class QueryTask extends Task { int ldepth; atomic { + System.out.println("trans 2"); max = this.maxDepth; maxSearch = this.maxSearchDepth; ldepth=this.depth; @@ -39,6 +40,7 @@ public class QueryTask extends Task { String title; atomic { + System.out.println("trans 3"); hostname = new String(GlobalString.toLocalCharArray(getHostName())); path = new String(GlobalString.toLocalCharArray(getPath())); @@ -65,6 +67,7 @@ public class QueryTask extends Task { if ((title = grabTitle(lq)) != null) { atomic { + System.out.println("trans 4"); //commits everything...either works or fails gTitle = global new GlobalString(title); processPage(lq); @@ -74,6 +77,7 @@ public class QueryTask extends Task { s.close(); } else { atomic { + System.out.println("trans 5"); dequeueTask(); } } diff --git a/Robust/src/Benchmarks/Recovery/Spider/recovery/Spider.java b/Robust/src/Benchmarks/Recovery/Spider/recovery/Spider.java index 00a2d8f6..5a290186 100644 --- a/Robust/src/Benchmarks/Recovery/Spider/recovery/Spider.java +++ b/Robust/src/Benchmarks/Recovery/Spider/recovery/Spider.java @@ -39,7 +39,10 @@ public class Spider { //set up workers ts=global new TaskSet(NUM_THREADS); for (i = 0; i < NUM_THREADS; i++) { - ts.threads[i] = global new Worker(ts,i); + ts.threads[i] = global new Worker(ts,i,(NUM_THREADS/2)); + } + for (i = 0; i < NUM_THREADS/2; i++) { + ts.todo[i] = global new GlobalQueue(); } } @@ -50,7 +53,7 @@ public class Spider { DistributedHashMap results = global new DistributedHashMap(100, 100, 0.75f); DistributedLinkedList results_list = global new DistributedLinkedList(); QueryTask firstquery = global new QueryTask(visitedList, maxDepth, maxSearchDepth, results, results_list, firstmachine, firstpage, 0); - ts.todo.push(firstquery); + ts.todo[0].push(firstquery); } System.printString("Finished to create Objects\n"); diff --git a/Robust/src/Benchmarks/Recovery/Spider/recovery/Task.java b/Robust/src/Benchmarks/Recovery/Spider/recovery/Task.java index 1398c1c6..c45c25e8 100644 --- a/Robust/src/Benchmarks/Recovery/Spider/recovery/Task.java +++ b/Robust/src/Benchmarks/Recovery/Spider/recovery/Task.java @@ -1,16 +1,19 @@ public class Task { //Current worker thread Worker w; + int queueid; public Task() {} public void execute(); - public void setWorker(Worker w) { + public void setWorker(Worker w, int queueid) { this.w = w; + this.queueid = queueid; } public void dequeueTask() { w.workingtask=null; } public void enqueueTask(Task t) { - w.tasks.todo.push(t); + //System.out.println("queueid= " + queueid); + w.tasks.todo[queueid].push(t); } public native void execution(); } diff --git a/Robust/src/Benchmarks/Recovery/Spider/recovery/TaskSet.java b/Robust/src/Benchmarks/Recovery/Spider/recovery/TaskSet.java index c88182d4..1904a89e 100644 --- a/Robust/src/Benchmarks/Recovery/Spider/recovery/TaskSet.java +++ b/Robust/src/Benchmarks/Recovery/Spider/recovery/TaskSet.java @@ -2,11 +2,11 @@ public class TaskSet { public TaskSet(int nt) { numthreads=nt; threads=global new Worker[nt]; - todo=global new GlobalQueue(); + todo=global new GlobalQueue[(nt/2)]; } //Tasks to be executed - GlobalQueue todo; + GlobalQueue[] todo; //Vector of worker threads Worker threads[]; int numthreads; diff --git a/Robust/src/Benchmarks/Recovery/Spider/recovery/Worker.java b/Robust/src/Benchmarks/Recovery/Spider/recovery/Worker.java index 92d026fc..0c8991e4 100644 --- a/Robust/src/Benchmarks/Recovery/Spider/recovery/Worker.java +++ b/Robust/src/Benchmarks/Recovery/Spider/recovery/Worker.java @@ -1,13 +1,13 @@ public class Worker extends Thread { - Object[] currentWorkList; - int mid; + int id; TaskSet tasks; Task workingtask; + int numQueue; - Worker(TaskSet tasks, int mid) { + Worker(TaskSet tasks, int id, int numQueue) { this.tasks = tasks; - this.currentWorkList = currentWorkList; - mid = mid; + this.id = id; + this.numQueue = 3; // Correct this 3 should be hash defined } public void run() { @@ -17,29 +17,48 @@ public class Worker extends Thread { while(notdone) { Task t=null; atomic { - if (!tasks.todo.isEmpty()) { + System.out.println("Transacion 1"); + int qindex = (id%numQueue); + //System.out.println("id= " + id + " numQueue= " + numQueue); + if (!tasks.todo[qindex].isEmpty()) { //grab segment from todo list - t=workingtask=(Task) tasks.todo.pop(); + t=workingtask=(Task) tasks.todo[qindex].pop(); if(t!=null) - t.setWorker(this); + t.setWorker(this, qindex); } else { - //steal work from dead threads - Worker[] threads=tasks.threads; - boolean shouldexit=true; - for(int i=0;i