int readPercent;
Random r;
ThreadClass[] threads;
+ int splitobjects;
+ int splitaccesses;
+ int readPercentSecond;
public String toString() {
String s="";
return s;
}
-
public Executor(int numThreads, int numTrans, int deltaTrans, int numObjects, int numAccesses, int deltaAccesses, int readPercent, int delay, int deltaDelay, int nonTrans, int deltaNonTrans) {
+ this(numThreads, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans, 100, 100, 0);
+ }
+
+ public Executor(int numThreads, int numTrans, int deltaTrans, int numObjects, int numAccesses, int deltaAccesses, int readPercent, int delay, int deltaDelay, int nonTrans, int deltaNonTrans, int splitobjects, int splitaccesses, int readPercentSecond) {
this.numThreads=numThreads;
this.numTrans=numTrans;
this.deltaTrans=deltaTrans;
this.deltaDelay=deltaDelay;
this.nonTrans=nonTrans;
this.deltaNonTrans=deltaNonTrans;
+ this.splitobjects=splitobjects;
+ this.splitaccesses=splitaccesses;
+ this.readPercentSecond=readPercentSecond;
r=new Random();
threads=new ThreadClass[numThreads];
generateThreads();
int accesses=getRandom(numAccesses, deltaAccesses);
Transaction t=new Transaction(accesses);
int time=0;
+ int splitpoint=(numObjects*splitobjects)/100;
for(int i=0;i<(accesses-1); i++) {
- boolean isRead=r.nextInt(100)<readPercent;
- time+=getRandom(delay, deltaDelay);
- int object=r.nextInt(numObjects);
- t.setObject(i, object);
- t.setTime(i, time);
- if (isRead)
- t.setEvent(i, Transaction.READ);
- else
- t.setEvent(i, Transaction.WRITE);
+ if (r.nextInt(100)<splitaccesses) {
+ boolean isRead=r.nextInt(100)<readPercent;
+ time+=getRandom(delay, deltaDelay);
+ int object=r.nextInt(splitpoint);
+ t.setObject(i, object);
+ t.setTime(i, time);
+ if (isRead)
+ t.setEvent(i, Transaction.READ);
+ else
+ t.setEvent(i, Transaction.WRITE);
+ } else {
+ boolean isRead=r.nextInt(100)<readPercentSecond;
+ time+=getRandom(delay, deltaDelay);
+ int object=r.nextInt(numObjects-splitpoint)+splitpoint;
+ t.setObject(i, object);
+ t.setTime(i, time);
+ if (isRead)
+ t.setEvent(i, Transaction.READ);
+ else
+ t.setEvent(i, Transaction.WRITE);
+ }
}
t.setEvent(accesses-1, Transaction.DELAY);
t.setObject(accesses-1, Transaction.DELAY);
}
return t;
}
-}
\ No newline at end of file
+}
public class FlexScheduler {
Executor e;
+ int abortThreshold;
+ int abortRatio;
+ int deadlockcount;
+ int checkdepth;
+
+ public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth) {
+ this(e, policy);
+ this.abortThreshold=abortThreshold;
+ this.abortRatio=abortRatio;
+ this.checkdepth=checkdepth;
+ }
public FlexScheduler(Executor e, int policy) {
this.e=e;
}
}
+ public int getDeadLockCount() {
+ return deadlockcount;
+ }
+
//Where to start the backoff delay at
public static final int BACKOFFSTART=1;
public static final int POLITE=3;
public static final int KARMA=4;
public static final int LOCK=5;
+ public static final int LOCKCOMMIT=6;
PriorityQueue eq;
int policy;
}
public boolean isLock() {
- return policy==LOCK;
+ return policy==LOCK||policy==LOCKCOMMIT;
}
public int getAborts() {
return shorttesttime;
}
+ //Aborts another thread...
public void reschedule(int currthread, int time) {
currentevents[currthread].makeInvalid();
+ if (threadinfo[currthread].isStalled()) {
+ //remove from waiter list
+ threadinfo[currthread].setStall(false);
+ getmapping(threadinfo[currthread].getObject()).getWaiters().remove(currentevents[currthread]);
+ }
+
Transaction trans=currentevents[currthread].getTransaction();
+
+ releaseObjects(trans, currthread, time);
+ Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
+ currentevents[currthread]=nev;
+ eq.add(nev);
+ }
+
+ private void releaseObjects(Transaction trans, int currthread, int time) {
//remove all events
for(int i=0;i<trans.numEvents();i++) {
int object=trans.getObject(i);
- if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
- ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(currthread));
+ Integer obj=new Integer(object);
+ if (object!=-1&&rdobjmap.containsKey(obj)) {
+ ((Set)rdobjmap.get(obj)).remove(new Integer(currthread));
}
- if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
- ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(currthread));
+ if (object!=-1&&wrobjmap.containsKey(obj)) {
+ ((Set)wrobjmap.get(obj)).remove(new Integer(currthread));
+ }
+ if (object!=-1&&objtoinfo.containsKey(obj)) {
+ ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+ if (oi.getOwner()==currentevents[currthread].getThread()) {
+ oi.releaseOwner();
+
+ //wake up one waiter
+ for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
+ //requeue everyone who was waiting on us and start them back up
+ Event waiter=(Event)waitit.next();
+ waitit.remove();
+ waiter.setTime(time);
+ threadinfo[waiter.getThread()].setStall(false);
+ oi.setOwner(waiter.getThread());
+ eq.add(waiter);
+ break;
+ }
+ }
}
}
-
- Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
- currentevents[currthread]=nev;
- eq.add(nev);
}
public void startinitial() {
while(!eq.isEmpty()) {
Event ev=(Event)eq.poll();
- if (!ev.isValid())
+ if (!ev.isValid()) {
continue;
+ }
Transaction trans=ev.getTransaction();
int event=ev.getEvent();
shorttesttime=lasttime;
}
+ private ObjectInfo getmapping(int object) {
+ Integer obj=new Integer(object);
+ if (!objtoinfo.containsKey(obj))
+ objtoinfo.put(obj, new ObjectInfo(this));
+ return (ObjectInfo)objtoinfo.get(obj);
+ }
public void tryCommit(Event ev, Transaction trans) {
//ready to commit this one
int currtime=ev.getTime();
-
- //Remove everything we put in object sets
- for(int i=0;i<trans.numEvents();i++) {
- int object=trans.getObject(i);
- Integer obj=new Integer(object);
- if (object!=-1&&rdobjmap.containsKey(obj)) {
- ((Set)rdobjmap.get(obj)).remove(new Integer(ev.getThread()));
- }
- if (object!=-1&&wrobjmap.containsKey(obj)) {
- ((Set)wrobjmap.get(obj)).remove(new Integer(ev.getThread()));
- }
- if (object!=-1&&objtoinfo.containsKey(obj)) {
- ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
- if (oi.getOwner()==ev.getThread()) {
- oi.releaseOwner();
-
- //wake up one waiter
- for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
- //requeue everyone who was waiting on us and start them back up
- Event waiter=(Event)waitit.next();
- waitit.remove();
- waiter.setTime(currtime);
- threadinfo[waiter.getThread()].setStall(false);
- eq.add(waiter);
- break;
- }
-
- }
- }
- }
+ releaseObjects(trans, ev.getThread(), currtime);
//See if we have been flagged as aborted for the lazy case
boolean abort=aborted[ev.getThread()];
aborted[ev.getThread()]=false;
if (!abort) {
+ //if it is a transaction, increment comit count
if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
commitcount++;
}
//Reset our backoff counter
backoff[ev.getThread()]=BACKOFFSTART;
-
//abort the other threads
for(int i=0;i<trans.numEvents();i++) {
int object=trans.getObject(i);
int op=trans.getEvent(i);
+ //Mark commits to objects
+ if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
+ getmapping(object).recordCommit();
+ }
+ //Check for threads we might cause to abort
if (op==Transaction.WRITE) {
HashSet abortset=new HashSet();
- if (rdobjmap.containsKey(new Integer(object))) {
- for(Iterator it=((Set)rdobjmap.get(new Integer(object))).iterator();it.hasNext();) {
+ Integer obj=new Integer(object);
+ if (rdobjmap.containsKey(obj)) {
+ for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
Integer threadid=(Integer)it.next();
abortset.add(threadid);
+ if (isLock()) {
+ ObjectInfo oi=getmapping(object);
+ oi.recordAbort();
+ }
}
}
- if (wrobjmap.containsKey(new Integer(object))) {
- for(Iterator it=((Set)wrobjmap.get(new Integer(object))).iterator();it.hasNext();) {
+ if (wrobjmap.containsKey(obj)) {
+ for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
Integer threadid=(Integer)it.next();
abortset.add(threadid);
+ if (isLock()&&(!rdobjmap.containsKey(obj)||!((Set)rdobjmap.get(obj)).contains(threadid))) {
+ //if this object hasn't already cause this thread to
+ //abort, then flag it as an abort cause
+ ObjectInfo oi=getmapping(object);
+ oi.recordAbort();
+ }
}
}
for(Iterator abit=abortset.iterator();abit.hasNext();) {
Integer threadid=(Integer)abit.next();
if (policy==LAZY||policy==LOCK) {
+ //just flag to abort when it trie to commit
aborted[threadid]=true;
- } else if (policy==COMMIT) {
+ } else if (policy==COMMIT||policy==LOCKCOMMIT) {
+ //abort it immediately
reschedule(threadid, currtime);
abortcount++;
}
public Set wrConflictSet(int thread, int object) {
Integer obj=new Integer(object);
- if (!rdobjmap.containsKey(obj))
- return null;
+
HashSet conflictset=new HashSet();
- for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
- Integer threadid=(Integer)it.next();
- if (threadid.intValue()!=thread)
- conflictset.add(threadid);
+ if (rdobjmap.containsKey(obj)) {
+ for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
+ Integer threadid=(Integer)it.next();
+ if (threadid.intValue()!=thread)
+ conflictset.add(threadid);
+ }
}
for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
Integer threadid=(Integer)it.next();
int currtime=ev.getTime();
int object=trans.getObject(event);
int operation=trans.getEvent(event);
- //process the current event
- if (operation==Transaction.READ) {
- Integer obj=new Integer(object);
+ Integer obj=new Integer(object);
- //check for lock based approach
- if (isLock()) {
- if (!objtoinfo.containsKey(obj)) {
- objtoinfo.put(obj, new ObjectInfo(this));
- }
- ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+ if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
+ ObjectInfo oi=getmapping(object);
+
+ if (oi.isRisky()) {
if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
//we're going to wait
- if (!threadinfo[oi.getOwner()].isStalled()) {
+ boolean deadlocked=true;
+ ObjectInfo toi=oi;
+ for(int i=0;i<checkdepth;i++) {
+ //check if stalling would close the loop
+ if (toi.getOwner()==ev.getThread())
+ break;
+ //see if cycle is broken
+ if (!threadinfo[toi.getOwner()].isStalled()) {
+ deadlocked=false;
+ break;
+ }
+ //follow one more in depth
+ toi=getmapping(threadinfo[toi.getOwner()].getObject());
+ }
+
+ if (!deadlocked) {
//don't wait on stalled threads, we could deadlock
threadinfo[ev.getThread()].setStall(true);
+ threadinfo[ev.getThread()].setObject(object);
oi.addWaiter(ev);
return;
- }
+ } else
+ deadlockcount++;
} else {
//we have object
oi.setOwner(ev.getThread());
}
}
-
+ }
+
+ //process the current event
+ if (operation==Transaction.READ) {
//record read event
if (!rdobjmap.containsKey(obj))
rdobjmap.put(obj,new HashSet());
if (isEager()) {
//do eager contention management
Set conflicts=rdConflictSet(ev.getThread(), object);
- if (conflicts!=null)
+ if (conflicts!=null) {
if (!handleConflicts(ev, conflicts, currtime))
return;
- }
- } else if (operation==Transaction.WRITE) {
- Integer obj=new Integer(object);
-
- //grab lock
- if (isLock()) {
- if (!objtoinfo.containsKey(obj)) {
- objtoinfo.put(obj, new ObjectInfo(this));
- }
- ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
- if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
- //we're going to wait
- if (!threadinfo[oi.getOwner()].isStalled()) {
- //don't wait on stalled threads, we could deadlock
- threadinfo[ev.getThread()].setStall(true);
- oi.addWaiter(ev);
- return;
- }
- } else {
- //we have object
- oi.setOwner(ev.getThread());
}
}
-
+ } else if (operation==Transaction.WRITE) {
//record write event
if (!wrobjmap.containsKey(obj))
wrobjmap.put(obj,new HashSet());
((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
-
if (isEager()) {
Set conflicts=wrConflictSet(ev.getThread(), object);
- if (conflicts!=null)
+ if (conflicts!=null) {
if (!handleConflicts(ev, conflicts, currtime))
return;
+ }
}
}
public class TransSim {
public static void main(String[] args) {
- int numThreads=16;
- int numTrans=20;
+ int numThreads=1;
+ int numTrans=4;
int deltaTrans=0;
- int numObjects=4000;
- int numAccesses=20;
- int deltaAccesses=5;
- int readPercent=20;
+ int numObjects=400;
+ int numAccesses=10;
+ int deltaAccesses=0;
+ int readPercent=0;
//time for operation
int delay=20;
int deltaDelay=4;
//time between transactions
int nonTrans=20;
int deltaNonTrans=4;
+ //split objects
+ int splitobjects=100;//10 percent of objects special
+ int splitaccesses=100;//40 percent of accesses to special objects
+ int readPercentSecond=30;//20 percent of accesses are reads
+ int abortThreshold=0; //need 4 aborts to declare risky
+ int abortRatio=0;//need 40% aborts vs commits to declare risky
+ int deadlockdepth=10;
long tlazy=0, tcommit=0, tattack=0, tpolite=0, tkarma=0;
- for(int i=0;i<100;i++) {
- Executor e=new Executor(numThreads, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans);
+
+ for(int i=1;i<100;i++) {
+ System.out.println("i="+i);
+ Executor e=new Executor(i, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans, splitobjects, splitaccesses, readPercentSecond);
System.out.println(e.maxTime());
FlexScheduler ls=new FlexScheduler(e, FlexScheduler.LAZY);
ls.dosim();
tlazy+=ls.getTime();
//Lock object accesses
- ls=new FlexScheduler(e, FlexScheduler.LOCK);
+ ls=new FlexScheduler(e, FlexScheduler.LOCK, abortThreshold, abortRatio, deadlockdepth);
ls.dosim();
+ System.out.println("Deadlock count="+ls.getDeadLockCount());
System.out.println("Lock Abort="+ls.getTime());
System.out.println("Aborts="+ls.getAborts()+" Commit="+ls.getCommits());
if (ls.getTime()<besttime)
besttime=ls.getTime();
tcommit+=ls.getTime();
+ //Lock Commit object accesses
+ ls=new FlexScheduler(e, FlexScheduler.LOCKCOMMIT, abortThreshold, abortRatio, deadlockdepth);
+ ls.dosim();
+ System.out.println("Deadlock count="+ls.getDeadLockCount());
+ System.out.println("LockCommit Abort="+ls.getTime());
+ System.out.println("Aborts="+ls.getAborts()+" Commit="+ls.getCommits());
+ if (ls.getTime()<besttime)
+ besttime=ls.getTime();
+ tcommit+=ls.getTime();
+
//Kill others at commit
ls=new FlexScheduler(e, FlexScheduler.COMMIT);
ls.dosim();