3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 objtoinfo=new Hashtable();
35 threadinfo=new ThreadInfo[e.numThreads()];
36 blocked=new boolean[e.numThreads()];
38 for(int i=0;i<e.numThreads();i++) {
39 backoff[i]=BACKOFFSTART;
40 threadinfo[i]=new ThreadInfo(this);
44 serCommit=p.getSeries("COMMIT");
45 serStart=p.getSeries("START");
46 serAbort=p.getSeries("ABORT");
47 serStall=p.getSeries("STALL");
48 serWake=p.getSeries("WAKE");
49 serAvoid=p.getSeries("AVOIDDEADLOCK");
61 public int getDeadLockCount() {
65 //Where to start the backoff delay at
66 public static final int BACKOFFSTART=1;
69 public static final int LAZY=0;
70 public static final int COMMIT=1;
71 public static final int ATTACK=2;
72 public static final int SUICIDE=3;
73 public static final int TIMESTAMP=4;
74 public static final int LOCK=5;
75 public static final int LOCKCOMMIT=6;
76 public static final int RANDOM=7;
77 public static final int KARMA=8;
78 public static final int POLITE=9;
79 public static final int ERUPTION=10;
90 Event[] currentevents;
95 ThreadInfo[] threadinfo;
99 public boolean isEager() {
100 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE;
103 public boolean countObjects() {
104 return policy==KARMA||policy==ERUPTION;
107 public boolean isLock() {
108 return policy==LOCK||policy==LOCKCOMMIT;
111 public int getAborts() {
115 public int getCommits() {
119 public long getTime() {
120 return shorttesttime-starttime;
123 //Aborts another thread...
124 public void reschedule(int currthread, long time) {
125 currentevents[currthread].makeInvalid();
126 if (threadinfo[currthread].isStalled()) {
127 //remove from waiter list
128 threadinfo[currthread].setStall(false);
129 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
131 if (serAbort!=null) {
132 serAbort.addPoint(time, currthread);
134 Transaction trans=currentevents[currthread].getTransaction();
136 releaseObjects(trans, currthread, time);
137 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
138 currentevents[currthread]=nev;
142 //Aborts another thread...
143 public void stall(Event ev, long time) {
149 private void releaseObjects(Transaction trans, int currthread, long time) {
151 for(int i=0;i<trans.numEvents();i++) {
152 ObjIndex object=trans.getObjIndex(i);
154 if (object!=null&&rdobjmap.containsKey(object)) {
155 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
157 if (object!=null&&wrobjmap.containsKey(object)) {
158 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
160 if (object!=null&&objtoinfo.containsKey(object)) {
161 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
162 if (oi.getOwner()==currentevents[currthread].getThread()) {
166 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
167 //requeue everyone who was waiting on us and start them back up
168 Event waiter=(Event)waitit.next();
170 waiter.setTime(time);
171 threadinfo[waiter.getThread()].setStall(false);
173 serWake.addPoint(time,waiter.getThread());
174 oi.setOwner(waiter.getThread());
183 /* Initializes things and returns number of transactions */
184 public int startinitial() {
186 for(int i=0;i<e.numThreads();i++) {
187 Transaction trans=e.getThread(i).getTransaction(0);
188 long time=trans.getTime(0);
189 Event ev=new Event(time, trans, 0, i, 0);
192 tcount+=e.getThread(i).numTransactions();
197 public void dosim() {
199 //start first transactions
200 int numtrans=startinitial();
201 System.out.println("Number of transactions="+numtrans);
203 while(!eq.isEmpty()) {
204 Event ev=(Event)eq.poll();
209 Transaction trans=ev.getTransaction();
211 int event=ev.getEvent();
212 long currtime=ev.getTime();
214 if (trans.started&&starttime==-1)
217 if (trans.numEvents()==(event+1)) {
218 tryCommit(ev, trans);
220 if ((tcount%100000)==0)
221 System.out.println("Attempted "+tcount+"transactions");
223 enqueueEvent(ev, trans);
226 shorttesttime=lasttime;
231 private ObjectInfo getmapping(ObjIndex obj) {
232 if (!objtoinfo.containsKey(obj))
233 objtoinfo.put(obj, new ObjectInfo(this));
234 return (ObjectInfo)objtoinfo.get(obj);
237 public void tryCommit(Event ev, Transaction trans) {
238 //ready to commit this one
239 long currtime=ev.getTime();
240 releaseObjects(trans, ev.getThread(), currtime);
242 //See if we have been flagged as aborted for the lazy case
243 boolean abort=aborted[ev.getThread()];
244 aborted[ev.getThread()]=false;
246 //if it is a transaction, increment comit count
247 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
248 threadinfo[ev.getThread()].priority=0;
250 if (serCommit!=null) {
251 serCommit.addPoint(ev.getTime(),ev.getThread());
254 //Reset our backoff counter
255 backoff[ev.getThread()]=BACKOFFSTART;
256 retrycount[ev.getThread()]=0;
258 //abort the other threads
259 for(int i=0;i<trans.numEvents();i++) {
260 ObjIndex object=trans.getObjIndex(i);
261 int op=trans.getEvent(i);
262 //Mark commits to objects
263 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
265 System.out.println(op);
267 getmapping(object).recordCommit();
269 //Check for threads we might cause to abort
270 if (op==Transaction.WRITE) {
271 HashSet abortset=new HashSet();
272 if (rdobjmap.containsKey(object)) {
273 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
274 Integer threadid=(Integer)it.next();
275 abortset.add(threadid);
277 ObjectInfo oi=getmapping(object);
282 if (wrobjmap.containsKey(object)) {
283 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
284 Integer threadid=(Integer)it.next();
285 abortset.add(threadid);
286 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
287 //if this object hasn't already cause this thread to
288 //abort, then flag it as an abort cause
289 ObjectInfo oi=getmapping(object);
294 for(Iterator abit=abortset.iterator();abit.hasNext();) {
295 Integer threadid=(Integer)abit.next();
296 if (policy==LAZY||policy==LOCK) {
297 //just flag to abort when it trie to commit
298 aborted[threadid]=true;
300 serAbort.addPoint(currtime, threadid);
301 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
302 //abort it immediately
303 reschedule(threadid, currtime);
313 //add next transaction event...could be us if we aborted
314 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
315 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
316 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
317 if (serStart!=null) {
318 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
319 serStart.addPoint(ev.getTime(),ev.getThread());
323 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
324 currentevents[ev.getThread()]=nev;
329 public Set rdConflictSet(int thread, ObjIndex obj) {
330 if (!wrobjmap.containsKey(obj))
332 HashSet conflictset=new HashSet();
333 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
334 Integer threadid=(Integer)it.next();
335 if (threadid.intValue()!=thread)
336 conflictset.add(threadid);
338 if (conflictset.isEmpty())
344 public Set wrConflictSet(int thread, ObjIndex obj) {
345 HashSet conflictset=new HashSet();
346 if (rdobjmap.containsKey(obj)) {
347 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
348 Integer threadid=(Integer)it.next();
349 if (threadid.intValue()!=thread)
350 conflictset.add(threadid);
353 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
354 Integer threadid=(Integer)it.next();
355 if (threadid.intValue()!=thread)
356 conflictset.add(threadid);
358 if (conflictset.isEmpty())
364 //Takes as parameter -- current transaction read event ev, conflicting
365 //set of threads, and the current time
366 //Returning false causes current transaction not continue to be scheduled
368 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
369 if (policy==RANDOM) {
370 boolean b=r.nextBoolean();
373 stall(ev, time+r.nextInt(200));
376 //abort other transactions
377 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
378 Integer thread=(Integer)thit.next();
379 reschedule(thread, time);
384 } else if (policy==KARMA) {
386 //abort other transactions
387 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
388 Integer thread=(Integer)thit.next();
389 if (threadinfo[thread].priority>maxpriority)
390 maxpriority=threadinfo[thread].priority;
392 if (maxpriority>threadinfo[ev.getThread()].priority) {
394 threadinfo[ev.getThread()].priority++;
395 //stall for a little while
400 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
401 Integer thread=(Integer)thit.next();
402 reschedule(thread, time);
407 } else if (policy==ERUPTION) {
409 //abort other transactions
410 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
411 Integer thread=(Integer)thit.next();
412 if (threadinfo[thread].priority>maxpriority)
413 maxpriority=threadinfo[thread].priority;
415 if (maxpriority>threadinfo[ev.getThread()].priority) {
417 //stall for a little while
419 int ourpriority=threadinfo[ev.getThread()].priority;
420 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
421 Integer thread=(Integer)thit.next();
422 threadinfo[thread].priority+=ourpriority;;
427 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
428 Integer thread=(Integer)thit.next();
429 reschedule(thread, time);
434 } else if (policy==POLITE) {
435 int retry=retrycount[ev.getThread()]++;
437 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
438 Integer thread=(Integer)thit.next();
439 reschedule(thread, time);
440 int dback=backoff[thread.intValue()]*2;
442 backoff[thread.intValue()]=dback;
448 stall(ev, time+r.nextInt((1<<retry)*12));
451 } else if (policy==ATTACK) {
452 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
453 Integer thread=(Integer)thit.next();
454 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
455 int dback=backoff[thread.intValue()]*2;
457 backoff[thread.intValue()]=dback;
461 } else if (policy==SUICIDE) {
462 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
463 int dback=backoff[ev.getThread()]*2;
465 backoff[ev.getThread()]=dback;
468 } else if (policy==TIMESTAMP) {
471 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
472 Integer thread=(Integer)thit.next();
473 Event other=currentevents[thread.intValue()];
474 int eventnum=other.getEvent();
475 long otime=other.getTransaction().getTime(other.getEvent());
476 if (otime>opponenttime)
479 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
481 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
486 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
487 Integer thread=(Integer)thit.next();
488 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
499 //Handle current event (read, write, delay) in a transaction and
500 //enqueue the next one
502 public void enqueueEvent(Event ev, Transaction trans) {
503 //just enqueue next event
504 int event=ev.getEvent();
505 long currtime=ev.getTime();
506 ObjIndex object=trans.getObjIndex(event);
507 int operation=trans.getEvent(event);
509 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
510 ObjectInfo oi=getmapping(object);
513 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
514 //we're going to wait
515 boolean deadlocked=true;
517 for(int i=0;i<checkdepth;i++) {
518 //check if stalling would close the loop
519 if (toi.getOwner()==ev.getThread())
521 //see if cycle is broken
522 if (!threadinfo[toi.getOwner()].isStalled()) {
526 //follow one more in depth
527 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
531 //don't wait on stalled threads, we could deadlock
532 threadinfo[ev.getThread()].setStall(true);
533 threadinfo[ev.getThread()].setObjIndex(object);
535 serStall.addPoint(ev.getTime(),ev.getThread());
540 serAvoid.addPoint(ev.getTime(),ev.getThread());
545 oi.setOwner(ev.getThread());
550 //process the current event
551 if (operation==Transaction.READ) {
553 if (!rdobjmap.containsKey(object))
554 rdobjmap.put(object,new HashSet());
555 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
557 if (countObjects()) {
558 threadinfo[ev.getThread()].priority++;
562 //do eager contention management
563 Set conflicts=rdConflictSet(ev.getThread(), object);
564 if (conflicts!=null) {
565 if (!handleConflicts(ev, conflicts, currtime))
569 } else if (operation==Transaction.WRITE) {
571 if (!wrobjmap.containsKey(object))
572 wrobjmap.put(object,new HashSet());
573 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
574 if (countObjects()) {
575 threadinfo[ev.getThread()].priority++;
579 Set conflicts=wrConflictSet(ev.getThread(), object);
580 if (conflicts!=null) {
581 if (!handleConflicts(ev, conflicts, currtime))
585 } else if (operation==Transaction.BARRIER) {
587 if (barriercount==0) {
588 for(int i=0;i<e.numThreads();i++) {
589 //enqueue the next event
590 Event bev=currentevents[i];
591 int bevent=bev.getEvent();
592 long bcurrtime=bev.getTime();
593 Transaction btrans=bev.getTransaction();
594 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
595 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
596 currentevents[bev.getThread()]=nev;
599 barriercount=e.numThreads();
602 //wait until all threads in barrier
607 //enqueue the next event
608 long deltatime=trans.getTime(event+1)-trans.getTime(event);
609 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
610 currentevents[ev.getThread()]=nev;
615 class Event implements Comparable {
623 public void makeInvalid() {
627 public boolean isValid() {
631 public int getTransNum() {
635 public Transaction getTransaction() {
639 public int getEvent() {
643 public long getTime() {
647 public void setTime(long time) {
651 public int getThread() {
655 public Event(long time, Transaction t, int num, int threadid, int transnum) {
659 this.threadid=threadid;
660 this.transnum=transnum;
664 //break ties to allow commits to occur earliest
665 public int compareTo(Object o) {
667 long delta=time-e.time;
674 if (((getEvent()+1)==getTransaction().numEvents())&&
675 (e.getEvent()+1)!=e.getTransaction().numEvents())
677 if (((getEvent()+1)!=getTransaction().numEvents())&&
678 (e.getEvent()+1)==e.getTransaction().numEvents())