simulator updates
[IRC.git] / Robust / TransSim / FlexScheduler.java
1 import java.util.*;
2
3 public class FlexScheduler extends Thread {
4   Executor e;
5   int abortThreshold;
6   int abortRatio;
7   int deadlockcount;
8   int checkdepth;
9   int barriercount;
10
11   public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
12     this(e, policy, p);
13     this.abortThreshold=abortThreshold;
14     this.abortRatio=abortRatio;
15     this.checkdepth=checkdepth;
16   }
17   
18   public void run() {
19     dosim();
20   }
21
22   public FlexScheduler(Executor e, int policy, Plot p) {
23     this.e=e;
24     barriercount=e.numThreads();
25     aborted=new boolean[e.numThreads()];
26     currentevents=new Event[e.numThreads()];
27     rdobjmap=new Hashtable();
28     wrobjmap=new Hashtable();
29     this.policy=policy;
30     r=new Random(100);
31     eq=new PriorityQueue();
32     backoff=new int[e.numThreads()];
33     retrycount=new int[e.numThreads()];
34     objtoinfo=new Hashtable();
35     threadinfo=new ThreadInfo[e.numThreads()];
36     blocked=new boolean[e.numThreads()];
37
38     for(int i=0;i<e.numThreads();i++) {
39       backoff[i]=BACKOFFSTART;
40       threadinfo[i]=new ThreadInfo(this);
41     }
42     this.p=p;
43     if (p!=null) {
44       serCommit=p.getSeries("COMMIT");
45       serStart=p.getSeries("START");
46       serAbort=p.getSeries("ABORT");
47       serStall=p.getSeries("STALL");
48       serWake=p.getSeries("WAKE");
49       serAvoid=p.getSeries("AVOIDDEADLOCK");
50     }
51   }
52
53   Plot p;
54   Series serCommit;
55   Series serStart;
56   Series serAbort;
57   Series serStall;
58   Series serAvoid;
59   Series serWake;
60
61   public int getDeadLockCount() {
62     return deadlockcount;
63   }
64
65   //Where to start the backoff delay at
66   public static final int BACKOFFSTART=1;
67
68   //Commit options
69   public static final int LAZY=0;
70   public static final int COMMIT=1;
71   public static final int ATTACK=2;
72   public static final int SUICIDE=3;
73   public static final int TIMESTAMP=4;
74   public static final int LOCK=5;
75   public static final int LOCKCOMMIT=6;
76   public static final int RANDOM=7;
77   public static final int KARMA=8;
78   public static final int POLITE=9;
79   public static final int ERUPTION=10;
80
81   PriorityQueue eq;
82   int policy;
83   boolean[] aborted;
84   long shorttesttime;
85   long starttime=-1;
86   Hashtable rdobjmap;
87   Hashtable wrobjmap;
88   int abortcount;
89   int commitcount;
90   Event[] currentevents;
91   Random r;
92   int[] backoff;
93   int[] retrycount;
94   Hashtable objtoinfo;
95   ThreadInfo[] threadinfo;
96   
97   boolean[] blocked;
98
99   public boolean isEager() {
100     return policy==ATTACK||policy==SUICIDE||policy==TIMESTAMP||policy==RANDOM||policy==KARMA||policy==POLITE;
101   }
102
103   public boolean countObjects() {
104     return policy==KARMA||policy==ERUPTION;
105   }
106
107   public boolean isLock() {
108     return policy==LOCK||policy==LOCKCOMMIT;
109   }
110
111   public int getAborts() {
112     return abortcount;
113   }
114
115   public int getCommits() {
116     return commitcount;
117   }
118
119   public long getTime() {
120     return shorttesttime-starttime;
121   }
122
123   //Aborts another thread...
124   public void reschedule(int currthread, long time) {
125     currentevents[currthread].makeInvalid();
126     if (threadinfo[currthread].isStalled()) {
127       //remove from waiter list
128       threadinfo[currthread].setStall(false);
129       getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
130     }
131     if (serAbort!=null) {
132       serAbort.addPoint(time, currthread);
133     }
134     Transaction trans=currentevents[currthread].getTransaction();
135     
136     releaseObjects(trans, currthread, time);
137     Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
138     currentevents[currthread]=nev;
139     eq.add(nev);
140   }
141
142   //Aborts another thread...
143   public void stall(Event ev, long time) {
144     ev.setTime(time);
145     eq.add(ev);
146   }
147
148
149   private void releaseObjects(Transaction trans, int currthread, long time) {
150     //remove all events
151     for(int i=0;i<trans.numEvents();i++) {
152       ObjIndex object=trans.getObjIndex(i);
153
154       if (object!=null&&rdobjmap.containsKey(object)) {
155         ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
156       }
157       if (object!=null&&wrobjmap.containsKey(object)) {
158         ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
159       }
160       if (object!=null&&objtoinfo.containsKey(object)) {
161         ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
162         if (oi.getOwner()==currentevents[currthread].getThread()) {
163           oi.releaseOwner();
164           
165           //wake up one waiter
166           for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
167             //requeue everyone who was waiting on us and start them back up
168             Event waiter=(Event)waitit.next();
169             waitit.remove();
170             waiter.setTime(time);
171             threadinfo[waiter.getThread()].setStall(false);
172             if (serWake!=null)
173               serWake.addPoint(time,waiter.getThread());
174             oi.setOwner(waiter.getThread());
175             eq.add(waiter);
176             break;
177           }
178         }
179       }
180     }
181   }
182
183   /* Initializes things and returns number of transactions */
184   public int startinitial() {
185     int tcount=0;
186     for(int i=0;i<e.numThreads();i++) {
187       Transaction trans=e.getThread(i).getTransaction(0);
188       long time=trans.getTime(0);
189       Event ev=new Event(time, trans, 0, i, 0);
190       currentevents[i]=ev;
191       eq.add(ev);
192       tcount+=e.getThread(i).numTransactions();
193     }
194     return tcount;
195   }
196
197   public void dosim() {
198     long lasttime=0;
199     //start first transactions
200     int numtrans=startinitial();
201     System.out.println("Number of transactions="+numtrans);
202     int tcount=0;
203     while(!eq.isEmpty()) {
204       Event ev=(Event)eq.poll();
205       if (!ev.isValid()) {
206         continue;
207       }
208
209       Transaction trans=ev.getTransaction();
210
211       int event=ev.getEvent();
212       long currtime=ev.getTime();
213       lasttime=currtime;
214       if (trans.started&&starttime==-1)
215         starttime=currtime;
216
217       if (trans.numEvents()==(event+1)) {
218         tryCommit(ev, trans);
219         tcount++;
220         if ((tcount%100000)==0)
221           System.out.println("Attempted "+tcount+"transactions");
222       } else {
223         enqueueEvent(ev, trans);
224       }
225     }
226     shorttesttime=lasttime;
227     if (p!=null)
228       p.close();
229   }
230
231   private ObjectInfo getmapping(ObjIndex obj) {
232     if (!objtoinfo.containsKey(obj))
233       objtoinfo.put(obj, new ObjectInfo(this));
234     return (ObjectInfo)objtoinfo.get(obj);
235   }
236
237   public void tryCommit(Event ev, Transaction trans) {
238     //ready to commit this one
239     long currtime=ev.getTime();
240     releaseObjects(trans, ev.getThread(), currtime);
241     
242     //See if we have been flagged as aborted for the lazy case
243     boolean abort=aborted[ev.getThread()];
244     aborted[ev.getThread()]=false;
245     if (!abort) {
246       //if it is a transaction, increment comit count
247       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
248         threadinfo[ev.getThread()].priority=0;
249         commitcount++;
250         if (serCommit!=null) {
251           serCommit.addPoint(ev.getTime(),ev.getThread());
252         }
253       }
254       //Reset our backoff counter
255       backoff[ev.getThread()]=BACKOFFSTART;
256       retrycount[ev.getThread()]=0;
257
258       //abort the other threads
259       for(int i=0;i<trans.numEvents();i++) {
260         ObjIndex object=trans.getObjIndex(i);
261         int op=trans.getEvent(i);
262         //Mark commits to objects
263         if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
264           if (object==null) {
265             System.out.println(op);
266           }
267           getmapping(object).recordCommit();
268         }
269         //Check for threads we might cause to abort
270         if (op==Transaction.WRITE) {
271           HashSet abortset=new HashSet();
272           if (rdobjmap.containsKey(object)) {
273             for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
274               Integer threadid=(Integer)it.next();
275               abortset.add(threadid);
276               if (isLock()) {
277                 ObjectInfo oi=getmapping(object);
278                 oi.recordAbort();
279               }
280             }
281           }
282           if (wrobjmap.containsKey(object)) {
283             for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
284               Integer threadid=(Integer)it.next();
285               abortset.add(threadid);
286               if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
287                 //if this object hasn't already cause this thread to
288                 //abort, then flag it as an abort cause
289                 ObjectInfo oi=getmapping(object);
290                 oi.recordAbort();
291               }
292             }
293           }
294           for(Iterator abit=abortset.iterator();abit.hasNext();) {
295             Integer threadid=(Integer)abit.next();
296             if (policy==LAZY||policy==LOCK) {
297               //just flag to abort when it trie to commit
298               aborted[threadid]=true;
299               if (serAbort!=null)
300                 serAbort.addPoint(currtime, threadid);
301             } else if (policy==COMMIT||policy==LOCKCOMMIT) {
302               //abort it immediately
303               reschedule(threadid, currtime);
304               abortcount++;
305             }
306           }
307         }
308       }
309     } else {
310       abortcount++;
311     }
312     
313     //add next transaction event...could be us if we aborted
314     int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
315     if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
316       Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
317       if (serStart!=null) {
318         if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
319           serStart.addPoint(ev.getTime(),ev.getThread());
320         }
321       }
322
323       Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
324       currentevents[ev.getThread()]=nev;
325       eq.add(nev);
326     }
327   }
328
329   public Set rdConflictSet(int thread, ObjIndex obj) {
330     if (!wrobjmap.containsKey(obj))
331       return null;
332     HashSet conflictset=new HashSet();
333     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
334       Integer threadid=(Integer)it.next();
335       if (threadid.intValue()!=thread)
336         conflictset.add(threadid);
337     }
338     if (conflictset.isEmpty())
339       return null;
340     else
341       return conflictset;
342   }
343
344   public Set wrConflictSet(int thread, ObjIndex obj) {
345     HashSet conflictset=new HashSet();
346     if (rdobjmap.containsKey(obj)) {
347       for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
348         Integer threadid=(Integer)it.next();
349         if (threadid.intValue()!=thread)
350           conflictset.add(threadid);
351       }
352     }
353     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
354       Integer threadid=(Integer)it.next();
355       if (threadid.intValue()!=thread)
356         conflictset.add(threadid);
357     }
358     if (conflictset.isEmpty())
359       return null;
360     else
361       return conflictset;
362   }
363
364   //Takes as parameter -- current transaction read event ev, conflicting
365   //set of threads, and the current time
366   //Returning false causes current transaction not continue to be scheduled
367
368   public boolean handleConflicts(Event ev, Set threadstokill, long time) {
369     if (policy==RANDOM) {
370       boolean b=r.nextBoolean();
371       if (b) {
372         //delay
373         stall(ev, time+r.nextInt(200));
374         return false;
375       } else {
376         //abort other transactions
377         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
378           Integer thread=(Integer)thit.next();
379           reschedule(thread, time);
380           abortcount++;
381         }
382         return true;
383       }
384     } else if (policy==KARMA) {
385       int maxpriority=0;
386       //abort other transactions
387       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
388         Integer thread=(Integer)thit.next();
389         if (threadinfo[thread].priority>maxpriority)
390           maxpriority=threadinfo[thread].priority;
391       }
392       if (maxpriority>threadinfo[ev.getThread()].priority) {
393         //we lose
394         threadinfo[ev.getThread()].priority++;
395         //stall for a little while
396         stall(ev, time+20);
397         return false;
398       } else {
399         //we win
400         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
401           Integer thread=(Integer)thit.next();
402           reschedule(thread, time);
403           abortcount++;
404         }
405         return true;
406       }
407     } else if (policy==ERUPTION) {
408       int maxpriority=0;
409       //abort other transactions
410       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
411         Integer thread=(Integer)thit.next();
412         if (threadinfo[thread].priority>maxpriority)
413           maxpriority=threadinfo[thread].priority;
414       }
415       if (maxpriority>threadinfo[ev.getThread()].priority) {
416         //we lose
417         //stall for a little while
418         stall(ev, time);
419         int ourpriority=threadinfo[ev.getThread()].priority;
420         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
421           Integer thread=(Integer)thit.next();
422           threadinfo[thread].priority+=ourpriority;;
423         }
424         return false;
425       } else {
426         //we win
427         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
428           Integer thread=(Integer)thit.next();
429           reschedule(thread, time);
430           abortcount++;
431         }
432         return true;
433       }
434     } else if (policy==POLITE) {
435       int retry=retrycount[ev.getThread()]++;
436       if (retry==22) {
437         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
438           Integer thread=(Integer)thit.next();
439           reschedule(thread, time);
440           int dback=backoff[thread.intValue()]*2;
441           if (dback>0)
442             backoff[thread.intValue()]=dback;
443           abortcount++;
444         }
445         return true;
446       } else {
447         //otherwise stall
448         stall(ev, time+r.nextInt((1<<retry)*12));
449         return false;
450       }
451     } else if (policy==ATTACK) {
452       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
453         Integer thread=(Integer)thit.next();
454         reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
455         int dback=backoff[thread.intValue()]*2;
456         if (dback>0)
457           backoff[thread.intValue()]=dback;
458         abortcount++;
459       }
460       return true;
461     } else if (policy==SUICIDE) {
462       reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
463       int dback=backoff[ev.getThread()]*2;
464       if (dback>0)
465         backoff[ev.getThread()]=dback;
466       abortcount++;
467       return false;
468     } else if (policy==TIMESTAMP) {
469       long opponenttime=0;
470
471       for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
472         Integer thread=(Integer)thit.next();
473         Event other=currentevents[thread.intValue()];
474         int eventnum=other.getEvent();
475         long otime=other.getTransaction().getTime(other.getEvent());
476         if (otime>opponenttime)
477           opponenttime=otime;
478       }
479       if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
480         //kill ourself
481         reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
482         abortcount++;
483         return false;
484       } else {
485         //kill the opponents
486         for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
487           Integer thread=(Integer)thit.next();
488           reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
489           abortcount++;
490         }
491         return true;    
492       }
493     }
494
495     //Not eager
496     return true;
497   }
498
499   //Handle current event (read, write, delay) in a transaction and
500   //enqueue the next one
501
502   public void enqueueEvent(Event ev, Transaction trans) {
503     //just enqueue next event
504     int event=ev.getEvent();
505     long currtime=ev.getTime();
506     ObjIndex object=trans.getObjIndex(event);
507     int operation=trans.getEvent(event);
508
509     if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
510       ObjectInfo oi=getmapping(object);
511       
512       if (oi.isRisky()) {
513         if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
514           //we're going to wait
515           boolean deadlocked=true;
516           ObjectInfo toi=oi;
517           for(int i=0;i<checkdepth;i++) {
518             //check if stalling would close the loop
519             if (toi.getOwner()==ev.getThread())
520               break;
521             //see if cycle is broken
522             if (!threadinfo[toi.getOwner()].isStalled()) {
523               deadlocked=false;
524               break;
525             }
526             //follow one more in depth
527             toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
528           }
529           
530           if (!deadlocked) {
531             //don't wait on stalled threads, we could deadlock
532             threadinfo[ev.getThread()].setStall(true);
533             threadinfo[ev.getThread()].setObjIndex(object);
534             if (serStall!=null)
535               serStall.addPoint(ev.getTime(),ev.getThread());
536             oi.addWaiter(ev);
537             return;
538           } else {
539             if (serAvoid!=null)
540               serAvoid.addPoint(ev.getTime(),ev.getThread());
541             deadlockcount++;
542           }
543         } else {
544           //we have object
545           oi.setOwner(ev.getThread());
546         }
547       }
548     }
549     
550     //process the current event
551     if (operation==Transaction.READ) {
552       //record read event
553       if (!rdobjmap.containsKey(object))
554         rdobjmap.put(object,new HashSet());
555       if (((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()))) {
556         //added new object
557         if (countObjects()) {
558           threadinfo[ev.getThread()].priority++;
559         }
560       }
561       if (isEager()) {
562         //do eager contention management
563         Set conflicts=rdConflictSet(ev.getThread(), object);
564         if (conflicts!=null) {
565           if (!handleConflicts(ev, conflicts, currtime))
566             return;
567         }
568       }
569     } else if (operation==Transaction.WRITE) {
570       //record write event
571       if (!wrobjmap.containsKey(object))
572         wrobjmap.put(object,new HashSet());
573       if (((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()))) {
574         if (countObjects()) {
575           threadinfo[ev.getThread()].priority++;
576         }
577       }
578       if (isEager()) {
579         Set conflicts=wrConflictSet(ev.getThread(), object);
580         if (conflicts!=null) {
581           if (!handleConflicts(ev, conflicts, currtime))
582             return;
583         }
584       }
585     } else if (operation==Transaction.BARRIER) {
586       barriercount--;
587       if (barriercount==0) {
588         for(int i=0;i<e.numThreads();i++) {
589           //enqueue the next event
590           Event bev=currentevents[i];
591           int bevent=bev.getEvent();
592           long bcurrtime=bev.getTime();
593           Transaction btrans=bev.getTransaction();
594           long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
595           Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
596           currentevents[bev.getThread()]=nev;
597           eq.add(nev);
598         }
599         barriercount=e.numThreads();
600       } else {
601         //Do nothing
602         //wait until all threads in barrier
603       }
604       return;
605     }
606     
607     //enqueue the next event
608     long deltatime=trans.getTime(event+1)-trans.getTime(event);
609     Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
610     currentevents[ev.getThread()]=nev;
611     eq.add(nev);
612   }
613
614   
615   class Event implements Comparable {
616     boolean valid;
617     long time;
618     int num;
619     Transaction t;
620     int threadid;
621     int transnum;
622
623     public void makeInvalid() {
624       valid=false;
625     }
626
627     public boolean isValid() {
628       return valid;
629     }
630
631     public int getTransNum() {
632       return transnum;
633     }
634
635     public Transaction getTransaction() {
636       return t;
637     }
638
639     public int getEvent() {
640       return num;
641     }
642
643     public long getTime() {
644       return time;
645     }
646     
647     public void setTime(long time) {
648       this.time=time;
649     }
650
651     public int getThread() {
652       return threadid;
653     }
654
655     public Event(long time, Transaction t, int num, int threadid, int transnum) {
656       this.time=time;
657       this.t=t;
658       this.num=num;
659       this.threadid=threadid;
660       this.transnum=transnum;
661       valid=true;
662     }
663
664     //break ties to allow commits to occur earliest
665     public int compareTo(Object o) {
666       Event e=(Event)o;
667       long delta=time-e.time;
668       if (delta!=0) {
669         if (delta>0)
670           return 1;
671         else
672           return -1;
673       }
674       if (((getEvent()+1)==getTransaction().numEvents())&&
675           (e.getEvent()+1)!=e.getTransaction().numEvents())
676         return -1;
677       if (((getEvent()+1)!=getTransaction().numEvents())&&
678           (e.getEvent()+1)==e.getTransaction().numEvents())
679         return 1;
680       return 0;
681     }
682   }
683 }