import java.util.Random;
import java.util.Queue;
import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.Collection;
+import java.util.Collections;
+
/**
* IoTTable data structure. Provides client inferface.
private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10; //number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
- private long liveslotcount = 0;
+ public long liveslotcount = 0; // TODO: MAKE PRIVATE
private int chance;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
static final int REJECTED_THRESHOLD = 5;
- private int resizethreshold;
+ public int resizethreshold; // TODO: MAKE PRIVATE
private long lastliveslotseqn; //smallest sequence number with a live entry
private Random random = new Random();
+ private long lastCommitSeenSeqNum = 0; // sequence number of the last commit that was seen
private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
private List<Commit> commitList = null; // List of all the most recent live commits
+ private List<Long> commitListSeqNum = null; // List of all the most recent live commits trans sequence numbers
+
private Set<Abort> abortSet = null; // Set of the live aborts
- private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ public Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV TODO: Make Private
private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
- private List<Transaction> uncommittedTransactionsList = null; //
+ public Map<Long, Transaction> uncommittedTransactionsMap = null; // TODO: make private
private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+ private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
// private Set<Abort> arbitratorTable = null; // Table of arbitrators
+ private Map<Long, Commit> newCommitMap = null; // Map of all the new commits
+
public Table(String baseurl, String password, long _localmachineid) {
cloud = new CloudComm(this, baseurl, password);
lastliveslotseqn = 1;
- pendingTransQueue = new LinkedList<PendingTransaction>();
- commitList = new LinkedList<Commit>();
- abortSet = new HashSet<Abort>();
- commitedTable = new HashMap<IoTString, KeyValue>();
- speculativeTable = new HashMap<IoTString, KeyValue>();
- uncommittedTransactionsList = new LinkedList<Transaction>();
- arbitratorTable = new HashMap<IoTString, Long>();
+ setupDataStructs();
}
public Table(CloudComm _cloud, long _localmachineid) {
sequencenumber = 0;
cloud = _cloud;
+ setupDataStructs();
+ }
+
+ private void setupDataStructs() {
pendingTransQueue = new LinkedList<PendingTransaction>();
commitList = new LinkedList<Commit>();
abortSet = new HashSet<Abort>();
commitedTable = new HashMap<IoTString, KeyValue>();
speculativeTable = new HashMap<IoTString, KeyValue>();
- uncommittedTransactionsList = new LinkedList<Transaction>();
+ uncommittedTransactionsMap = new HashMap<Long, Transaction>();
arbitratorTable = new HashMap<IoTString, Long>();
+ newKeyTable = new HashMap<IoTString, NewKey>();
+ newCommitMap = new HashMap<Long, Commit> ();
}
public void rebuild() {
validateandupdate(newslots, true);
}
+ // TODO: delete method
+ public void printSlots() {
+ long o = buffer.getOldestSeqNum();
+ long n = buffer.getNewestSeqNum();
+ int[] types = new int[10];
+
+ int num = 0;
+
+ int livec = 0;
+ int deadc = 0;
+ for (long i = o; i < (n + 1); i++) {
+ Slot s = buffer.getSlot(i);
+
+ Vector<Entry> entries = s.getEntries();
+
+ for (Entry e : entries) {
+ if (e.isLive()) {
+ int type = e.getType();
+ types[type] = types[type] + 1;
+ num++;
+ livec++;
+ } else {
+ deadc++;
+ }
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ System.out.println(i + " " + types[i]);
+ }
+ System.out.println("Live count: " + livec);
+ System.out.println("Dead count: " + deadc);
+ System.out.println("Old: " + o);
+ System.out.println("New: " + n);
+ System.out.println("Size: " + buffer.size());
+ System.out.println("Commits Map: " + commitedTable.size());
+ System.out.println("Commits List: " + commitList.size());
+ }
public IoTString getCommitted(IoTString key) {
KeyValue kv = commitedTable.get(key);
}
}
-
public void initTable() {
cloud.setSalt();//Set the salt
Slot s = new Slot(this, 1, localmachineid);
}
public String toString() {
-
-
String retString = " Committed Table: \n";
retString += "---------------------------\n";
retString += commitedTable.toString();
return retString;
}
-
-
-
-
-
public void startTransaction() {
// Create a new transaction, invalidates any old pending transactions.
pendingTransBuild = new PendingTransaction();
public void addKV(IoTString key, IoTString value) {
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
// Make sure new key value pair matches the current arbitrator
if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
- // TODO: Maybe not throw and error
- throw new Error("Not all Key Values match");
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match.");
}
-
-
KeyValue kv = new KeyValue(key, value);
pendingTransBuild.addKV(kv);
}
}
public void update() {
+
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, false);
- if (uncommittedTransactionsList.size() > 0) {
- List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
-
- for (Transaction ut : uncommittedTransactionsList) {
- KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
- long arb = arbitratorTable.get(kv.getKey());
-
- if (arb == localmachineid) {
- uncommittedTransArb.add(ut);
- }
- }
-
+ if (uncommittedTransactionsMap.keySet().size() > 0) {
+ boolean doEnd = false;
boolean needResize = false;
- while (uncommittedTransArb.size() > 0) {
+ while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
boolean resize = needResize;
needResize = false;
s.addEntry(status);
}
- if (! rejectedmessagelist.isEmpty()) {
- /* TODO: We should avoid generating a rejected message entry if
- * there is already a sufficient entry in the queue (e.g.,
- * equalsto value of true and same sequence number). */
-
- long old_seqn = rejectedmessagelist.firstElement();
- if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
- long new_seqn = rejectedmessagelist.lastElement();
- RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
- s.addEntry(rm);
- } else {
- long prev_seqn = -1;
- int i = 0;
- /* Go through list of missing messages */
- for (; i < rejectedmessagelist.size(); i++) {
- long curr_seqn = rejectedmessagelist.get(i);
- Slot s_msg = buffer.getSlot(curr_seqn);
- if (s_msg != null)
- break;
- prev_seqn = curr_seqn;
- }
- /* Generate rejected message entry for missing messages */
- if (prev_seqn != -1) {
- RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
- s.addEntry(rm);
- }
- /* Generate rejected message entries for present messages */
- for (; i < rejectedmessagelist.size(); i++) {
- long curr_seqn = rejectedmessagelist.get(i);
- Slot s_msg = buffer.getSlot(curr_seqn);
- long machineid = s_msg.getMachineID();
- RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
- s.addEntry(rm);
- }
- }
- }
-
- long newestseqnum = buffer.getNewestSeqNum();
- long oldestseqnum = buffer.getOldestSeqNum();
- if (lastliveslotseqn < oldestseqnum) {
- lastliveslotseqn = oldestseqnum;
- }
+ doRejectedMessages(s);
- long seqn = lastliveslotseqn;
- boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
- long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
-
- boolean tryAgain = false;
-
- // Mandatory Rescue
- for (; seqn < threshold; seqn++) {
- Slot prevslot = buffer.getSlot(seqn);
- //Push slot number forward
- if (! seenliveslot)
- lastliveslotseqn = seqn;
-
- if (! prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for (Entry liveentry : liveentries) {
- if (s.hasSpace(liveentry)) {
- s.addEntry(liveentry);
- } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
- if (!resize) {
- System.out.print("B"); //?
- tryAgain = true;
- needResize = true;
- }
- }
- }
- }
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
- if (tryAgain) {
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ needResize = true;
continue;
}
- // Arbitrate
- Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
- for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
- Transaction ut = i.next();
-
- KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
- // Check if this machine arbitrates for this transaction
- if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
- continue;
- }
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
- Entry newEntry = null;
+ // Did need to arbitrate
+ doEnd = !doArbitration(s);
- try {
- if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
- // Guard evaluated as true
-
- // update the local tmp current key set
- for (KeyValue kv : ut.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
-
- // create the commit
- newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
- } else {
- // Guard was false
-
- // create the abort
- newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if ((newEntry != null) && s.hasSpace(newEntry)) {
- s.addEntry(newEntry);
- i.remove();
-
- } else {
- break;
- }
- }
-
- /* now go through live entries from least to greatest sequence number until
- * either all live slots added, or the slot doesn't have enough room
- * for SKIP_THRESHOLD consecutive entries*/
- int skipcount = 0;
- search:
- for (; seqn <= newestseqnum; seqn++) {
- Slot prevslot = buffer.getSlot(seqn);
- //Push slot number forward
- if (!seenliveslot)
- lastliveslotseqn = seqn;
-
- if (!prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for (Entry liveentry : liveentries) {
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else {
- skipcount++;
- if (skipcount > SKIP_THRESHOLD)
- break search;
- }
- }
- }
+ doOptionalRescue(s, seenliveslot, seqn, resize);
int max = 0;
- if (resize)
+ if (resize) {
max = newsize;
+ }
+
Slot[] array = cloud.putSlot(s, max);
if (array == null) {
array = new Slot[] {s};
if (array.length == 0)
throw new Error("Server Error: Did not send any slots");
rejectedmessagelist.add(s.getSequenceNumber());
+ doEnd = false;
}
/* update data structure */
validateandupdate(array, true);
}
-
-
}
}
}
}
- void decrementLiveCount() {
+ public void decrementLiveCount() {
liveslotcount--;
+ // System.out.println("Decrement Live Count");
}
-
private void setResizeThreshold() {
int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
int newsize = 0;
if (liveslotcount > resizethreshold) {
resize = true; //Resize is forced
+ System.out.println("Live count resize: " + liveslotcount + " " + resizethreshold);
+
}
if (resize) {
newsize = (int) (numslots * RESIZE_MULTIPLE);
- TableStatus status = new TableStatus(s, newsize);
- s.addEntry(status);
- }
- if (! rejectedmessagelist.isEmpty()) {
- /* TODO: We should avoid generating a rejected message entry if
- * there is already a sufficient entry in the queue (e.g.,
- * equalsto value of true and same sequence number). */
+ System.out.println("New Size: " + newsize + " old: " + buffer.oldestseqn); // TODO remove
- long old_seqn = rejectedmessagelist.firstElement();
- if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
- long new_seqn = rejectedmessagelist.lastElement();
- RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
- s.addEntry(rm);
- } else {
- long prev_seqn = -1;
- int i = 0;
- /* Go through list of missing messages */
- for (; i < rejectedmessagelist.size(); i++) {
- long curr_seqn = rejectedmessagelist.get(i);
- Slot s_msg = buffer.getSlot(curr_seqn);
- if (s_msg != null)
- break;
- prev_seqn = curr_seqn;
- }
- /* Generate rejected message entry for missing messages */
- if (prev_seqn != -1) {
- RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
- s.addEntry(rm);
- }
- /* Generate rejected message entries for present messages */
- for (; i < rejectedmessagelist.size(); i++) {
- long curr_seqn = rejectedmessagelist.get(i);
- Slot s_msg = buffer.getSlot(curr_seqn);
- long machineid = s_msg.getMachineID();
- RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
- s.addEntry(rm);
- }
- }
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
}
- long newestseqnum = buffer.getNewestSeqNum();
- long oldestseqnum = buffer.getOldestSeqNum();
- if (lastliveslotseqn < oldestseqnum)
- lastliveslotseqn = oldestseqnum;
+ doRejectedMessages(s);
- long seqn = lastliveslotseqn;
- boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
- long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
- // Mandatory Rescue
- for (; seqn < threshold; seqn++) {
- Slot prevslot = buffer.getSlot(seqn);
- //Push slot number forward
- if (! seenliveslot)
- lastliveslotseqn = seqn;
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
- if (! prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for (Entry liveentry : liveentries) {
- if (s.hasSpace(liveentry)) {
- s.addEntry(liveentry);
- } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
- if (!resize) {
- System.out.print("B"); //?
- return tryput(pendingTrans, true);
- }
- }
- }
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ return tryput(pendingTrans, true);
}
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
- // Arbitrate
- Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
- for (Transaction ut : uncommittedTransactionsList) {
-
- KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
- // Check if this machine arbitrates for this transaction
- if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
- continue;
- }
-
- Entry newEntry = null;
-
- try {
- if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
- // Guard evaluated as true
-
- // update the local tmp current key set
- for (KeyValue kv : ut.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
-
- // create the commit
- newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
- } else {
- // Guard was false
-
- // create the abort
- newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- if ((newEntry != null) && s.hasSpace(newEntry)) {
- s.addEntry(newEntry);
- } else {
- break;
- }
- }
+ doArbitration(s);
Transaction trans = new Transaction(s,
s.getSequenceNumber(),
insertedTrans = true;
}
- /* now go through live entries from least to greatest sequence number until
- * either all live slots added, or the slot doesn't have enough room
- * for SKIP_THRESHOLD consecutive entries*/
- int skipcount = 0;
- search:
- for (; seqn <= newestseqnum; seqn++) {
- Slot prevslot = buffer.getSlot(seqn);
- //Push slot number forward
- if (!seenliveslot)
- lastliveslotseqn = seqn;
-
- if (!prevslot.isLive())
- continue;
- seenliveslot = true;
- Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
- for (Entry liveentry : liveentries) {
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else {
- skipcount++;
- if (skipcount > SKIP_THRESHOLD)
- break search;
- }
- }
- }
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+ insertedTrans = doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
- int max = 0;
- if (resize)
- max = newsize;
- Slot[] array = cloud.putSlot(s, max);
- if (array == null) {
- array = new Slot[] {s};
- rejectedmessagelist.clear();
- } else {
- if (array.length == 0)
- throw new Error("Server Error: Did not send any slots");
- rejectedmessagelist.add(s.getSequenceNumber());
- insertedTrans = false;
+ if (insertedTrans) {
+ // System.out.println("Inserted: " + trans.getSequenceNumber());
}
- /* update data structure */
- validateandupdate(array, true);
-
return insertedTrans;
}
s.addEntry(status);
}
+ doRejectedMessages(s);
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ return tryput(keyName, arbMachineid, true);
+ }
+
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
+
+
+ doArbitration(s);
+
+ NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+ boolean insertedNewKey = false;
+ if (s.hasSpace(newKey)) {
+ s.addEntry(newKey);
+ insertedNewKey = true;
+ }
+
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+ return doSendSlotsAndInsert(s, insertedNewKey, resize, newsize);
+ }
+
+ private void doRejectedMessages(Slot s) {
if (! rejectedmessagelist.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
}
}
}
+ }
+ private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
long newestseqnum = buffer.getNewestSeqNum();
long oldestseqnum = buffer.getOldestSeqNum();
if (lastliveslotseqn < oldestseqnum)
long seqn = lastliveslotseqn;
boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
- long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+ long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
// Mandatory Rescue
for (; seqn < threshold; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
- //Push slot number forward
+ // Push slot number forward
if (! seenliveslot)
lastliveslotseqn = seqn;
s.addEntry(liveentry);
} else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
if (!resize) {
- System.out.print("B"); //?
- return tryput(keyName, arbMachineid, true);
+ System.out.println("B"); //?
+
+ System.out.println("==============================NEEEEDDDD RESIZING");
+ return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
}
}
}
}
+ // Did not resize
+ return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
+ }
- // // Arbitrate
- // Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
- // for (Transaction ut : uncommittedTransactionsList) {
+ private boolean doArbitration(Slot s) {
+ // Arbitrate
+ Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
- // KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
- // // Check if this machine arbitrates for this transaction
- // if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
- // continue;
- // }
+ List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
- // Entry newEntry = null;
+ // Sort from oldest to newest
+ Collections.sort(transSeqNums);
- // try {
- // if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
- // // Guard evaluated as true
- // // update the local tmp current key set
- // for (KeyValue kv : ut.getkeyValueUpdateSet()) {
- // speculativeTableTmp.put(kv.getKey(), kv);
- // }
+ boolean didNeedArbitration = false;
+ for (Long transNum : transSeqNums) {
+ Transaction ut = uncommittedTransactionsMap.get(transNum);
- // // create the commit
- // newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
- // } else {
- // // Guard was false
+ KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+ // Check if this machine arbitrates for this transaction
+ if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ continue;
+ }
- // // create the abort
- // newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
- // }
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
+ // we did have something to arbitrate on
+ didNeedArbitration = true;
- // if ((newEntry != null) && s.hasSpace(newEntry)) {
+ Entry newEntry = null;
- // // TODO: Remove print
- // System.out.println("Arbitrating...");
- // s.addEntry(newEntry);
- // } else {
- // break;
- // }
- // }
+ try {
+ if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ // Guard evaluated as true
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
- NewKey newKey = new NewKey(s, keyName, arbMachineid);
+ // create the commit
+ newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+ } else {
+ // Guard was false
- boolean insertedNewKey = false;
- if (s.hasSpace(newKey)) {
- s.addEntry(newKey);
- insertedNewKey = true;
+ // create the abort
+ newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if ((newEntry != null) && s.hasSpace(newEntry)) {
+ s.addEntry(newEntry);
+ } else {
+ break;
+ }
}
+ return didNeedArbitration;
+ }
+
+ private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
/* now go through live entries from least to greatest sequence number until
* either all live slots added, or the slot doesn't have enough room
* for SKIP_THRESHOLD consecutive entries*/
int skipcount = 0;
+ long newestseqnum = buffer.getNewestSeqNum();
search:
for (; seqn <= newestseqnum; seqn++) {
Slot prevslot = buffer.getSlot(seqn);
}
}
}
+ }
+ private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
int max = 0;
if (resize)
max = newsize;
if (array.length == 0)
throw new Error("Server Error: Did not send any slots");
rejectedmessagelist.add(s.getSequenceNumber());
- insertedNewKey = false;
+ inserted = false;
}
/* update data structure */
validateandupdate(array, true);
- return insertedNewKey;
+ return inserted;
}
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
updateExpectedSize();
}
+ proccessAllNewCommits();
+
/* If there is a gap, check to see if the server sent us everything. */
if (firstseqnum != (sequencenumber + 1)) {
createSpeculativeTable();
}
+ public void proccessAllNewCommits() {
+
+ // Process only if there are commit
+ if (newCommitMap.keySet().size() == 0) {
+ return;
+ }
+
+ List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.keySet());
+
+ // Sort from oldest to newest commit
+ Collections.sort(commitSeqNums);
+
+ // Go through each new commit one by one
+ for (Long entrySeqNum : commitSeqNums) {
+ Commit entry = newCommitMap.get(entrySeqNum);
+
+ if (entry.getTransSequenceNumber() <= lastCommitSeenSeqNum) {
+
+ // Remove any old commits
+ for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+ Commit prevcommit = i.next();
+
+ if (entry.getTransSequenceNumber() == prevcommit.getTransSequenceNumber()) {
+ prevcommit.setDead();
+ i.remove();
+ }
+ }
+ commitList.add(entry);
+ continue;
+ }
+
+ // Remove any old commits
+ for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
+ Commit prevcommit = i.next();
+ prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+ if (!prevcommit.isLive()) {
+ i.remove();
+ }
+ }
+
+ // Add the new commit
+ commitList.add(entry);
+ lastCommitSeenSeqNum = entry.getTransSequenceNumber();
+ // System.out.println("Last Seq Num: " + lastCommitSeenSeqNum);
+
+
+ // Update the committed table list
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ IoTString key = kv.getKey();
+ commitedTable.put(key, kv);
+ }
+
+ long committedTransSeq = entry.getTransSequenceNumber();
+
+ // Make dead the transactions
+ for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next().getValue();
+
+ if (prevtrans.getSequenceNumber() <= committedTransSeq) {
+ i.remove();
+ prevtrans.setDead();
+ }
+ }
+ }
+
+
+ // Clear the new commits storage so we can use it later
+ newCommitMap.clear();
+ }
+
private void createSpeculativeTable() {
Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+ // Sort from oldest to newest commit
+ Collections.sort(utSeqNums);
- for (Transaction trans : uncommittedTransactionsList) {
+
+ for (Long key : utSeqNums) {
+ Transaction trans = uncommittedTransactionsMap.get(key);
try {
if (trans.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
}
private void commitNewMaxSize() {
- if (numslots != currmaxsize)
+ if (numslots != currmaxsize) {
+ System.out.println("Resizing the buffer"); // TODO: Remove
buffer.resize(currmaxsize);
+ }
numslots = currmaxsize;
setResizeThreshold();
}
-
-
-
-
private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
private void processEntry(NewKey entry) {
arbitratorTable.put(entry.getKey(), entry.getMachineID());
+
+ NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
+
+ if (oldNewKey != null) {
+ oldNewKey.setDead();
+ }
}
private void processEntry(Transaction entry) {
- uncommittedTransactionsList.add(entry);
+ Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+ // Duplicate so delete old copy
+ if (prevTrans != null) {
+ prevTrans.setDead();
+ }
}
private void processEntry(Abort entry) {
-
if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
// Abort has not been seen yet so we need to keep track of it
abortSet.add(entry);
entry.setDead();
}
- for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
- Transaction prevtrans = i.next();
- if (prevtrans.getSequenceNumber() == entry.getTransSequenceNumber()) {
- uncommittedTransactionsList.remove(prevtrans);
- prevtrans.setDead();
- return;
- }
- }
- }
-
- private void processEntry(Commit entry) {
-
- for (Iterator<Commit> i = commitList.iterator(); i.hasNext();) {
- Commit prevcommit = i.next();
- prevcommit.updateLiveKeys(entry.getkeyValueUpdateSet());
-
- if (!prevcommit.isLive()) {
- //commitList.remove(prevcommit);
- i.remove();
- }
- }
-
- commitList.add(entry);
-
- // Update the committed table list
- for (KeyValue kv : entry.getkeyValueUpdateSet()) {
- IoTString key = kv.getKey();
- commitedTable.put(key, kv);
- }
-
- long committedTransSeq = entry.getTransSequenceNumber();
-
// Make dead the transactions
- for (Iterator<Transaction> i = uncommittedTransactionsList.iterator(); i.hasNext();) {
- Transaction prevtrans = i.next();
+ for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next().getValue();
- if (prevtrans.getSequenceNumber() <= committedTransSeq) {
- // uncommittedTransactionsList.remove(prevtrans);
+ if (prevtrans.getSequenceNumber() <= entry.getTransSequenceNumber()) {
i.remove();
prevtrans.setDead();
}
}
}
+ private void processEntry(Commit entry, Slot s) {
+ Commit prevCommit = newCommitMap.put(entry.getTransSequenceNumber(), entry);
+ if (prevCommit != null) {
+ prevCommit.setDead();
+ }
+ }
+
private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
updateCurrMaxSize(newnumslots);
break;
case Entry.TypeCommit:
- processEntry((Commit)entry);
+ processEntry((Commit)entry, slot);
break;
case Entry.TypeAbort:
*/
public class Test {
+
+ public static final int NUMBER_OF_TESTS = 20000; //66
+
public static void main(String[] args) {
if (args[0].equals("2")) {
test2();
+ } else if (args[0].equals("3")) {
+ test3();
+ } else if (args[0].equals("4")) {
+ test4();
}
}
- static void test2() {
+
+
+ static void test5() {
Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
- t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
- t2.update();
+ t1.rebuild();
+ System.out.println(t1);
+ // // Print the results
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ // String a = "a" + i;
+ // String b = "b" + i;
+ // IoTString ia = new IoTString(a);
+ // IoTString ib = new IoTString(b);
+ // System.out.println(ib + " -> " + t1.getCommitted(ib));
+ // System.out.println(ia + " -> " + t2.getCommitted(ia));
+ // System.out.println();
+ // }
+ }
- final int NUMBER_OF_TESTS = 200;
+ static Thread buildThreadTest4(String prefix, Table t) {
+ return new Thread() {
+ public void run() {
+ for (int i = 0; i < (NUMBER_OF_TESTS * 3); i++) {
- for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ int num = i % NUMBER_OF_TESTS;
+ String key = prefix + num;
+ String value = prefix + (num + 2000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
- System.out.println("Doing: " + i);
+ t.startTransaction();
+ t.addKV(iKey, iValue);
+ t.commitTransaction();
+ }
+ }
+ };
+ }
+ static void test4() {
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ t1.rebuild();
+ t2.rebuild();
+
+ Thread thr1 = buildThreadTest4("b", t1);
+ Thread thr2 = buildThreadTest4("a", t2);
+ thr1.start();
+ thr2.start();
+ try {
+ thr1.join();
+ thr2.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ t1.update();
+ t2.update();
+ // t1.update();
+
+ // Print the results
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
String a = "a" + i;
String b = "b" + i;
IoTString ia = new IoTString(a);
IoTString ib = new IoTString(b);
+ System.out.println(ib + " -> " + t1.getCommitted(ib));
+ System.out.println(ia + " -> " + t2.getCommitted(ia));
+ System.out.println();
+ }
+ }
+
+ static void test3() {
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ t1.rebuild();
+ t2.rebuild();
+
- t1.createNewKey(ia, 351);
- t2.createNewKey(ib, 321);
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ String key = "a" + i;
+ String value = "a" + (i + 1000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
t1.startTransaction();
- t1.addKV(ia, ia);
+ t1.addKV(iKey, iValue);
t1.commitTransaction();
}
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ String key = "b" + i;
+ String value = "b" + (i + 1000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
- System.out.println("Doing: " + i);
+ t2.startTransaction();
+ t2.addKV(iKey, iValue);
+ t2.commitTransaction();
+ }
+
+ // Make sure t1 sees the new updates from t2
+ t1.update();
+ // Print the results
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
String a = "a" + i;
String b = "b" + i;
IoTString ia = new IoTString(a);
IoTString ib = new IoTString(b);
- t2.startTransaction();
- t2.addKV(ib, ib);
- t2.commitTransaction();
+ System.out.println(ib + " -> " + t1.getCommitted(ib));
+ System.out.println(ia + " -> " + t2.getCommitted(ia));
+ System.out.println();
}
+ }
+ static void test2() {
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ t1.initTable();
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ t2.update();
- t1.update();
- // t2.update();
- // t1.update();
-
+ // Make the Keys
+ System.out.println("Setting up keys");
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
String a = "a" + i;
String b = "b" + i;
IoTString ia = new IoTString(a);
IoTString ib = new IoTString(b);
+ t1.createNewKey(ia, 321);
+ t1.createNewKey(ib, 321);
+ }
- System.out.println(ib + " -> " + t1.getCommitted(ib));
- System.out.println(ia + " -> " + t2.getCommitted(ia));
- System.out.println();
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold);
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold);
+ // System.out.println();
+
+
+
+ // Do Updates for the keys
+ System.out.println("Writing Keys a...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+
+ String key = "a" + i;
+ String value = "a" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+
+ t1.startTransaction();
+ t1.addKV(iKey, iValue);
+ t1.commitTransaction();
+ }
+
+ // Do Updates for the keys
+ System.out.println("Writing Keys a...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+
+ String key = "a" + i;
+ String value = "a" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+ t1.startTransaction();
+ t1.addKV(iKey, iValue);
+ t1.commitTransaction();
}
+
+
+ t2.update();
+ System.out.println("Writing Keys b...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+
+ String key = "b" + i;
+ String value = "b" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+
+ t2.startTransaction();
+ t2.addKV(iKey, iValue);
+ t2.commitTransaction();
+ }
+
+
+ // Do Updates for the keys
+ System.out.println("Writing Keys a...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i += 2) {
+ System.out.println(i);
+
+ String key = "a" + i;
+ String value = "a" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+ t1.startTransaction();
+ t1.addKV(iKey, iValue);
+ t1.commitTransaction();
+ }
+
+
+ t1.update();
+ t2.update();
+
+ System.out.println("Checking a keys...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+ String key = "a" + i;
+ String value = "a" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+ IoTString testVal = t1.getCommitted(iKey);
+
+ if ((testVal == null) || (testVal.equals(iValue) == false)) {
+ System.out.println("Key val incorrect: " + key);
+ }
+
+ key = "b" + i;
+ value = "b" + (i + 10000);
+ iKey = new IoTString(key);
+ iValue = new IoTString(value);
+
+ testVal = t1.getCommitted(iKey);
+
+ if ((testVal == null) || (testVal.equals(iValue) == false)) {
+ System.out.println("Key val incorrect: " + key);
+ }
+ }
+
+ System.out.println("Checking b keys...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+ String key = "a" + i;
+ String value = "a" + (i + 10000);
+ IoTString iKey = new IoTString(key);
+ IoTString iValue = new IoTString(value);
+
+ IoTString testVal = t2.getCommitted(iKey);
+
+ if ((testVal == null) || (testVal.equals(iValue) == false)) {
+ System.out.println("Key val incorrect: " + key);
+ }
+
+ key = "b" + i;
+ value = "b" + (i + 10000);
+ iKey = new IoTString(key);
+ iValue = new IoTString(value);
+
+ testVal = t2.getCommitted(iKey);
+
+ if ((testVal == null) || (testVal.equals(iValue) == false)) {
+ System.out.println("Key val incorrect: " + key);
+ }
+ }
+
+
+
+
+ System.out.println();
+ System.out.println();
+ System.out.println("Update");
+ // Make sure t1 sees the new updates from t2
+ t1.update();
+ t2.update();
+ t1.update();
+ System.out.println();
+ System.out.println();
+ System.out.println();
+ System.out.println();
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+ t2.update();
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+ System.out.println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-");
+
+
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size());
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() );
+ System.out.println();
+
+ t1.printSlots();
+ System.out.println();
+ System.out.println();
+ System.out.println();
+ System.out.println();
+ t2.printSlots();
+ System.out.println();
+
+
+
+ // // Do Updates for the keys
+ // System.out.println("Writing Keys a (actual)");
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ // String key = "a" + i;
+ // String value = "a" + i;
+ // IoTString iKey = new IoTString(key);
+ // IoTString iValue = new IoTString(value);
+
+ // t1.startTransaction();
+ // t1.addKV(iKey, iValue);
+ // t1.commitTransaction();
+ // }
+
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size());
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size());
+ // System.out.println();
+
+
+
+ // System.out.println("Writing Keys b (actual)");
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ // String key = "b" + i;
+ // String value = "b" + i;
+ // IoTString iKey = new IoTString(key);
+ // IoTString iValue = new IoTString(value);
+
+ // t2.startTransaction();
+ // t2.addKV(iKey, iValue);
+ // t2.commitTransaction();
+ // }
+
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size() );
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() );
+ // System.out.println();
+
+
+ // // Do Updates for the keys
+ // System.out.println("Writing Keys a (actual)");
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ // String key = "a" + i;
+ // String value = "a" + i;
+ // IoTString iKey = new IoTString(key);
+ // IoTString iValue = new IoTString(value);
+
+ // t1.startTransaction();
+ // t1.addKV(iKey, iValue);
+ // t1.commitTransaction();
+ // }
+
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size());
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size());
+ // System.out.println();
+
+
+
+ // System.out.println("Writing Keys b (actual)");
+ // for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ // String key = "b" + i;
+ // String value = "b" + i;
+ // IoTString iKey = new IoTString(key);
+ // IoTString iValue = new IoTString(value);
+
+ // t2.startTransaction();
+ // t2.addKV(iKey, iValue);
+ // t2.commitTransaction();
+ // }
+
+ // System.out.println("=========t1 live" + t1.liveslotcount + " thresh: " + t1.resizethreshold + " commits: " + t1.commitedTable.size() + " uncommits: " + t1.uncommittedTransactionsList.size() );
+ // System.out.println("=========t2 live" + t2.liveslotcount + " thresh: " + t2.resizethreshold + " commits: " + t2.commitedTable.size() + " uncommits: " + t2.uncommittedTransactionsList.size() );
+ // System.out.println();
+
+ // t1.printSlots();
+ // System.out.println();
+ // t2.printSlots();
+
+
+
+ // Make sure t1 sees the new updates from t2
+ // t1.update();
+
+ // // Print the results
+ // for (int i = NUMBER_OF_TESTS - 10; i < NUMBER_OF_TESTS; i++) {
+ // String a = "a" + i;
+ // String b = "b" + i;
+ // IoTString ia = new IoTString(a);
+ // IoTString ib = new IoTString(b);
+
+ // System.out.println(ib + " -> " + t1.getCommitted(ib));
+ // System.out.println(ia + " -> " + t2.getCommitted(ia));
+ // System.out.println();
+ // }
}
}