/**
 * Simulates the transactions of an Executor's threads as timed events drawn
 * from a priority queue, applying the contention policy selected by one of
 * the policy constants declared in this class (LAZY ... LOCKCOMMIT).
 */
3 public class FlexScheduler extends Thread {
/**
 * Creates a scheduler with explicit tuning values.
 *
 * @param e executor whose threads and transactions are simulated
 * @param policy one of the policy constants (LAZY ... LOCKCOMMIT)
 * @param abortThreshold stored in the field of the same name; its use is not
 *        visible in this excerpt
 * @param abortRatio stored in the field of the same name; its use is not
 *        visible in this excerpt
 * @param checkdepth maximum number of owner links followed by enqueueEvent
 *        when checking whether stalling would close a wait cycle
 * @param p plot supplying the data series
 */
// NOTE(review): e, policy and p are presumably forwarded to the three-argument
// constructor; that line is not visible in this excerpt -- confirm.
11 public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth, Plot p) {
13 this.abortThreshold=abortThreshold;
14 this.abortRatio=abortRatio;
15 this.checkdepth=checkdepth;
/**
 * Creates a scheduler and allocates its bookkeeping state: one slot per
 * executor thread in every per-thread array, empty object maps, and an empty
 * event queue.
 *
 * @param e executor whose thread count sizes the per-thread arrays
 * @param policy one of the policy constants (LAZY ... LOCKCOMMIT)
 * @param p plot from which the named data series are looked up
 */
22 public FlexScheduler(Executor e, int policy, Plot p) {
// barrier counter starts at the thread count; enqueueEvent resets it to the
// same value after a barrier releases
24 barriercount=e.numThreads();
// per-thread flag set by tryCommit under the LAZY and LOCK policies
25 aborted=new boolean[e.numThreads()];
// most recently scheduled event of each thread, indexed by thread id
26 currentevents=new Event[e.numThreads()];
// object -> set of thread ids that have read / written it
27 rdobjmap=new Hashtable();
28 wrobjmap=new Hashtable();
// pending events; dosim polls this queue until it is empty
31 eq=new PriorityQueue();
// per-thread bound passed to r.nextInt when an abort is rescheduled
32 backoff=new int[e.numThreads()];
// object -> ObjectInfo, filled lazily by getmapping
33 objtoinfo=new Hashtable();
34 threadinfo=new ThreadInfo[e.numThreads()];
35 blocked=new boolean[e.numThreads()];
37 for(int i=0;i<e.numThreads();i++) {
38 backoff[i]=BACKOFFSTART;
39 threadinfo[i]=new ThreadInfo(this);
// NOTE(review): other methods test serAbort/serCommit/serStart against null,
// so these lookups are presumably guarded by a check on p that is not visible
// in this excerpt -- confirm.
43 serCommit=p.getSeries("COMMIT");
44 serStart=p.getSeries("START");
45 serAbort=p.getSeries("ABORT");
46 serStall=p.getSeries("STALL");
47 serWake=p.getSeries("WAKE");
48 serAvoid=p.getSeries("AVOIDDEADLOCK");
// NOTE(review): the body is not visible in this excerpt; presumably returns a
// counter of detected/avoided deadlocks -- confirm against the full file.
60 public int getDeadLockCount() {
64 //Initial value of each thread's backoff bound; the constructor stores it in
// every backoff[] slot and tryCommit restores it for the committing thread.
65 public static final int BACKOFFSTART=1;
// Policy selectors compared against the policy field.
// LAZY: tryCommit only flags conflicting threads; they abort when they
// themselves reach commit.
68 public static final int LAZY=0;
// COMMIT: tryCommit reschedules conflicting threads immediately.
69 public static final int COMMIT=1;
// ATTACK, POLITE, KARMA: the policies for which isEager() is true; conflicts
// are resolved by handleConflicts.
// ATTACK: the conflicting threads are rescheduled.
70 public static final int ATTACK=2;
// POLITE: the thread that hit the conflict is rescheduled.
71 public static final int POLITE=3;
// KARMA: handleConflicts compares transaction times to pick who is rescheduled.
72 public static final int KARMA=4;
// LOCK, LOCKCOMMIT: the policies for which isLock() is true. At commit time
// LOCK is treated like LAZY and LOCKCOMMIT like COMMIT.
73 public static final int LOCK=5;
74 public static final int LOCKCOMMIT=6;
// Indexed by thread id: the event most recently created for that thread.
85 Event[] currentevents;
// Indexed by thread id: stall flag and the object the thread is waiting on
// (see the setStall/setObjIndex calls in enqueueEvent).
89 ThreadInfo[] threadinfo;
/**
 * @return true when the configured policy is ATTACK, POLITE or KARMA
 */
93 public boolean isEager() {
94 return policy==ATTACK||policy==POLITE||policy==KARMA;
/**
 * @return true when the configured policy is LOCK or LOCKCOMMIT
 */
97 public boolean isLock() {
98 return policy==LOCK||policy==LOCKCOMMIT;
// NOTE(review): the body is not visible in this excerpt; presumably returns
// the number of aborts recorded so far -- confirm.
101 public int getAborts() {
// NOTE(review): the body is not visible in this excerpt; presumably returns
// the number of commits recorded so far -- confirm.
105 public int getCommits() {
/**
 * @return shorttesttime minus starttime; dosim assigns shorttesttime from
 *         lasttime once the event queue has drained
 */
109 public long getTime() {
110 return shorttesttime-starttime;
113 //Aborts the transaction currently running on thread currthread and
// restarts it from its first event. Callers pass either a conflicting
// thread (ATTACK, COMMIT, LOCKCOMMIT, KARMA) or the current thread itself
// (POLITE, KARMA).
// currthread: id of the thread whose transaction is aborted
// time: simulation time of the abort; the restart is placed at
//       time + trans.getTime(0)
114 public void reschedule(int currthread, long time) {
// the event already created for this thread must no longer be processed
115 currentevents[currthread].makeInvalid();
116 if (threadinfo[currthread].isStalled()) {
117 //remove from waiter list
118 threadinfo[currthread].setStall(false);
119 getmapping(threadinfo[currthread].getObjIndex()).getWaiters().remove(currentevents[currthread]);
121 if (serAbort!=null) {
122 serAbort.addPoint(time, currthread);
124 Transaction trans=currentevents[currthread].getTransaction();
// drop this thread's read/write registrations (and owned objects) first
126 releaseObjects(trans, currthread, time);
// same transaction number, event index 0: the transaction starts over
127 Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
128 currentevents[currthread]=nev;
// NOTE(review): adding nev to the event queue is presumably done on a line not
// visible in this excerpt -- confirm.
/**
 * Removes thread currthread from the reader and writer sets of every object
 * touched by trans and, for objects this thread owns, hands things over to
 * the waiting events.
 *
 * @param trans transaction whose objects are released
 * @param currthread id of the releasing thread
 * @param time simulation time given to woken waiters
 */
133 private void releaseObjects(Transaction trans, int currthread, long time) {
135 for(int i=0;i<trans.numEvents();i++) {
136 ObjIndex object=trans.getObjIndex(i);
// object may be null for events that do not reference an object
138 if (object!=null&&rdobjmap.containsKey(object)) {
139 ((Set)rdobjmap.get(object)).remove(new Integer(currthread));
141 if (object!=null&&wrobjmap.containsKey(object)) {
142 ((Set)wrobjmap.get(object)).remove(new Integer(currthread));
144 if (object!=null&&objtoinfo.containsKey(object)) {
145 ObjectInfo oi=(ObjectInfo)objtoinfo.get(object);
// only act when the owner matches the thread of this thread's current event
146 if (oi.getOwner()==currentevents[currthread].getThread()) {
// NOTE(review): lines between these statements are not visible in this
// excerpt (e.g. releasing ownership, re-queuing the waiter, any early exit
// from the loop) -- confirm the exact hand-off order against the full file.
150 for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
151 //requeue everyone who was waiting on us and start them back up
152 Event waiter=(Event)waitit.next();
154 waiter.setTime(time);
155 threadinfo[waiter.getThread()].setStall(false);
157 serWake.addPoint(time,waiter.getThread());
158 oi.setOwner(waiter.getThread());
167 /* Initializes things and returns number of transactions */
// For every thread, builds the event for event 0 of its transaction 0 at that
// event's own time, and adds the thread's transaction count to tcount.
// NOTE(review): storing/enqueuing ev and the return statement are not visible
// in this excerpt -- confirm.
168 public int startinitial() {
170 for(int i=0;i<e.numThreads();i++) {
171 Transaction trans=e.getThread(i).getTransaction(0);
172 long time=trans.getTime(0);
173 Event ev=new Event(time, trans, 0, i, 0);
176 tcount+=e.getThread(i).numTransactions();
/**
 * Runs the simulation: seeds the queue via startinitial, then processes
 * events in queue order until none remain. The last event of a transaction
 * goes to tryCommit; every other event goes to enqueueEvent.
 */
181 public void dosim() {
183 //start first transactions
184 int numtrans=startinitial();
185 System.out.println("Number of transactions="+numtrans);
187 while(!eq.isEmpty()) {
188 Event ev=(Event)eq.poll();
// NOTE(review): Event exposes isValid()/makeInvalid(); a validity check that
// skips aborted events is presumably on a line not visible here -- confirm.
193 Transaction trans=ev.getTransaction();
195 int event=ev.getEvent();
196 long currtime=ev.getTime();
// starttime==-1 means no start time has been recorded yet; the statement
// guarded by this test is not visible in this excerpt
198 if (trans.started&&starttime==-1)
// event is the index of the final event of the transaction
201 if (trans.numEvents()==(event+1)) {
202 tryCommit(ev, trans);
// progress report every 100000 attempts
// NOTE(review): the printed message has no space between the count and
// "transactions".
204 if ((tcount%100000)==0)
205 System.out.println("Attempted "+tcount+"transactions");
207 enqueueEvent(ev, trans);
// value later used by getTime()
210 shorttesttime=lasttime;
/**
 * Returns the ObjectInfo registered for obj in objtoinfo, creating and
 * registering a new one first when the object has no entry yet.
 *
 * @param obj object key
 * @return the ObjectInfo stored for obj
 */
215 private ObjectInfo getmapping(ObjIndex obj) {
216 if (!objtoinfo.containsKey(obj))
217 objtoinfo.put(obj, new ObjectInfo(this));
218 return (ObjectInfo)objtoinfo.get(obj);
/**
 * Handles the final event of a transaction: releases its objects, records the
 * commit, aborts or flags conflicting threads according to the policy, and
 * creates the thread's next event (the same transaction again when this
 * thread had been flagged as aborted, otherwise the following one).
 *
 * NOTE(review): several lines of this method are not visible in this excerpt,
 * including the guard (if any) that skips the commit bookkeeping when the
 * abort flag was set -- confirm against the full file.
 *
 * @param ev the final event of the transaction
 * @param trans the transaction ev belongs to
 */
221 public void tryCommit(Event ev, Transaction trans) {
222 //ready to commit this one
223 long currtime=ev.getTime();
224 releaseObjects(trans, ev.getThread(), currtime);
226 //See if we have been flagged as aborted for the lazy case
// read the flag, then clear it so it applies to this attempt only
227 boolean abort=aborted[ev.getThread()];
228 aborted[ev.getThread()]=false;
230 //if it is a transaction, increment commit count
// a lone DELAY event is not counted as a transaction
231 if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
233 if (serCommit!=null) {
234 serCommit.addPoint(ev.getTime(),ev.getThread());
237 //Reset our backoff counter
238 backoff[ev.getThread()]=BACKOFFSTART;
240 //abort the other threads
241 for(int i=0;i<trans.numEvents();i++) {
242 ObjIndex object=trans.getObjIndex(i);
243 int op=trans.getEvent(i);
244 //Mark commits to objects
245 if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
// NOTE(review): the condition guarding this print is not visible here
247 System.out.println(op);
249 getmapping(object).recordCommit();
251 //Check for threads we might cause to abort
// only a committed WRITE conflicts with other threads' reads and writes
252 if (op==Transaction.WRITE) {
253 HashSet abortset=new HashSet();
// every thread registered as a reader of the object
254 if (rdobjmap.containsKey(object)) {
255 for(Iterator it=((Set)rdobjmap.get(object)).iterator();it.hasNext();) {
256 Integer threadid=(Integer)it.next();
257 abortset.add(threadid);
259 ObjectInfo oi=getmapping(object);
// every thread registered as a writer of the object
264 if (wrobjmap.containsKey(object)) {
265 for(Iterator it=((Set)wrobjmap.get(object)).iterator();it.hasNext();) {
266 Integer threadid=(Integer)it.next();
267 abortset.add(threadid);
// writer that is not also a reader of this object, lock policies only
268 if (isLock()&&(!rdobjmap.containsKey(object)||!((Set)rdobjmap.get(object)).contains(threadid))) {
269 //if this object hasn't already caused this thread to
270 //abort, then flag it as an abort cause
271 ObjectInfo oi=getmapping(object);
276 for(Iterator abit=abortset.iterator();abit.hasNext();) {
277 Integer threadid=(Integer)abit.next();
278 if (policy==LAZY||policy==LOCK) {
279 //just flag to abort when it tries to commit
280 aborted[threadid]=true;
282 serAbort.addPoint(currtime, threadid);
283 } else if (policy==COMMIT||policy==LOCKCOMMIT) {
284 //abort it immediately
285 reschedule(threadid, currtime);
295 //add next transaction event...could be us if we aborted
// flagged as aborted: retry the same transaction number; otherwise advance
296 int nexttransnum=abort?ev.getTransNum():ev.getTransNum()+1;
// nothing is scheduled once the thread has run out of transactions
297 if (nexttransnum<e.getThread(ev.getThread()).numTransactions()) {
298 Transaction nexttrans=e.getThread(ev.getThread()).getTransaction(nexttransnum);
299 if (serStart!=null) {
// a lone DELAY event is not plotted as a transaction start
300 if (nexttrans.numEvents()>1||nexttrans.getEvent(0)!=Transaction.DELAY) {
301 serStart.addPoint(ev.getTime(),ev.getThread());
305 Event nev=new Event(currtime+nexttrans.getTime(0), nexttrans, 0, ev.getThread(), nexttransnum);
306 currentevents[ev.getThread()]=nev;
/**
 * Collects the ids of all threads other than thread that are registered as
 * writers of obj.
 *
 * NOTE(review): the return statements are not visible in this excerpt; the
 * caller in enqueueEvent tests the result against null, so the no-writer and
 * empty-set cases presumably return null -- confirm.
 *
 * @param thread id of the reading thread, excluded from the result
 * @param obj object being read
 */
311 public Set rdConflictSet(int thread, ObjIndex obj) {
312 if (!wrobjmap.containsKey(obj))
314 HashSet conflictset=new HashSet();
315 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
316 Integer threadid=(Integer)it.next();
317 if (threadid.intValue()!=thread)
318 conflictset.add(threadid);
320 if (conflictset.isEmpty())
/**
 * Collects the ids of all threads other than thread that are registered as
 * readers or writers of obj.
 *
 * NOTE(review): the return statements are not visible in this excerpt; the
 * caller in enqueueEvent tests the result against null, so the empty-set case
 * presumably returns null -- confirm.
 *
 * @param thread id of the writing thread, excluded from the result
 * @param obj object being written
 */
326 public Set wrConflictSet(int thread, ObjIndex obj) {
327 HashSet conflictset=new HashSet();
328 if (rdobjmap.containsKey(obj)) {
329 for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
330 Integer threadid=(Integer)it.next();
331 if (threadid.intValue()!=thread)
332 conflictset.add(threadid);
// enqueueEvent creates the writer set for obj before calling this method, so
// on that path wrobjmap.get(obj) is non-null here
335 for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
336 Integer threadid=(Integer)it.next();
337 if (threadid.intValue()!=thread)
338 conflictset.add(threadid);
340 if (conflictset.isEmpty())
346 //Takes as parameters the current transaction's read/write event ev, the
347 //set of conflicting threads, and the current time.
348 //Returning false stops the current transaction from being scheduled further.
// ATTACK reschedules every conflicting thread, POLITE reschedules the current
// thread, and KARMA compares transaction times to decide which side restarts.
// Each restart is delayed by r.nextInt(backoff[thread]).
// NOTE(review): r is presumably a java.util.Random (nextInt(bound) then yields
// a value in [0, bound)); the return statements and any cap applied to the
// doubled backoff are not visible in this excerpt -- confirm.
350 public boolean handleConflicts(Event ev, Set threadstokill, long time) {
351 if (policy==ATTACK) {
352 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
353 Integer thread=(Integer)thit.next();
354 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
// double the victim's backoff bound for its next abort
355 int dback=backoff[thread.intValue()]*2;
357 backoff[thread.intValue()]=dback;
361 } else if (policy==POLITE) {
// the thread that ran into the conflict backs off itself
362 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
363 int dback=backoff[ev.getThread()]*2;
365 backoff[ev.getThread()]=dback;
368 } else if (policy==KARMA) {
// scan the opponents' current events for the largest transaction-relative time
371 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
372 Integer thread=(Integer)thit.next();
373 Event other=currentevents[thread.intValue()];
374 int eventnum=other.getEvent();
375 long otime=other.getTransaction().getTime(other.getEvent());
376 if (otime>opponenttime)
// an opponent is further along than this event: the current thread restarts
379 if (opponenttime>ev.getTransaction().getTime(ev.getEvent())) {
381 reschedule(ev.getThread(), time+r.nextInt(backoff[ev.getThread()]));
// otherwise every opponent restarts
386 for(Iterator thit=threadstokill.iterator();thit.hasNext();) {
387 Integer thread=(Integer)thit.next();
388 reschedule(thread, time+r.nextInt(backoff[thread.intValue()]));
399 //Handle current event (read, write, delay) in a transaction and
400 //enqueue the next one
// Also handles BARRIER events and, under the lock policies, object ownership
// and stalling.
// NOTE(review): many lines of this method are not visible in this excerpt
// (returns, queue insertions, the barrier decrement, some guards); the
// comments below describe only the visible statements.
402 public void enqueueEvent(Event ev, Transaction trans) {
403 //just enqueue next event
404 int event=ev.getEvent();
405 long currtime=ev.getTime();
406 ObjIndex object=trans.getObjIndex(event);
407 int operation=trans.getEvent(event);
// lock policies: an object must be owned by this thread before it is accessed
409 if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
410 ObjectInfo oi=getmapping(object);
413 if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
414 //we're going to wait
// assume a deadlock until the owner chain shows otherwise
415 boolean deadlocked=true;
// follow at most checkdepth owner -> awaited-object links
417 for(int i=0;i<checkdepth;i++) {
418 //check if stalling would close the loop
419 if (toi.getOwner()==ev.getThread())
421 //see if cycle is broken
422 if (!threadinfo[toi.getOwner()].isStalled()) {
426 //follow one more in depth
427 toi=getmapping(threadinfo[toi.getOwner()].getObjIndex());
431 //don't wait on stalled threads, we could deadlock
// record that this thread is stalled and which object it waits for
432 threadinfo[ev.getThread()].setStall(true);
433 threadinfo[ev.getThread()].setObjIndex(object);
435 serStall.addPoint(ev.getTime(),ev.getThread());
440 serAvoid.addPoint(ev.getTime(),ev.getThread());
445 oi.setOwner(ev.getThread());
450 //process the current event
451 if (operation==Transaction.READ) {
// register this thread as a reader of the object
453 if (!rdobjmap.containsKey(object))
454 rdobjmap.put(object,new HashSet());
455 ((Set)rdobjmap.get(object)).add(new Integer(ev.getThread()));
457 //do eager contention management
458 Set conflicts=rdConflictSet(ev.getThread(), object);
459 if (conflicts!=null) {
// a false result means this transaction must not be scheduled further
460 if (!handleConflicts(ev, conflicts, currtime))
464 } else if (operation==Transaction.WRITE) {
// register this thread as a writer of the object
466 if (!wrobjmap.containsKey(object))
467 wrobjmap.put(object,new HashSet());
468 ((Set)wrobjmap.get(object)).add(new Integer(ev.getThread()));
470 Set conflicts=wrConflictSet(ev.getThread(), object);
471 if (conflicts!=null) {
472 if (!handleConflicts(ev, conflicts, currtime))
476 } else if (operation==Transaction.BARRIER) {
// counter at zero: release all threads together at the current time
478 if (barriercount==0) {
479 for(int i=0;i<e.numThreads();i++) {
480 //enqueue the next event
481 Event bev=currentevents[i];
482 int bevent=bev.getEvent();
483 long bcurrtime=bev.getTime();
484 Transaction btrans=bev.getTransaction();
// keep each thread's own gap between its barrier event and its next event
485 long deltatime=btrans.getTime(bevent+1)-btrans.getTime(bevent);
486 Event nev=new Event(deltatime+currtime, btrans, bevent+1, bev.getThread(), bev.getTransNum());
487 currentevents[bev.getThread()]=nev;
// re-arm the barrier for its next use
490 barriercount=e.numThreads();
493 //wait until all threads in barrier
498 //enqueue the next event
// the next event keeps its original offset from the current one
499 long deltatime=trans.getTime(event+1)-trans.getTime(event);
500 Event nev=new Event(deltatime+currtime, trans, event+1, ev.getThread(), ev.getTransNum());
501 currentevents[ev.getThread()]=nev;
/**
 * One scheduled step of a transaction: a time, the transaction, the index of
 * the event within it, the owning thread id and the transaction number.
 * Instances are ordered through compareTo.
 *
 * NOTE(review): field declarations and accessor bodies are not visible in this
 * excerpt; the accessor descriptions below are inferred from how FlexScheduler
 * calls them -- confirm.
 */
506 class Event implements Comparable {
// called by FlexScheduler.reschedule on the event it replaces
514 public void makeInvalid() {
518 public boolean isValid() {
// number of the transaction within its thread
522 public int getTransNum() {
526 public Transaction getTransaction() {
// index of this event within its transaction
530 public int getEvent() {
534 public long getTime() {
// used by FlexScheduler.releaseObjects when waking a waiting event
538 public void setTime(long time) {
// id of the thread this event belongs to
542 public int getThread() {
// Argument order: time, transaction, event index, thread id, transaction number.
546 public Event(long time, Transaction t, int num, int threadid, int transnum) {
550 this.threadid=threadid;
551 this.transnum=transnum;
555 //break ties to allow commits to occur earliest
// Compares by time first; the two tests below distinguish "this is the last
// event of its transaction and the other is not" from the reverse case.
// NOTE(review): the returned values and the cast of o to e are not visible in
// this excerpt -- confirm.
556 public int compareTo(Object o) {
558 long delta=time-e.time;
565 if (((getEvent()+1)==getTransaction().numEvents())&&
566 (e.getEvent()+1)!=e.getTransaction().numEvents())
568 if (((getEvent()+1)!=getTransaction().numEvents())&&
569 (e.getEvent()+1)==e.getTransaction().numEvents())