simulator now models two locking modes:
authorbdemsky <bdemsky>
Tue, 15 Sep 2009 19:40:21 +0000 (19:40 +0000)
committerbdemsky <bdemsky>
Tue, 15 Sep 2009 19:40:21 +0000 (19:40 +0000)
lock -- lazy conflict detection
lockcommit -- kill threads as soon as committing thread conflicts with them

Robust/TransSim/Executor.java
Robust/TransSim/FlexScheduler.java
Robust/TransSim/ObjectInfo.java
Robust/TransSim/ThreadInfo.java
Robust/TransSim/TransSim.java

index 0707eed262466b5a471086451cfa50ee526a31f4..1294e6504c4142ddf482e708ee8b3601e6b6cc07 100644 (file)
@@ -14,6 +14,9 @@ public class Executor {
   int readPercent;
   Random r;
   ThreadClass[] threads;
+  int splitobjects;
+  int splitaccesses;
+  int readPercentSecond;
 
   public String toString() {
     String s="";
@@ -23,8 +26,11 @@ public class Executor {
     return s;
   }
 
-
   public Executor(int numThreads, int numTrans, int deltaTrans, int numObjects, int numAccesses, int deltaAccesses, int readPercent, int delay, int deltaDelay, int nonTrans, int deltaNonTrans) {
+    this(numThreads, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans, 100, 100, 0);
+  }
+
+  public Executor(int numThreads, int numTrans, int deltaTrans, int numObjects, int numAccesses, int deltaAccesses, int readPercent, int delay, int deltaDelay, int nonTrans, int deltaNonTrans, int splitobjects, int splitaccesses, int readPercentSecond) {
     this.numThreads=numThreads;
     this.numTrans=numTrans;
     this.deltaTrans=deltaTrans;
@@ -36,6 +42,9 @@ public class Executor {
     this.deltaDelay=deltaDelay;
     this.nonTrans=nonTrans;
     this.deltaNonTrans=deltaNonTrans;
+    this.splitobjects=splitobjects;
+    this.splitaccesses=splitaccesses;
+    this.readPercentSecond=readPercentSecond;
     r=new Random();
     threads=new ThreadClass[numThreads];
     generateThreads();
@@ -98,16 +107,29 @@ public class Executor {
     int accesses=getRandom(numAccesses, deltaAccesses);
     Transaction t=new Transaction(accesses);
     int time=0;
+    int splitpoint=(numObjects*splitobjects)/100;
     for(int i=0;i<(accesses-1); i++) {
-      boolean isRead=r.nextInt(100)<readPercent;
-      time+=getRandom(delay, deltaDelay);
-      int object=r.nextInt(numObjects);
-      t.setObject(i, object);
-      t.setTime(i, time);
-      if (isRead)
-       t.setEvent(i, Transaction.READ);
-      else
-       t.setEvent(i, Transaction.WRITE);
+      if (r.nextInt(100)<splitaccesses) {
+       boolean isRead=r.nextInt(100)<readPercent;
+       time+=getRandom(delay, deltaDelay);
+       int object=r.nextInt(splitpoint);
+       t.setObject(i, object);
+       t.setTime(i, time);
+       if (isRead)
+         t.setEvent(i, Transaction.READ);
+       else
+         t.setEvent(i, Transaction.WRITE);
+      } else {
+       boolean isRead=r.nextInt(100)<readPercentSecond;
+       time+=getRandom(delay, deltaDelay);
+       int object=r.nextInt(numObjects-splitpoint)+splitpoint;
+       t.setObject(i, object);
+       t.setTime(i, time);
+       if (isRead)
+         t.setEvent(i, Transaction.READ);
+       else
+         t.setEvent(i, Transaction.WRITE);
+      }
     }
     t.setEvent(accesses-1, Transaction.DELAY);
     t.setObject(accesses-1, Transaction.DELAY);
@@ -131,4 +153,4 @@ public class Executor {
     }
     return t;
   }
-}
\ No newline at end of file
+}
index 3aa06b75b1285f0ff39a831b585d1eb1ea27f71c..ccec667e74c4a8b79c2c887c472cfe86e70e2608 100644 (file)
@@ -2,6 +2,17 @@ import java.util.*;
 
 public class FlexScheduler {
   Executor e;
+  int abortThreshold;
+  int abortRatio;
+  int deadlockcount;
+  int checkdepth;
+
+  public FlexScheduler(Executor e, int policy, int abortThreshold, int abortRatio, int checkdepth) {
+    this(e, policy);
+    this.abortThreshold=abortThreshold;
+    this.abortRatio=abortRatio;
+    this.checkdepth=checkdepth;
+  }
   
   public FlexScheduler(Executor e, int policy) {
     this.e=e;
@@ -23,6 +34,10 @@ public class FlexScheduler {
     }
   }
 
+  public int getDeadLockCount() {
+    return deadlockcount;
+  }
+
   //Where to start the backoff delay at
   public static final int BACKOFFSTART=1;
 
@@ -33,6 +48,7 @@ public class FlexScheduler {
   public static final int POLITE=3;
   public static final int KARMA=4;
   public static final int LOCK=5;
+  public static final int LOCKCOMMIT=6;
 
   PriorityQueue eq;
   int policy;
@@ -55,7 +71,7 @@ public class FlexScheduler {
   }
 
   public boolean isLock() {
-    return policy==LOCK;
+    return policy==LOCK||policy==LOCKCOMMIT;
   }
 
   public int getAborts() {
@@ -70,24 +86,54 @@ public class FlexScheduler {
     return shorttesttime;
   }
 
+  //Aborts another thread...
   public void reschedule(int currthread, int time) {
     currentevents[currthread].makeInvalid();
+    if (threadinfo[currthread].isStalled()) {
+      //remove from waiter list
+      threadinfo[currthread].setStall(false);
+      getmapping(threadinfo[currthread].getObject()).getWaiters().remove(currentevents[currthread]);
+    }
+    
     Transaction trans=currentevents[currthread].getTransaction();
+    
+    releaseObjects(trans, currthread, time);
+    Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
+    currentevents[currthread]=nev;
+    eq.add(nev);
+  }
 
+
+  private void releaseObjects(Transaction trans, int currthread, int time) {
     //remove all events
     for(int i=0;i<trans.numEvents();i++) {
       int object=trans.getObject(i);
-      if (object!=-1&&rdobjmap.containsKey(new Integer(object))) {
-       ((Set)rdobjmap.get(new Integer(object))).remove(new Integer(currthread));
+      Integer obj=new Integer(object);
+      if (object!=-1&&rdobjmap.containsKey(obj)) {
+       ((Set)rdobjmap.get(obj)).remove(new Integer(currthread));
       }
-      if (object!=-1&&wrobjmap.containsKey(new Integer(object))) {
-       ((Set)wrobjmap.get(new Integer(object))).remove(new Integer(currthread));
+      if (object!=-1&&wrobjmap.containsKey(obj)) {
+       ((Set)wrobjmap.get(obj)).remove(new Integer(currthread));
+      }
+      if (object!=-1&&objtoinfo.containsKey(obj)) {
+       ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+       if (oi.getOwner()==currentevents[currthread].getThread()) {
+         oi.releaseOwner();
+         
+         //wake up one waiter
+         for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
+           //requeue everyone who was waiting on us and start them back up
+           Event waiter=(Event)waitit.next();
+           waitit.remove();
+           waiter.setTime(time);
+           threadinfo[waiter.getThread()].setStall(false);
+           oi.setOwner(waiter.getThread());
+           eq.add(waiter);
+           break;
+         }
+       }
       }
     }
-    
-    Event nev=new Event(time+trans.getTime(0), trans, 0, currthread, currentevents[currthread].getTransNum());
-    currentevents[currthread]=nev;
-    eq.add(nev);
   }
 
   public void startinitial() {
@@ -107,8 +153,9 @@ public class FlexScheduler {
 
     while(!eq.isEmpty()) {
       Event ev=(Event)eq.poll();
-      if (!ev.isValid())
+      if (!ev.isValid()) {
        continue;
+      }
 
       Transaction trans=ev.getTransaction();
       int event=ev.getEvent();
@@ -124,75 +171,70 @@ public class FlexScheduler {
     shorttesttime=lasttime;
   }
 
+  private ObjectInfo getmapping(int object) {
+    Integer obj=new Integer(object);
+    if (!objtoinfo.containsKey(obj))
+      objtoinfo.put(obj, new ObjectInfo(this));
+    return (ObjectInfo)objtoinfo.get(obj);
+  }
 
   public void tryCommit(Event ev, Transaction trans) {
     //ready to commit this one
     int currtime=ev.getTime();
-
-    //Remove everything we put in object sets
-    for(int i=0;i<trans.numEvents();i++) {
-      int object=trans.getObject(i);
-      Integer obj=new Integer(object);
-      if (object!=-1&&rdobjmap.containsKey(obj)) {
-       ((Set)rdobjmap.get(obj)).remove(new Integer(ev.getThread()));
-      }
-      if (object!=-1&&wrobjmap.containsKey(obj)) {
-       ((Set)wrobjmap.get(obj)).remove(new Integer(ev.getThread()));
-      }
-      if (object!=-1&&objtoinfo.containsKey(obj)) {
-       ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
-       if (oi.getOwner()==ev.getThread()) {
-         oi.releaseOwner();
-      
-         //wake up one waiter
-         for(Iterator waitit=oi.getWaiters().iterator();waitit.hasNext();) {
-           //requeue everyone who was waiting on us and start them back up
-           Event waiter=(Event)waitit.next();
-           waitit.remove();
-           waiter.setTime(currtime);
-           threadinfo[waiter.getThread()].setStall(false);
-           eq.add(waiter);
-           break;
-         }
-
-       }
-      }
-    }
+    releaseObjects(trans, ev.getThread(), currtime);
     
     //See if we have been flagged as aborted for the lazy case
     boolean abort=aborted[ev.getThread()];
     aborted[ev.getThread()]=false;
     if (!abort) {
+      //if it is a transaction, increment comit count
       if (trans.numEvents()>1||trans.getEvent(0)!=Transaction.DELAY) {
        commitcount++;
       }
       //Reset our backoff counter
       backoff[ev.getThread()]=BACKOFFSTART;
 
-
       //abort the other threads
       for(int i=0;i<trans.numEvents();i++) {
        int object=trans.getObject(i);
        int op=trans.getEvent(i);
+       //Mark commits to objects
+       if (isLock()&&(op==Transaction.WRITE||op==Transaction.READ)) {
+         getmapping(object).recordCommit();
+       }
+       //Check for threads we might cause to abort
        if (op==Transaction.WRITE) {
          HashSet abortset=new HashSet();
-         if (rdobjmap.containsKey(new Integer(object))) {
-           for(Iterator it=((Set)rdobjmap.get(new Integer(object))).iterator();it.hasNext();) {
+         Integer obj=new Integer(object);
+         if (rdobjmap.containsKey(obj)) {
+           for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
              Integer threadid=(Integer)it.next();
              abortset.add(threadid);
+             if (isLock()) {
+               ObjectInfo oi=getmapping(object);
+               oi.recordAbort();
+             }
            }
          }
-         if (wrobjmap.containsKey(new Integer(object))) {
-           for(Iterator it=((Set)wrobjmap.get(new Integer(object))).iterator();it.hasNext();) {
+         if (wrobjmap.containsKey(obj)) {
+           for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
              Integer threadid=(Integer)it.next();
              abortset.add(threadid);
+             if (isLock()&&(!rdobjmap.containsKey(obj)||!((Set)rdobjmap.get(obj)).contains(threadid))) {
+               //if this object hasn't already cause this thread to
+               //abort, then flag it as an abort cause
+               ObjectInfo oi=getmapping(object);
+               oi.recordAbort();
+             }
            }
          }
          for(Iterator abit=abortset.iterator();abit.hasNext();) {
            Integer threadid=(Integer)abit.next();
            if (policy==LAZY||policy==LOCK) {
+             //just flag to abort when it trie to commit
              aborted[threadid]=true;
-           } else if (policy==COMMIT) {
+           } else if (policy==COMMIT||policy==LOCKCOMMIT) {
+             //abort it immediately
              reschedule(threadid, currtime);
              abortcount++;
            }
@@ -231,13 +273,14 @@ public class FlexScheduler {
 
   public Set wrConflictSet(int thread, int object) {
     Integer obj=new Integer(object);
-    if (!rdobjmap.containsKey(obj))
-      return null;
+
     HashSet conflictset=new HashSet();
-    for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
-      Integer threadid=(Integer)it.next();
-      if (threadid.intValue()!=thread)
-       conflictset.add(threadid);
+    if (rdobjmap.containsKey(obj)) {
+      for(Iterator it=((Set)rdobjmap.get(obj)).iterator();it.hasNext();) {
+       Integer threadid=(Integer)it.next();
+       if (threadid.intValue()!=thread)
+         conflictset.add(threadid);
+      }
     }
     for(Iterator it=((Set)wrobjmap.get(obj)).iterator();it.hasNext();) {
       Integer threadid=(Integer)it.next();
@@ -308,30 +351,46 @@ public class FlexScheduler {
     int currtime=ev.getTime();
     int object=trans.getObject(event);
     int operation=trans.getEvent(event);
-    //process the current event
-    if (operation==Transaction.READ) {
-      Integer obj=new Integer(object);
+    Integer obj=new Integer(object);
 
-      //check for lock based approach
-      if (isLock()) {
-       if (!objtoinfo.containsKey(obj)) {
-         objtoinfo.put(obj, new ObjectInfo(this));
-       }
-       ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
+    if ((operation==Transaction.READ||operation==Transaction.WRITE)&&isLock()) {
+      ObjectInfo oi=getmapping(object);
+      
+      if (oi.isRisky()) {
        if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
          //we're going to wait
-         if (!threadinfo[oi.getOwner()].isStalled()) {
+         boolean deadlocked=true;
+         ObjectInfo toi=oi;
+         for(int i=0;i<checkdepth;i++) {
+           //check if stalling would close the loop
+           if (toi.getOwner()==ev.getThread())
+             break;
+           //see if cycle is broken
+           if (!threadinfo[toi.getOwner()].isStalled()) {
+             deadlocked=false;
+             break;
+           }
+           //follow one more in depth
+           toi=getmapping(threadinfo[toi.getOwner()].getObject());
+         }
+         
+         if (!deadlocked) {
            //don't wait on stalled threads, we could deadlock
            threadinfo[ev.getThread()].setStall(true);
+           threadinfo[ev.getThread()].setObject(object);
            oi.addWaiter(ev);
            return;
-         }
+         } else
+           deadlockcount++;
        } else {
          //we have object
          oi.setOwner(ev.getThread());
        }
       }
-
+    }
+    
+    //process the current event
+    if (operation==Transaction.READ) {
       //record read event
       if (!rdobjmap.containsKey(obj))
        rdobjmap.put(obj,new HashSet());
@@ -339,43 +398,22 @@ public class FlexScheduler {
       if (isEager()) {
        //do eager contention management
        Set conflicts=rdConflictSet(ev.getThread(), object);
-       if (conflicts!=null)
+       if (conflicts!=null) {
          if (!handleConflicts(ev, conflicts, currtime))
            return;
-      }
-    } else if (operation==Transaction.WRITE) {
-      Integer obj=new Integer(object);
-
-      //grab lock
-      if (isLock()) {
-       if (!objtoinfo.containsKey(obj)) {
-         objtoinfo.put(obj, new ObjectInfo(this));
-       }
-       ObjectInfo oi=(ObjectInfo)objtoinfo.get(obj);
-       if (oi.isOwned()&&oi.getOwner()!=ev.getThread()) {
-         //we're going to wait
-         if (!threadinfo[oi.getOwner()].isStalled()) {
-           //don't wait on stalled threads, we could deadlock
-           threadinfo[ev.getThread()].setStall(true);
-           oi.addWaiter(ev);
-           return;
-         }
-       } else {
-         //we have object
-         oi.setOwner(ev.getThread());
        }
       }
-
+    } else if (operation==Transaction.WRITE) {
       //record write event
       if (!wrobjmap.containsKey(obj))
        wrobjmap.put(obj,new HashSet());
       ((Set)wrobjmap.get(obj)).add(new Integer(ev.getThread()));
-
       if (isEager()) {
        Set conflicts=wrConflictSet(ev.getThread(), object);
-       if (conflicts!=null)
+       if (conflicts!=null) {
          if (!handleConflicts(ev, conflicts, currtime))
            return;
+       }
       }
     }
     
index 0357dbe0bd24cfce3f1195bcbcf2b1c020ba2b08..aa689127f8a6529e70bca85ecbecb58f5d1aac3d 100644 (file)
@@ -3,11 +3,35 @@ import java.util.*;
 public class ObjectInfo {
   FlexScheduler fs;
   Set waiters;
+  int aborts;
+  int commits;
+  boolean riskyflag;
 
   public ObjectInfo(FlexScheduler fs) {
     this.fs=fs;
     threadowner=-1;
     this.waiters=new HashSet();
+    if (fs.isLock()&&fs.abortThreshold==0)
+      riskyflag=true;
+  }
+
+  public boolean isRisky() {
+    return riskyflag;
+  }
+
+  public void setRisky(boolean risky) {
+    this.riskyflag=risky;
+  }
+
+  public void recordAbort() {
+    aborts++;
+    if (fs.isLock()&&(aborts>fs.abortThreshold)&&
+       aborts>(commits*fs.abortRatio/100))
+      setRisky(true);
+  }
+
+  public void recordCommit() {
+    commits++;
   }
 
   public void addWaiter(FlexScheduler.Event ev) {
index 92ba65da518624d1e049e7aee7e8f49b292bddab..c8133798357ad0d7119abc112209d1ac57752d8a 100644 (file)
@@ -6,6 +6,15 @@ public class ThreadInfo {
     this.fs=fs;
   }
   boolean stalled;
+  int oid;
+
+  public void setObject(int oid) {
+    this.oid=oid;
+  }
+
+  public int getObject() {
+    return oid;
+  }
 
   public boolean isStalled() {
     return stalled;
index 4d439913d22384e23f1a7aaf741e1142d5d9e9dd..6f2a62813fa4c6e5748b4ab8c41df04dcd4afa64 100644 (file)
@@ -1,22 +1,31 @@
 public class TransSim {
   public static void main(String[] args) {
-    int numThreads=16;
-    int numTrans=20;
+    int numThreads=1;
+    int numTrans=4;
     int deltaTrans=0;
-    int numObjects=4000;
-    int numAccesses=20;
-    int deltaAccesses=5;
-    int readPercent=20;
+    int numObjects=400;
+    int numAccesses=10;
+    int deltaAccesses=0;
+    int readPercent=0;
     //time for operation
     int delay=20;
     int deltaDelay=4;
     //time between transactions
     int nonTrans=20;
     int deltaNonTrans=4;
+    //split objects
+    int splitobjects=100;//10 percent of objects special
+    int splitaccesses=100;//40 percent of accesses to special objects
+    int readPercentSecond=30;//20 percent of accesses are reads
+    int abortThreshold=0; //need 4 aborts to declare risky
+    int abortRatio=0;//need 40% aborts vs commits to declare risky
+    int deadlockdepth=10;
 
     long tlazy=0, tcommit=0, tattack=0, tpolite=0, tkarma=0;
-    for(int i=0;i<100;i++) {
-      Executor e=new Executor(numThreads, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans);
+
+    for(int i=1;i<100;i++) {
+      System.out.println("i="+i);
+      Executor e=new Executor(i, numTrans, deltaTrans, numObjects, numAccesses, deltaAccesses, readPercent, delay, deltaDelay, nonTrans, deltaNonTrans, splitobjects, splitaccesses, readPercentSecond);
       System.out.println(e.maxTime());
       FlexScheduler ls=new FlexScheduler(e, FlexScheduler.LAZY);
       ls.dosim();
@@ -26,14 +35,25 @@ public class TransSim {
       tlazy+=ls.getTime();
 
       //Lock object accesses
-      ls=new FlexScheduler(e, FlexScheduler.LOCK);
+      ls=new FlexScheduler(e, FlexScheduler.LOCK, abortThreshold, abortRatio, deadlockdepth);
       ls.dosim();
+      System.out.println("Deadlock count="+ls.getDeadLockCount());
       System.out.println("Lock Abort="+ls.getTime());
       System.out.println("Aborts="+ls.getAborts()+" Commit="+ls.getCommits());
       if (ls.getTime()<besttime)
        besttime=ls.getTime();
       tcommit+=ls.getTime();
 
+      //Lock Commit object accesses
+      ls=new FlexScheduler(e, FlexScheduler.LOCKCOMMIT, abortThreshold, abortRatio, deadlockdepth);
+      ls.dosim();
+      System.out.println("Deadlock count="+ls.getDeadLockCount());
+      System.out.println("LockCommit Abort="+ls.getTime());
+      System.out.println("Aborts="+ls.getAborts()+" Commit="+ls.getCommits());
+      if (ls.getTime()<besttime)
+       besttime=ls.getTime();
+      tcommit+=ls.getTime();
+
       //Kill others at commit
       ls=new FlexScheduler(e, FlexScheduler.COMMIT);
       ls.dosim();