3 public class FlexScheduler extends Thread {
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
22 public FlexScheduler(Executor e, int policy, Plot p) {
24 barriercount=e.numThreads();
25 aborted=new boolean[e.numThreads()];
26 currentevents=new Event[e.numThreads()];
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
31 eq=new PriorityQueue();
32 backoff=new int[e.numThreads()];
33 retrycount=new int[e.numThreads()];
34 transferred=new int[e.numThreads()];
35 objtoinfo=new Hashtable();
36 threadinfo=new ThreadInfo[e.numThreads()];
37 blocked=new boolean[e.numThreads()];
39 for(int i=0;i<e.numThreads();i++) {
40 backoff[i]=BACKOFFSTART;
41 threadinfo[i]=new ThreadInfo(this);
45 serCommit=p.getSeries("COMMIT");
46 serStart=p.getSeries("START");
47 serAbort=p.getSeries("ABORT");
48 serStall=p.getSeries("STALL");
49 serWake=p.getSeries("WAKE");
50 serAvoid=p.getSeries("AVOIDDEADLOCK");
65 public int getDeadLockCount() {
69 //Where to start the backoff delay at
70 public static final int BACKOFFSTART=1;
73 public static final int LAZY=0;
74 public static final int COMMIT=1;
75 public static final int ATTACK=2;
76 public static final int SUICIDE=3;
77 public static final int TIMESTAMP=4;
78 public static final int LOCK=5;
79 public static final int LOCKCOMMIT=6;
80 public static final int RANDOM=7;
81 public static final int KARMA=8;
82 public static final int POLITE=9;
83 public static final int ERUPTION=10;
84 public static final int THREAD=11;
85 public static final int ATTACKTIME=12;
86 public static final int ATTACKTHREAD=13;
88 public static String getName(int policy) {
91 return new String("LAZY");
93 return new String("COMMIT");
95 return new String("ATTACK");
97 return new String("TIMID");
99 return new String("TIMESTAMP");
101 return new String("LOCK");
103 return new String("LOCKCOMMIT");
105 return new String("RANDOM");
107 return new String("KARMA");
109 return new String("POLITE");
111 return new String("ERUPTION");
113 return new String("THREAD");
115 return new String("ATTACKTIME");
117 return new String("ATTACKTHREAD");
126 long earliesttime=-1;
135 Event[] currentevents;
141 ThreadInfo[] threadinfo;
145 public boolean isEager() {
146 return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE||policy==ERUPTION||policy==THREAD||policy==ATTACKTIME||policy==ATTACKTHREAD;
149 public boolean countObjects() {
150 return policy==KARMA||policy==ERUPTION;
153 public boolean isLock() {
154 return policy==LOCK||policy==LOCKCOMMIT;
157 public int getAborts() {
161 public int getCommits() {
165 public long getEarliestTime() {
166 return earliesttime-starttime;
169 public long getTime() {
170 return shorttesttime-starttime;
173 public long getStallTime() {
177 public long getBackoffTime() {
178 return backoffcycles;
181 public long getAbortedTime() {
182 return abortedcycles;
185 //Computes wasted time
186 public void timewasted(int currthread, long currtime) {
187 Event e=currentevents[currthread];
188 Transaction trans=e.getTransaction();
189 int eIndex=e.getEvent();
190 long eTime=e.getTime();
191 long timeleft=eTime-currtime;
193 stallcycles-=timeleft; //this time is no longer stalled...back it out
194 timeleft=0;//if the event is stalled, we already waited this time...
196 long totaltime=trans.getTime(eIndex);
197 totaltime-=timeleft;//subtract off time to the next event
198 abortedcycles+=totaltime;
201 //Aborts another thread...
202 public void reschedule(int currthread, long currtime, long backofftime) {
203 long time=currtime+backofftime;
204 backoffcycles+=backofftime;
205 currentevents[currthread].makeInvalid();
206 if (threadinfo[currthread].isStalled()) {
207 //remove from waiter list
208 threadinfo[currthread].setStall(false);
209 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
211 if (serAbort!=null) {
212 serAbort.addPoint(time, currthread);
214 Transaction trans=currentevents[currthread].getTransaction();
216 releaseObjects(trans, currthread, time);
217 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
218 currentevents[currthread]=nev;
222 //Aborts another thread...
223 public void stall(Event ev, long time, long delay) {
225 ev.setTime(time+delay);
230 private void releaseObjects(Transaction trans, int currthread, long time) {
232 for(int i=0;i<trans.numEvents();i++) {
233 ObjIndex object=trans.getObjIndex(i);
235 if (object!=null&&rdobjmap.containsKey(object)) {
236 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
238 if (object!=null&&wrobjmap.containsKey(object)) {
239 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
241 if (object!=null&&objtoinfo.containsKey(object)) {
242 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
243 if (oi.getOwner()==currentevents[currthread].getThread()) {
247 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
248 //requeue everyone who was waiting on us and start them back up
249 Event waiter=(Event)waitit.next();
251 waiter.setTime(time);
252 threadinfo[waiter.getThread()].setStall(false);
254 serWake.addPoint(time,waiter.getThread());
255 oi.setOwner(waiter.getThread());
264 /* Initializes things and returns number of transactions */
265 public int startinitial() {
267 for(int i=0;i<e.numThreads();i++) {
268 Transaction trans=e.getThread(i).getTransaction(0);
269 long time=trans.getTime(0);
270 Event ev=new Event(time, trans, 0, i, 0);
273 tcount+=e.getThread(i).numTransactions();
278 public void dosim() {
280 //start first transactions
281 int numtrans=startinitial();
282 System.out.println("Number of transactions="+numtrans);
284 while(!eq.isEmpty()) {
285 Event ev=(Event)eq.poll();
290 Transaction trans=ev.getTransaction();
292 int event=ev.getEvent();
293 long currtime=ev.getTime();
295 if (trans.started&&starttime==-1)
298 if (trans.numEvents()==(event+1)) {
299 tryCommit(ev, trans);
301 if ((tcount%100000)==0)
302 System.out.println("Attempted "+tcount+"transactions "+policy);
304 enqueueEvent(ev, trans);
307 shorttesttime=lasttime;
312 private ObjectInfo getmapping(ObjIndex obj) {
313 if (!objtoinfo.containsKey(obj))
314 objtoinfo.put(obj, new ObjectInfo(this));
315 return (ObjectInfo)objtoinfo.get(obj);
318 public void tryCommit(Event ev, Transaction trans) {
319 //ready to commit this one
320 long currtime=ev.getTime();
321 releaseObjects(trans, ev.getThread(), currtime);
323 if (ev.getThread()==lowid) {
328 if (lowid==e.numThreads())
333 //See if we have been flagged as aborted for the lazy case
334 boolean abort=aborted[ev.getThread()];
335 aborted[ev.getThread()]=false;
337 //if it is a transaction, increment commit count
338 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
340 if (serCommit!=null) {
341 serCommit.addPoint(ev.getTime(),ev.getThread());
344 //Reset our backoff counter
345 threadinfo[ev.getThread()].priority=0;
346 threadinfo[ev.getThread()].aborted=false;
347 backoff[ev.getThread()]=BACKOFFSTART;
348 retrycount[ev.getThread()]=0;
349 transferred[ev.getThread()]=0;
351 //abort the other threads
352 for(int i=0;i<trans.numEvents();i++) {
353 ObjIndex object=trans.getObjIndex(i);
354 int op=trans.getEvent(i);
355 //Mark commits to objects
356 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
358 System.out.println(op);
360 getmapping(object).recordCommit();
362 //Check for threads we might cause to abort
363 if (op==Transaction.WRITE) {
364 HashSet abortset=new HashSet();
365 if (rdobjmap.containsKey(object)) {
366 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
367 Integer threadid=(Integer)it.next();
368 abortset.add(threadid);
370 ObjectInfo oi=getmapping(object);
375 if (wrobjmap.containsKey(object)) {
376 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
377 Integer threadid=(Integer)it.next();
378 abortset.add(threadid);
379 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
380 //if this object hasn't already cause this thread to
381 //abort, then flag it as an abort cause
382 ObjectInfo oi=getmapping(object);
387 for(Iterator abit=abortset.iterator();abit.hasNext();) {
388 Integer threadid=(Integer)abit.next();
389 if (policy==LAZY||policy==LOCK) {
390 //just flag to abort when it trie to commit
391 aborted[threadid]=true;
393 serAbort.addPoint(currtime, threadid);
394 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
395 //abort it immediately
396 timewasted(threadid, currtime);
397 reschedule(threadid, currtime, 0);
405 timewasted(ev.getThread(), currtime);
408 //add next transaction event...could be us if we aborted
409 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
410 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
411 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
412 if (serStart!=null) {
413 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
414 serStart.addPoint(ev.getTime(),ev.getThread());
418 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
419 currentevents[ev.getThread()]=nev;
422 if (earliesttime==-1)
423 earliesttime=currtime;
427 public Set rdConflictSet(int thread, ObjIndex obj) {
428 if (!wrobjmap.containsKey(obj))
430 HashSet conflictset=new HashSet();
431 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
432 Integer threadid=(Integer)it.next();
433 if (threadid.intValue()!=thread)
434 conflictset.add(threadid);
436 if (conflictset.isEmpty())
443 int normalize(int tid) {
444 int newtid=tid-lowid;
446 newtid+=e.numThreads();
450 public Set wrConflictSet(int thread, ObjIndex obj) {
451 HashSet conflictset=new HashSet();
452 if (rdobjmap.containsKey(obj)) {
453 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
454 Integer threadid=(Integer)it.next();
455 if (threadid.intValue()!=thread)
456 conflictset.add(threadid);
459 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
460 Integer threadid=(Integer)it.next();
461 if (threadid.intValue()!=thread)
462 conflictset.add(threadid);
464 if (conflictset.isEmpty())
470 //Takes as parameter -- current transaction read event ev, conflicting
471 //set of threads, and the current time
472 //Returning false causes current transaction not continue to be scheduled
475 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
476 if (policy==RANDOM) {
477 boolean b=r.nextBoolean();
480 int thread=ev.getThread();
481 int dback=backoff[thread]*2;
483 backoff[thread]=dback;
484 stall(ev, time, r.nextInt(backoff[thread]));
487 //abort other transactions
488 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
489 Integer thread=(Integer)thit.next();
490 timewasted(thread, time);
491 reschedule(thread, time, 0);
496 } else if (policy==KARMA) {
498 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
499 Integer thread=(Integer)thit.next();
500 if (threadinfo[thread].priority>maxpriority)
501 maxpriority=threadinfo[thread].priority;
503 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
504 //stall for a little while
505 threadinfo[ev.getThread()].priority--;
506 retrycount[ev.getThread()]++;
507 int rtime=r.nextInt(3000);
508 stall(ev, time, rtime);
512 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
513 Integer thread=(Integer)thit.next();
514 int dback=backoff[thread]*2;
516 backoff[thread]=dback;
517 int atime=r.nextInt(backoff[thread]);
518 timewasted(thread, time);
519 reschedule(thread, time, atime);
524 } else if (policy==ERUPTION) {
526 //abort other transactions
527 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
528 Integer thread=(Integer)thit.next();
529 if (threadinfo[thread].priority>maxpriority)
530 maxpriority=threadinfo[thread].priority;
532 if (maxpriority>(threadinfo[ev.getThread()].priority+retrycount[ev.getThread()])) {
534 threadinfo[ev.getThread()].priority--;
535 //stall for a little while
536 int rtime=r.nextInt(3000);
537 stall(ev, time, rtime);
538 int ourpriority=threadinfo[ev.getThread()].priority;
539 ourpriority-=transferred[ev.getThread()];
540 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
541 Integer thread=(Integer)thit.next();
542 threadinfo[thread].priority+=ourpriority;
544 transferred[ev.getThread()]=threadinfo[ev.getThread()].priority;
545 retrycount[ev.getThread()]++;
550 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
551 Integer thread=(Integer)thit.next();
552 int dback=backoff[thread]*2;
554 backoff[thread]=dback;
555 int atime=r.nextInt(backoff[thread]);
556 timewasted(thread, time);
557 reschedule(thread, time, atime);
562 } else if (policy==POLITE) {
563 int retry=(++retrycount[ev.getThread()]);
565 retrycount[ev.getThread()]=0;
566 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
567 Integer thread=(Integer)thit.next();
568 timewasted(thread, time);
569 reschedule(thread, time, 0);
575 int stalltime=(1<<(retry-1))*12;
578 stall(ev, time, r.nextInt(stalltime));
581 } else if (policy==ATTACK) {
582 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
583 Integer thread=(Integer)thit.next();
584 timewasted(thread, time);
585 reschedule(thread, time, r.nextInt(backoff[thread.intValue()]));
586 int dback=backoff[thread.intValue()]*2;
588 backoff[thread.intValue()]=dback;
592 } else if (policy==SUICIDE) {
593 timewasted(ev.getThread(), time);
594 reschedule(ev.getThread(), time, r.nextInt(backoff[ev.getThread()]));
595 int dback=backoff[ev.getThread()]*2;
597 backoff[ev.getThread()]=dback;
600 } else if (policy==TIMESTAMP) {
603 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
604 Integer thread=(Integer)thit.next();
605 Event other=currentevents[thread.intValue()];
606 int eventnum=other.getEvent();
607 long otime=other.getTransaction().getTime(other.getEvent());
608 if (otime>opponenttime)
611 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
613 timewasted(ev.getThread(), time);
614 reschedule(ev.getThread(), time, 0);
619 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
620 Integer thread=(Integer)thit.next();
621 timewasted(thread, time);
622 reschedule(thread, time, 0);
627 } else if (policy==THREAD) {
630 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
631 Integer thread=(Integer)thit.next();
632 Event other=currentevents[thread.intValue()];
633 int eventnum=other.getEvent();
634 long otid=normalize(thread.intValue());
638 if (normalize(ev.getThread())>tid) {
640 timewasted(ev.getThread(), time);
641 reschedule(ev.getThread(), time, 0);
646 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
647 Integer thread=(Integer)thit.next();
648 timewasted(thread, time);
649 reschedule(thread, time, 0);
654 } else if (policy==ATTACKTIME) {
655 boolean timebased=false;
656 int tev=ev.getThread();
657 timebased|=threadinfo[tev].aborted;
658 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
659 Integer thread=(Integer)thit.next();
660 timebased|=threadinfo[thread.intValue()].aborted;
665 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
666 Integer thread=(Integer)thit.next();
667 Event other=currentevents[thread.intValue()];
668 int eventnum=other.getEvent();
669 long otime=other.getTransaction().getTime(other.getEvent());
670 if (otime>opponenttime)
673 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
675 timewasted(ev.getThread(), time);
676 reschedule(ev.getThread(), time, 0);
677 threadinfo[ev.getThread()].aborted=true;
682 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
683 Integer thread=(Integer)thit.next();
684 timewasted(thread, time);
685 reschedule(thread, time, 0);
686 threadinfo[thread.intValue()].aborted=true;
692 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
693 Integer thread=(Integer)thit.next();
694 timewasted(thread, time);
695 reschedule(thread, time, 0);
696 threadinfo[thread.intValue()].aborted=true;
701 } else if (policy==ATTACKTHREAD) {
702 boolean threadbased=false;
703 int tev=ev.getThread();
704 threadbased|=threadinfo[tev].aborted;
705 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
706 Integer thread=(Integer)thit.next();
707 threadbased|=threadinfo[thread.intValue()].aborted;
710 long opponentthr=1000;
712 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
713 Integer thread=(Integer)thit.next();
714 Event other=currentevents[thread.intValue()];
715 int eventnum=other.getEvent();
716 long othr=thread.intValue();
717 if (othr<opponentthr)
720 if (opponentthr<tev) {
722 timewasted(ev.getThread(), time);
723 reschedule(ev.getThread(), time, 0);
724 threadinfo[ev.getThread()].aborted=true;
729 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
730 Integer thread=(Integer)thit.next();
731 timewasted(thread, time);
732 reschedule(thread, time, 0);
733 threadinfo[thread.intValue()].aborted=true;
739 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
740 Integer thread=(Integer)thit.next();
741 timewasted(thread, time);
742 reschedule(thread, time, 0);
743 threadinfo[thread.intValue()].aborted=true;
754 //Handle current event (read, write, delay) in a transaction and
755 //enqueue the next one
757 public void enqueueEvent(Event ev, Transaction trans) {
758 //just enqueue next event
759 int event=ev.getEvent();
760 long currtime=ev.getTime();
761 ObjIndex object=trans.getObjIndex(event);
762 int operation=trans.getEvent(event);
764 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
765 ObjectInfo oi=getmapping(object);
768 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
769 //we're going to wait
770 boolean deadlocked=true;
772 for(int i=0;i<checkdepth;i++) {
773 //check if stalling would close the loop
774 if (toi.getOwner()==ev.getThread())
776 //see if cycle is broken
777 if (!threadinfo[toi.getOwner()].isStalled()) {
781 //follow one more in depth
782 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
786 //don't wait on stalled threads, we could deadlock
787 threadinfo[ev.getThread()].setStall(true);
788 threadinfo[ev.getThread()].setObjIndex(object);
790 serStall.addPoint(ev.getTime(),ev.getThread());
795 serAvoid.addPoint(ev.getTime(),ev.getThread());
800 oi.setOwner(ev.getThread());
805 //process the current event
806 if (operation==Transaction.READ) {
808 if (!rdobjmap.containsKey(object))
809 rdobjmap.put(object,new HashSet());
810 if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
812 if (countObjects()) {
813 threadinfo[ev.getThread()].priority++;
817 //do eager contention management
818 Set conflicts=rdConflictSet(ev.getThread(), object);
819 if (conflicts!=null) {
820 if (!handleConflicts(ev, conflicts, currtime)) {
821 ((Set)rdobjmap.get(object)).remove(new Integer(ev.getThread()));
826 } else if (operation==Transaction.WRITE) {
828 if (!wrobjmap.containsKey(object))
829 wrobjmap.put(object,new HashSet());
830 if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
831 if (countObjects()) {
832 threadinfo[ev.getThread()].priority++;
836 Set conflicts=wrConflictSet(ev.getThread(), object);
837 if (conflicts!=null) {
838 if (!handleConflicts(ev, conflicts, currtime)) {
839 ((Set)wrobjmap.get(object)).remove(new Integer(ev.getThread()));
844 } else if (operation==Transaction.BARRIER) {
846 if (barriercount==0) {
847 for(int i=0;i<e.numThreads();i++) {
848 //enqueue the next event
849 Event bev=currentevents[i];
850 int bevent=bev.getEvent();
851 long bcurrtime=bev.getTime();
852 Transaction btrans=bev.getTransaction();
853 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
854 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
855 currentevents[bev.getThread()]=nev;
858 barriercount=e.numThreads();
861 //wait until all threads in barrier
865 retrycount[ev.getThread()]=0;
866 transferred[ev.getThread()]=0;
867 //enqueue the next event
868 long deltatime=trans.getTime(event+1)-trans.getTime(event);
869 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
870 currentevents[ev.getThread()]=nev;
875 class Event implements Comparable {
884 public boolean isStalled() {
888 public void setStall() {
892 public void makeInvalid() {
896 public boolean isValid() {
900 public int getTransNum() {
904 public Transaction getTransaction() {
908 public int getEvent() {
912 public long getTime() {
916 public void setTime(long time) {
920 public int getThread() {
924 public Event(long time, Transaction t, int num, int threadid, int transnum) {
928 this.threadid=threadid;
929 this.transnum=transnum;
933 //break ties to allow commits to occur earliest
934 public int compareTo(Object o) {
936 long delta=time-e.time;
943 if (((getEvent()+1)==getTransaction().numEvents())&&
944 (e.getEvent()+1)!=e.getTransaction().numEvents())
946 if (((getEvent()+1)!=getTransaction().numEvents())&&
947 (e.getEvent()+1)==e.getTransaction().numEvents())