int ldepth;
atomic {
+ System.out.println("trans 2");
max = this.maxDepth;
maxSearch = this.maxSearchDepth;
ldepth=this.depth;
String title;
atomic {
+ System.out.println("trans 3");
hostname = new String(GlobalString.toLocalCharArray(getHostName()));
path = new String(GlobalString.toLocalCharArray(getPath()));
if ((title = grabTitle(lq)) != null) {
atomic {
+ System.out.println("trans 4");
//commits everything...either works or fails
gTitle = global new GlobalString(title);
processPage(lq);
s.close();
} else {
atomic {
+ System.out.println("trans 5");
dequeueTask();
}
}
//set up workers
ts=global new TaskSet(NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
- ts.threads[i] = global new Worker(ts,i);
+ ts.threads[i] = global new Worker(ts,i,(NUM_THREADS/2));
+ }
+ for (i = 0; i < NUM_THREADS/2; i++) {
+ ts.todo[i] = global new GlobalQueue();
}
}
DistributedHashMap results = global new DistributedHashMap(100, 100, 0.75f);
DistributedLinkedList results_list = global new DistributedLinkedList();
QueryTask firstquery = global new QueryTask(visitedList, maxDepth, maxSearchDepth, results, results_list, firstmachine, firstpage, 0);
- ts.todo.push(firstquery);
+ ts.todo[0].push(firstquery);
}
System.printString("Finished to create Objects\n");
public class Worker extends Thread {
- Object[] currentWorkList;
- int mid;
+ int id;
TaskSet tasks;
Task workingtask;
+ int numQueue;
- Worker(TaskSet tasks, int mid) {
+ Worker(TaskSet tasks, int id, int numQueue) {
this.tasks = tasks;
- this.currentWorkList = currentWorkList;
- mid = mid;
+ this.id = id;
+ this.numQueue = 3; // Correct this 3 should be hash defined
}
public void run() {
while(notdone) {
Task t=null;
atomic {
- if (!tasks.todo.isEmpty()) {
+ System.out.println("Transacion 1");
+ int qindex = (id%numQueue);
+ //System.out.println("id= " + id + " numQueue= " + numQueue);
+ if (!tasks.todo[qindex].isEmpty()) {
//grab segment from todo list
- t=workingtask=(Task) tasks.todo.pop();
+ t=workingtask=(Task) tasks.todo[qindex].pop();
if(t!=null)
- t.setWorker(this);
+ t.setWorker(this, qindex);
} else {
- //steal work from dead threads
- Worker[] threads=tasks.threads;
- boolean shouldexit=true;
- for(int i=0;i<threads.length;i++) {
- Worker w=(Worker)threads[i];
- if (w.workingtask!=null)
- shouldexit=false;
- if (w.getStatus(i)==-1&&w.workingtask!=null) {
- //steal work from this thread
- t=workingtask=w.workingtask;
- w.workingtask=null;
- t.setWorker(this);
- break;
+ int newqindex = qindex;
+ boolean skipvisit = false;
+ for(int queuecount=1;queuecount < numQueue;queuecount++) {
+ newqindex = ((newqindex+1)%numQueue);
+ if (!tasks.todo[newqindex].isEmpty()) {
+ //grab segment from another todo list
+ t=workingtask=(Task) tasks.todo[newqindex].pop();
+ if(t!=null) {
+ t.setWorker(this, qindex);
+ skipvisit = true;
+ break;
+ }
}
}
- if (shouldexit)
- notdone=false;
+ if(!skipvisit) {
+ //steal work from dead threads
+ Worker[] threads=tasks.threads;
+ boolean shouldexit=true;
+ for(int i=0;i<threads.length;i++) {
+ Worker w=(Worker)threads[i];
+ if (w.workingtask!=null)
+ shouldexit=false;
+ if (w.getStatus(i)==-1&&w.workingtask!=null) {
+ //steal work from this thread
+ t=workingtask=w.workingtask;
+ w.workingtask=null;
+ t.setWorker(this, qindex);
+ break;
+ }
+ }
+ if (shouldexit)
+ notdone=false;
+ }
}
}
if (t!=null) {
continue;
} else if (notdone) {
//System.out.println("Not done");
- sleep(500000);
+ sleep(10000);
}
}
fi = System.currentTimeMillis();