private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10; //number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
- private long liveslotcount = 0;
+ private long liveslotcount = 0;
private int chance;
static final double RESIZE_MULTIPLE = 1.2;
static final double RESIZE_THRESHOLD = 0.75;
private Random random = new Random();
private long lastUncommittedTransaction = 0;
+ private int smallestTableStatusSeen = -1;
+ private int largestTableStatusSeen = -1;
+
private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
private Queue<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
private Map<Long, Commit> commitMap = null; // List of all the most recent live commits
private Map<Long, Abort> abortMap = null; // Set of the live aborts
- private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
- private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
+ private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
private Map<Long, Transaction> uncommittedTransactionsMap = null;
private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
}
- public void rebuild() {
+ public void rebuild() throws ServerException {
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, true);
}
- // TODO: delete method
+ // // TODO: delete method
// public void printSlots() {
// long o = buffer.getOldestSeqNum();
// long n = buffer.getNewestSeqNum();
}
}
+ public IoTString getCommittedAtomic(IoTString key) {
+ KeyValue kv = commitedTable.get(key);
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
+ } else {
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
+ }
+ }
+
+ public IoTString getSpeculativeAtomic(IoTString key) {
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ KeyValue kv = speculativeTable.get(key);
+ if (kv == null) {
+ kv = commitedTable.get(key);
+ }
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
+ } else {
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
+ }
+ }
+
public Long getArbitrator(IoTString key) {
return arbitratorTable.get(key);
}
- public void initTable() {
+ public void initTable() throws ServerException {
cloud.setSalt();//Set the salt
Slot s = new Slot(this, 1, localmachineid);
TableStatus status = new TableStatus(s, numslots);
pendingTransBuild = new PendingTransaction();
}
- public void commitTransaction() {
+ public void commitTransaction() throws ServerException {
if (pendingTransBuild.getKVUpdates().size() == 0) {
// If no updates are made then there is no point inserting into the chain
// Add the pending transaction to the queue
pendingTransQueue.add(pendingTransBuild);
+ // Delete since already inserted
+ pendingTransBuild = new PendingTransaction();
+
while (!pendingTransQueue.isEmpty()) {
if (tryput( pendingTransQueue.peek(), false)) {
pendingTransQueue.poll();
// Make sure new key value pair matches the current arbitrator
if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
// TODO: Maybe not throw en error
- throw new Error("Not all Key Values Match.");
+ throw new Error("Not all Key Values Match Arbitrator.");
}
KeyValue kv = new KeyValue(key, value);
pendingTransBuild.addKV(kv);
}
- public void addGuard(Guard guard) {
- pendingTransBuild.addGuard(guard);
- }
-
- public void update() {
+ public void update() throws ServerException {
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, false);
- if (uncommittedTransactionsMap.keySet().size() > 0) {
- boolean doEnd = false;
- boolean needResize = false;
- while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
- boolean resize = needResize;
- needResize = false;
+ if (!pendingTransQueue.isEmpty()) {
+ // We have a pending transaction so do full insertion
- Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- int newsize = 0;
- if (liveslotcount > resizethreshold) {
- resize = true; //Resize is forced
+ while (!pendingTransQueue.isEmpty()) {
+ if (tryput( pendingTransQueue.peek(), false)) {
+ pendingTransQueue.poll();
}
+ }
+ } else {
+ // We dont have a pending transaction so do minimal effort
+ if (uncommittedTransactionsMap.keySet().size() > 0) {
+
+ boolean doEnd = false;
+ boolean needResize = false;
+ while (!doEnd && (uncommittedTransactionsMap.keySet().size() > 0)) {
+ boolean resize = needResize;
+ needResize = false;
+
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
- if (resize) {
- newsize = (int) (numslots * RESIZE_MULTIPLE);
- TableStatus status = new TableStatus(s, newsize);
- s.addEntry(status);
- }
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
- doRejectedMessages(s);
+ doRejectedMessages(s);
- ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
- // Resize was needed so redo call
- if (retTup.getFirst()) {
- needResize = true;
- continue;
- }
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ needResize = true;
+ continue;
+ }
- // Extract working variables
- boolean seenliveslot = retTup.getSecond();
- long seqn = retTup.getThird();
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
- // Did need to arbitrate
- doEnd = !doArbitration(s);
+ // Did need to arbitrate
+ doEnd = !doArbitration(s);
- doOptionalRescue(s, seenliveslot, seqn, resize);
+ doOptionalRescue(s, seenliveslot, seqn, resize);
- int max = 0;
- if (resize) {
- max = newsize;
- }
+ int max = 0;
+ if (resize) {
+ max = newsize;
+ }
- Slot[] array = cloud.putSlot(s, max);
- if (array == null) {
- array = new Slot[] {s};
- rejectedmessagelist.clear();
- } else {
- if (array.length == 0)
- throw new Error("Server Error: Did not send any slots");
- rejectedmessagelist.add(s.getSequenceNumber());
- doEnd = false;
- }
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ doEnd = false;
+ }
- /* update data structure */
- validateandupdate(array, true);
+ /* update data structure */
+ validateandupdate(array, true);
+ }
}
}
}
- public boolean createNewKey(IoTString keyName, long machineId) {
+ public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
while (true) {
if (arbitratorTable.get(keyName) != null) {
resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
}
- private boolean tryput(PendingTransaction pendingTrans, boolean resize) {
+ private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
boolean seenliveslot = retTup.getSecond();
long seqn = retTup.getThird();
-
doArbitration(s);
Transaction trans = new Transaction(s,
localmachineid,
pendingTrans.getArbitrator(),
pendingTrans.getKVUpdates(),
- pendingTrans.getGuard());
+ pendingTrans.getKVGuard());
boolean insertedTrans = false;
if (s.hasSpace(trans)) {
s.addEntry(trans);
return doSendSlotsAndInsert(s, insertedTrans, resize, newsize);
}
- private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) {
+ private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
if (liveslotcount > resizethreshold) {
boolean seenliveslot = retTup.getSecond();
long seqn = retTup.getThird();
-
doArbitration(s);
NewKey newKey = new NewKey(s, keyName, arbMachineid);
private boolean doArbitration(Slot s) {
// Arbitrate
- Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
-
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
// Sort from oldest to newest
Collections.sort(transSeqNums);
-
boolean didNeedArbitration = false;
for (Long transNum : transSeqNums) {
Transaction ut = uncommittedTransactionsMap.get(transNum);
- KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
// Check if this machine arbitrates for this transaction
- if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ if (ut.getArbitrator() != localmachineid ) {
continue;
}
Entry newEntry = null;
- try {
- if ( ut.getGuard().evaluate(speculativeTableTmp.values())) {
- // Guard evaluated as true
+ if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
+ // Guard evaluated as true
- // update the local tmp current key set
- for (KeyValue kv : ut.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
- // create the commit
- newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
- } else {
- // Guard was false
+ // create the commit
+ newEntry = new Commit(s, ut.getSequenceNumber(), ut.getArbitrator(), ut.getkeyValueUpdateSet());
+ } else {
+ // Guard was false
- // create the abort
- newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
- }
- } catch (Exception e) {
- e.printStackTrace();
+ // create the abort
+ newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID(), ut.getArbitrator());
}
if ((newEntry != null) && s.hasSpace(newEntry)) {
}
}
- private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) {
+ private boolean doSendSlotsAndInsert(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
int max = 0;
if (resize)
max = newsize;
before decoding */
if (newslots.length == 0) return;
+ // Reset the table status declared sizes
+ smallestTableStatusSeen = -1;
+ largestTableStatusSeen = -1;
+
long firstseqnum = newslots[0].getSequenceNumber();
if (firstseqnum <= sequencenumber) {
throw new Error("Server Error: Sent older slots!");
HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
- initExpectedSize(firstseqnum);
+ // initExpectedSize(firstseqnum);
for (Slot slot : newslots) {
processSlot(indexer, slot, acceptupdatestolocal, machineSet);
- updateExpectedSize();
+ // updateExpectedSize();
}
-
- boolean hasGap = false;
/* If there is a gap, check to see if the server sent us everything. */
if (firstseqnum != (sequencenumber + 1)) {
}
}
+
commitNewMaxSize();
/* Commit new to slots. */
private void createSpeculativeTable() {
if (uncommittedTransactionsMap.keySet().size() == 0) {
- speculativeTable = commitedTable; // Ok that they are the same object
+ // speculativeTable = commitedTable; // Ok that they are the same object
return;
}
- Map speculativeTableTmp = null;
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
// Sort from oldest to newest commit
Collections.sort(utSeqNums);
if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
- speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+
+ speculativeTable.clear();
+ lastUncommittedTransaction = -1;
for (Long key : utSeqNums) {
Transaction trans = uncommittedTransactionsMap.get(key);
lastUncommittedTransaction = key;
- try {
- if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
- for (KeyValue kv : trans.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
+ if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
}
-
- } catch (Exception e) {
- e.printStackTrace();
}
+
}
} else {
- speculativeTableTmp = new HashMap<IoTString, KeyValue>(speculativeTable);
-
for (Long key : utSeqNums) {
if (key <= lastUncommittedTransaction) {
Transaction trans = uncommittedTransactionsMap.get(key);
- try {
- if (trans.getGuard().evaluate(speculativeTableTmp.values())) {
- for (KeyValue kv : trans.getkeyValueUpdateSet()) {
- speculativeTableTmp.put(kv.getKey(), kv);
- }
+ if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
}
-
- } catch (Exception e) {
- e.printStackTrace();
}
}
}
- speculativeTable = speculativeTableTmp;
+ for (IoTString key : speculativeTableTmp.keySet()) {
+ speculativeTable.put(key, speculativeTableTmp.get(key));
+ }
+
+ // speculativeTable = speculativeTableTmp;
}
private int expectedsize, currmaxsize;
private void checkNumSlots(int numslots) {
- if (numslots != expectedsize) {
- throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+
+
+ // We only have 1 size so we must have this many slots
+ if (largestTableStatusSeen == smallestTableStatusSeen) {
+ if (numslots != smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
+ } else {
+ // We have more than 1
+ if (numslots < smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
}
+
+ // if (numslots != expectedsize) {
+ // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+ // }
}
private void initExpectedSize(long firstsequencenumber) {
private void updateExpectedSize() {
expectedsize++;
if (expectedsize > currmaxsize) {
+ System.out.println("Maxing Out: " + expectedsize + " " + currmaxsize);
expectedsize = currmaxsize;
}
}
}
private void commitNewMaxSize() {
+
+ if (largestTableStatusSeen == -1) {
+ currmaxsize = numslots;
+ } else {
+ currmaxsize = largestTableStatusSeen;
+ }
+
if (numslots != currmaxsize) {
buffer.resize(currmaxsize);
}
}
private void processEntry(Transaction entry) {
- Transaction prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+
+ long arb = entry.getArbitrator();
+ Long comLast = lastCommitSeenSeqNumMap.get(arb);
+ Long abLast = lastAbortSeenSeqNumMap.get(arb);
+
+ Transaction prevTrans = null;
+
+ if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else {
+ prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+ }
// Duplicate so delete old copy
if (prevTrans != null) {
entry.setDead();
}
- lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+ lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ }
}
private void processEntry(Commit entry, Slot s) {
private void processEntry(TableStatus entry) {
int newnumslots = entry.getMaxSlots();
- updateCurrMaxSize(newnumslots);
+ // updateCurrMaxSize(newnumslots);
if (lastTableStatus != null)
lastTableStatus.setDead();
lastTableStatus = entry;
+
+ if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
+ smallestTableStatusSeen = newnumslots;
+ }
+
+ if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
+ largestTableStatusSeen = newnumslots;
+ }
+
+ // System.out.println("Table Stat: " + newnumslots + " large: " + largestTableStatusSeen + " small: " + smallestTableStatusSeen);
}
private void addWatchList(long machineid, RejectedMessage entry) {