--- /dev/null
+package iotcloud;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Iterator;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.Vector;
+import java.util.Random;
+import java.util.Queue;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.Collection;
+import java.util.Collections;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
+
+
+/**
+ * IoTTable data structure. Provides client inferface.
+ * @author Brian Demsky
+ * @version 1.0
+ */
+
+final public class Table {
+ private int numslots; //number of slots stored in buffer
+
+ // machine id -> (sequence number, Slot or LastMessage); records last message by each client
+ private HashMap<Long, Pair<Long, Liveness> > lastmessagetable = new HashMap<Long, Pair<Long, Liveness> >();
+ // machine id -> ...
+ private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
+ private Vector<Long> rejectedmessagelist = new Vector<Long>();
+ private SlotBuffer buffer;
+ private CloudComm cloud;
+ private long sequencenumber; //Largest sequence number a client has received
+ private long localmachineid;
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10; //number of slots that should be kept free
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount = 0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ static final int REJECTED_THRESHOLD = 5;
+ private int resizethreshold;
+ private long lastliveslotseqn; //smallest sequence number with a live entry
+ private Random random = new Random();
+ private long lastUncommittedTransaction = 0;
+
+ private int smallestTableStatusSeen = -1;
+ private int largestTableStatusSeen = -1;
+ private int lastSeenPendingTransactionSpeculateIndex = 0;
+ private int commitSequenceNumber = 0;
+ private long localTransactionSequenceNumber = 0;
+
+ private PendingTransaction pendingTransBuild = null; // Pending Transaction used in building
+ private LinkedList<PendingTransaction> pendingTransQueue = null; // Queue of pending transactions
+ private Map<Long, Map<Long, Commit>> commitMap = null; // List of all the most recent live commits
+ private Map<Long, Abort> abortMap = null; // Set of the live aborts
+ private Map<IoTString, Commit> committedMapByKey = null; // Table of committed KV
+ private Map<IoTString, KeyValue> commitedTable = null; // Table of committed KV
+ private Map<IoTString, KeyValue> speculativeTable = null; // Table of speculative KV
+ private Map<Long, Transaction> uncommittedTransactionsMap = null;
+ private Map<IoTString, Long> arbitratorTable = null; // Table of arbitrators
+ private Map<IoTString, NewKey> newKeyTable = null; // Table of speculative KV
+ private Map<Long, Map<Long, Commit>> newCommitMap = null; // Map of all the new commits
+ private Map<Long, Long> lastCommitSeenSeqNumMap = null; // sequence number of the last commit that was seen grouped by arbitrator
+ private Map<Long, Long> lastCommitSeenTransSeqNumMap = null; // transaction sequence number of the last commit that was seen grouped by arbitrator
+ private Map<Long, Long> lastAbortSeenSeqNumMap = null; // sequence number of the last abort that was seen grouped by arbitrator
+ private Map<IoTString, KeyValue> pendingTransSpeculativeTable = null;
+ private List<Commit> pendingCommitsList = null;
+ private List<Commit> pendingCommitsToDelete = null;
+ private Map<Long, LocalComm> localCommunicationChannels;
+ private Map<Long, TransactionStatus> transactionStatusMap = null;
+ private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
+
+ private Semaphore mutex = null;
+
+
+
+ public Table(String hostname, String baseurl, String password, long _localmachineid) {
+ localmachineid = _localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud = new CloudComm(this, hostname, baseurl, password);
+ lastliveslotseqn = 1;
+
+ setupDataStructs();
+ }
+
+ public Table(CloudComm _cloud, long _localmachineid) {
+ localmachineid = _localmachineid;
+ buffer = new SlotBuffer();
+ numslots = buffer.capacity();
+ setResizeThreshold();
+ sequencenumber = 0;
+ cloud = _cloud;
+
+ setupDataStructs();
+ }
+
+ private void setupDataStructs() {
+ pendingTransQueue = new LinkedList<PendingTransaction>();
+ commitMap = new HashMap<Long, Map<Long, Commit>>();
+ abortMap = new HashMap<Long, Abort>();
+ committedMapByKey = new HashMap<IoTString, Commit>();
+ commitedTable = new HashMap<IoTString, KeyValue>();
+ speculativeTable = new HashMap<IoTString, KeyValue>();
+ uncommittedTransactionsMap = new HashMap<Long, Transaction>();
+ arbitratorTable = new HashMap<IoTString, Long>();
+ newKeyTable = new HashMap<IoTString, NewKey>();
+ newCommitMap = new HashMap<Long, Map<Long, Commit>>();
+ lastCommitSeenSeqNumMap = new HashMap<Long, Long>();
+ lastCommitSeenTransSeqNumMap = new HashMap<Long, Long>();
+ lastAbortSeenSeqNumMap = new HashMap<Long, Long>();
+ pendingTransSpeculativeTable = new HashMap<IoTString, KeyValue>();
+ pendingCommitsList = new LinkedList<Commit>();
+ pendingCommitsToDelete = new LinkedList<Commit>();
+ localCommunicationChannels = new HashMap<Long, LocalComm>();
+ transactionStatusMap = new HashMap<Long, TransactionStatus>();
+ transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+ mutex = new Semaphore(1);
+ }
+
+ public void initTable() throws ServerException {
+ cloud.setSalt();//Set the salt
+ Slot s = new Slot(this, 1, localmachineid);
+ TableStatus status = new TableStatus(s, numslots);
+ s.addEntry(status);
+ Slot[] array = cloud.putSlot(s, numslots);
+ if (array == null) {
+ array = new Slot[] {s};
+ /* update data structure */
+ validateandupdate(array, true);
+ } else {
+ throw new Error("Error on initialization");
+ }
+ }
+
+ public void rebuild() throws ServerException, InterruptedException {
+ mutex.acquire();
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+ validateandupdate(newslots, true);
+ mutex.release();
+ }
+
+ // TODO: delete method
+ public void printSlots() {
+ long o = buffer.getOldestSeqNum();
+ long n = buffer.getNewestSeqNum();
+
+ int[] types = new int[10];
+
+ int num = 0;
+
+ int livec = 0;
+ int deadc = 0;
+ for (long i = o; i < (n + 1); i++) {
+ Slot s = buffer.getSlot(i);
+
+ Vector<Entry> entries = s.getEntries();
+
+ for (Entry e : entries) {
+ if (e.isLive()) {
+ int type = e.getType();
+ types[type] = types[type] + 1;
+ num++;
+ livec++;
+ } else {
+ deadc++;
+ }
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ System.out.println(i + " " + types[i]);
+ }
+ System.out.println("Live count: " + livec);
+ System.out.println("Dead count: " + deadc);
+ System.out.println("Old: " + o);
+ System.out.println("New: " + n);
+ System.out.println("Size: " + buffer.size());
+ System.out.println("Commits Key Map: " + commitedTable.size());
+ // System.out.println("Commits Live Map: " + commitMap.size());
+ System.out.println("Pending: " + pendingTransQueue.size());
+
+ // List<IoTString> strList = new ArrayList<IoTString>();
+ // for (int i = 0; i < 100; i++) {
+ // String keyA = "a" + i;
+ // String keyB = "b" + i;
+ // String keyC = "c" + i;
+ // String keyD = "d" + i;
+
+ // IoTString iKeyA = new IoTString(keyA);
+ // IoTString iKeyB = new IoTString(keyB);
+ // IoTString iKeyC = new IoTString(keyC);
+ // IoTString iKeyD = new IoTString(keyD);
+
+ // strList.add(iKeyA);
+ // strList.add(iKeyB);
+ // strList.add(iKeyC);
+ // strList.add(iKeyD);
+ // }
+
+
+ // for (Long l : commitMap.keySet()) {
+ // for (Long l2 : commitMap.get(l).keySet()) {
+ // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+ // strList.remove(kv.getKey());
+ // System.out.print(kv.getKey() + " ");
+ // }
+ // }
+ // }
+
+ // System.out.println();
+ // System.out.println();
+
+ // for (IoTString s : strList) {
+ // System.out.print(s + " ");
+ // }
+ // System.out.println();
+ // System.out.println(strList.size());
+ }
+
+ public long getId() {
+ return localmachineid;
+ }
+
+ public boolean hasConnection() {
+ return cloud.hasConnection();
+ }
+
+ public String toString() {
+ String retString = " Committed Table: \n";
+ retString += "---------------------------\n";
+ retString += commitedTable.toString();
+
+ retString += "\n\n";
+
+ retString += " Speculative Table: \n";
+ retString += "---------------------------\n";
+ retString += speculativeTable.toString();
+
+ return retString;
+ }
+
+ public void addLocalComm(long machineId, LocalComm lc) {
+ localCommunicationChannels.put(machineId, lc);
+ }
+ public Long getArbitrator(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+ Long arb = arbitratorTable.get(key);
+ mutex.release();
+
+ return arb;
+ }
+
+ public IoTString getCommitted(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+ KeyValue kv = commitedTable.get(key);
+ mutex.release();
+
+
+ if (kv != null) {
+ return kv.getValue();
+ } else {
+ return null;
+ }
+ }
+
+ public IoTString getSpeculative(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+
+ KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+ if (kv == null) {
+ kv = speculativeTable.get(key);
+ }
+
+ if (kv == null) {
+ kv = commitedTable.get(key);
+ }
+ mutex.release();
+
+
+ if (kv != null) {
+ return kv.getValue();
+ } else {
+ return null;
+ }
+ }
+
+ public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+
+ KeyValue kv = commitedTable.get(key);
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ mutex.release();
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
+ } else {
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
+ }
+ }
+
+ public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ KeyValue kv = pendingTransSpeculativeTable.get(key);
+
+ if (kv == null) {
+ kv = speculativeTable.get(key);
+ }
+
+ if (kv == null) {
+ kv = commitedTable.get(key);
+ }
+
+ mutex.release();
+
+ if (kv != null) {
+ pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
+ return kv.getValue();
+ } else {
+ pendingTransBuild.addKVGuard(new KeyValue(key, null));
+ return null;
+ }
+ }
+
+ public Pair<Boolean, Boolean> update() throws InterruptedException {
+
+ mutex.acquire();
+
+ boolean gotLatestFromServer = false;
+ boolean didSendLocal = false;
+
+ try {
+ Slot[] newslots = cloud.getSlots(sequencenumber + 1);
+ validateandupdate(newslots, false);
+ gotLatestFromServer = true;
+
+ if (!pendingTransQueue.isEmpty()) {
+
+ // We have a pending transaction so do full insertion
+ processPendingTrans();
+ } else {
+
+ // We dont have a pending transaction so do minimal effort
+ updateWithNotPendingTrans();
+ }
+
+ didSendLocal = true;
+
+
+ } catch (Exception e) {
+ // could not update so do nothing
+ }
+
+
+ mutex.release();
+ return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
+ }
+
+ public Boolean updateFromLocal(long arb) throws InterruptedException {
+ LocalComm lc = localCommunicationChannels.get(arb);
+ if (lc == null) {
+ // Cant talk directly to arbitrator so cant do anything
+ return false;
+ }
+
+ byte[] array = new byte[Long.BYTES ];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(arb);
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
+ }
+
+ mutex.acquire();
+ byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
+ }
+
+ for (Commit commit : newCommits) {
+ // Prepare to process the commit
+ processEntry(commit);
+ }
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+
+ mutex.release();
+
+ return true;
+ }
+
+ public void startTransaction() throws InterruptedException {
+ // Create a new transaction, invalidates any old pending transactions.
+ pendingTransBuild = new PendingTransaction();
+ }
+
+ public void addKV(IoTString key, IoTString value) {
+
+ if (arbitratorTable.get(key) == null) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ KeyValue kv = new KeyValue(key, value);
+ pendingTransBuild.addKV(kv);
+ }
+
+ public TransactionStatus commitTransaction() throws InterruptedException {
+
+ if (pendingTransBuild.getKVUpdates().size() == 0) {
+
+ // transaction with no updates will have no effect on the system
+ return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
+ }
+
+ mutex.acquire();
+
+ TransactionStatus transStatus = null;
+
+ if (pendingTransBuild.getArbitrator() != localmachineid) {
+
+ // set the local sequence number so we can recognize this transaction later
+ pendingTransBuild.setMachineLocalTransSeqNum(localTransactionSequenceNumber);
+ localTransactionSequenceNumber++;
+
+ transStatus = new TransactionStatus(TransactionStatus.StatusPending, pendingTransBuild.getArbitrator());
+ transactionStatusNotSentMap.put(pendingTransBuild.getMachineLocalTransSeqNum(), transStatus);
+
+ // Add the pending transaction to the queue
+ pendingTransQueue.add(pendingTransBuild);
+
+
+ for (int i = lastSeenPendingTransactionSpeculateIndex; i < pendingTransQueue.size(); i++) {
+ PendingTransaction pt = pendingTransQueue.get(i);
+
+ if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+
+ lastSeenPendingTransactionSpeculateIndex = i;
+
+ for (KeyValue kv : pt.getKVUpdates()) {
+ pendingTransSpeculativeTable.put(kv.getKey(), kv);
+ }
+
+ }
+ }
+ } else {
+ Transaction ut = new Transaction(null,
+ -1,
+ localmachineid,
+ pendingTransBuild.getArbitrator(),
+ pendingTransBuild.getKVUpdates(),
+ pendingTransBuild.getKVGuard());
+
+ Pair<Boolean, List<Commit>> retData = doLocalUpdateAndArbitrate(ut, lastCommitSeenSeqNumMap.get(localmachineid));
+
+ if (retData.getFirst()) {
+ transStatus = new TransactionStatus(TransactionStatus.StatusCommitted, pendingTransBuild.getArbitrator());
+ } else {
+ transStatus = new TransactionStatus(TransactionStatus.StatusAborted, pendingTransBuild.getArbitrator());
+ }
+ }
+
+ // Try to insert transactions if possible
+ if (!pendingTransQueue.isEmpty()) {
+ // We have a pending transaction so do full insertion
+ processPendingTrans();
+ } else {
+ try {
+ // We dont have a pending transaction so do minimal effort
+ updateWithNotPendingTrans();
+ } catch (Exception e) {
+ // Do nothing
+ }
+ }
+
+ // reset it so next time is fresh
+ pendingTransBuild = new PendingTransaction();
+
+
+ mutex.release();
+ return transStatus;
+ }
+
+ public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
+ try {
+ mutex.acquire();
+
+ while (true) {
+ if (arbitratorTable.get(keyName) != null) {
+ // There is already an arbitrator
+ mutex.release();
+ return false;
+ }
+
+ if (tryput(keyName, machineId, false)) {
+ // If successfully inserted
+ mutex.release();
+ return true;
+ }
+ }
+ } catch (ServerException e) {
+ mutex.release();
+ throw e;
+ }
+ }
+
+ private void processPendingTrans() throws InterruptedException {
+
+ boolean sentAllPending = false;
+ try {
+ while (!pendingTransQueue.isEmpty()) {
+ if (tryput( pendingTransQueue.peek(), false)) {
+ pendingTransQueue.poll();
+ }
+ }
+
+ // if got here then all pending transactions were sent
+ sentAllPending = true;
+ } catch (Exception e) {
+ // There was a connection error
+ sentAllPending = false;
+ }
+
+ if (!sentAllPending) {
+
+ for (Iterator<PendingTransaction> i = pendingTransQueue.iterator(); i.hasNext(); ) {
+ PendingTransaction pt = i.next();
+ LocalComm lc = localCommunicationChannels.get(pt.getArbitrator());
+ if (lc == null) {
+ // Cant talk directly to arbitrator so cant do anything
+ continue;
+ }
+
+
+ Transaction ut = new Transaction(null,
+ -1,
+ localmachineid,
+ pendingTransBuild.getArbitrator(),
+ pendingTransBuild.getKVUpdates(),
+ pendingTransBuild.getKVGuard());
+
+
+ Pair<Boolean, List<Commit>> retData = sendTransactionToLocal(ut, lc);
+
+ for (Commit commit : retData.getSecond()) {
+ // Prepare to process the commit
+ processEntry(commit);
+ }
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ if (retData.getFirst()) {
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ if (transStatus != null) {
+ transStatus.setStatus(TransactionStatus.StatusCommitted);
+ }
+
+ } else {
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTransBuild.getMachineLocalTransSeqNum());
+ if (transStatus != null) {
+ transStatus.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+ i.remove();
+ }
+ }
+ }
+
+ private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
+ boolean doEnd = false;
+ boolean needResize = false;
+ while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
+ boolean resize = needResize;
+ needResize = false;
+
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ doRejectedMessages(s);
+
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ needResize = true;
+ continue;
+ }
+
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
+
+ // Did need to arbitrate
+ doEnd = !doArbitration(s);
+
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+
+ int max = 0;
+ if (resize) {
+ max = newsize;
+ }
+
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+
+ // Delete pending commits that were sent to the cloud
+ deletePendingCommits();
+
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ doEnd = false;
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+ }
+ }
+
+ private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
+
+ // encode the request
+ byte[] array = new byte[Long.BYTES + ut.getSize()];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ Long lastSeenCommit = lastCommitSeenSeqNumMap.get(ut.getArbitrator());
+ if (lastSeenCommit != null) {
+ bbEncode.putLong(lastSeenCommit);
+ } else {
+ bbEncode.putLong(0);
+ }
+ ut.encode(bbEncode);
+
+ byte[] data = lc.sendDataToLocalDevice(ut.getArbitrator(), bbEncode.array());
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ boolean didCommit = bbDecode.get() == 1;
+ int numberOfCommites = bbDecode.getInt();
+
+ List<Commit> newCommits = new LinkedList<Commit>();
+ for (int i = 0; i < numberOfCommites; i++ ) {
+ bbDecode.get();
+ Commit com = (Commit)Commit.decode(null, bbDecode);
+ newCommits.add(com);
+ }
+
+ return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
+ }
+
+ public byte[] localCommInput(byte[] data) throws InterruptedException {
+
+
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastSeenCommit = bbDecode.getLong();
+
+ Transaction ut = null;
+ if (data.length != Long.BYTES) {
+ bbDecode.get();
+ ut = (Transaction)Transaction.decode(null, bbDecode);
+ }
+
+ mutex.acquire();
+ // Do the local update and arbitrate
+ Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+ mutex.release();
+
+
+ // Calculate the size of the response
+ int size = Byte.BYTES + Integer.BYTES;
+ for (Commit com : returnData.getSecond()) {
+ size += com.getSize();
+ }
+
+ // encode the response
+ byte[] array = new byte[size];
+ ByteBuffer bbEncode = ByteBuffer.wrap(array);
+ if (returnData.getFirst()) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ bbEncode.putInt(returnData.getSecond().size());
+
+ for (Commit com : returnData.getSecond()) {
+ com.encode(bbEncode);
+ }
+
+ return bbEncode.array();
+ }
+
+ private Pair<Boolean, List<Commit>> doLocalUpdateAndArbitrate(Transaction ut, Long lastCommitSeen) {
+
+ List<Commit> returnCommits = new ArrayList<Commit>();
+
+ if ((lastCommitSeenSeqNumMap.get(localmachineid) != null) && (lastCommitSeenSeqNumMap.get(localmachineid) > lastCommitSeen)) {
+ // There is a commit that the other client has not seen yet
+
+ Map<Long, Commit> cm = commitMap.get(localmachineid);
+ if (cm != null) {
+
+ List<Long> commitKeys = new ArrayList<Long>(cm.keySet());
+ Collections.sort(commitKeys);
+
+
+ for (int i = (commitKeys.size() - 1); i >= 0; i--) {
+ Commit com = cm.get(commitKeys.get(i));
+
+ if (com.getSequenceNumber() <= lastCommitSeen) {
+ break;
+ }
+ returnCommits.add((Commit)com.getCopy(null));
+ }
+ }
+ }
+
+
+ if ((ut == null) || (ut.getArbitrator() != localmachineid)) {
+ // We are not the arbitrator for that transaction so the other device is talking to the wrong arbitrator
+ // or there is no transaction to process
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
+ if (!ut.evaluateGuard(commitedTable, null)) {
+ // Guard evaluated as false so return only the commits that the other device has not seen yet
+ return new Pair<Boolean, List<Commit>>(false, returnCommits);
+ }
+
+ // create the commit
+ Commit commit = new Commit(null,
+ -1,
+ commitSequenceNumber,
+ ut.getArbitrator(),
+ ut.getkeyValueUpdateSet());
+ commitSequenceNumber = commitSequenceNumber + 1;
+
+ // Add to the pending commits list
+ pendingCommitsList.add(commit);
+
+ // Add this commit so we can send it back
+ returnCommits.add(commit);
+
+ // Prepare to process the commit
+ processEntry(commit);
+
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ return new Pair<Boolean, List<Commit>>(true, returnCommits);
+ }
+
+ public void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+ private void setResizeThreshold() {
+ int resize_lower = (int) (RESIZE_THRESHOLD * numslots);
+ resizethreshold = resize_lower - 1 + random.nextInt(numslots - resize_lower);
+ }
+
+ private boolean tryput(PendingTransaction pendingTrans, boolean resize) throws ServerException {
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ doRejectedMessages(s);
+
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ return tryput(pendingTrans, true);
+ }
+
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
+
+ doArbitration(s);
+
+ Transaction trans = new Transaction(s,
+ s.getSequenceNumber(),
+ localmachineid,
+ pendingTrans.getArbitrator(),
+ pendingTrans.getKVUpdates(),
+ pendingTrans.getKVGuard());
+ boolean insertedTrans = false;
+ if (s.hasSpace(trans)) {
+ s.addEntry(trans);
+ insertedTrans = true;
+ }
+
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+ Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedTrans, resize, newsize);
+
+ if (sendRetData.getFirst()) {
+ // update the status and change what the sequence number is for the
+ TransactionStatus transStatus = transactionStatusNotSentMap.remove(pendingTrans.getMachineLocalTransSeqNum());
+ transStatus.setStatus(TransactionStatus.StatusSent);
+ transStatus.setSentTransaction();
+ transactionStatusMap.put(trans.getSequenceNumber(), transStatus);
+ }
+
+
+ if (sendRetData.getSecond().length != 0) {
+ // insert into the local block chain
+ validateandupdate(sendRetData.getSecond(), true);
+ }
+
+ return sendRetData.getFirst();
+ }
+
+ private boolean tryput(IoTString keyName, long arbMachineid, boolean resize) throws ServerException {
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ doRejectedMessages(s);
+ ThreeTuple<Boolean, Boolean, Long> retTup = doMandatoryResuce(s, resize);
+
+ // Resize was needed so redo call
+ if (retTup.getFirst()) {
+ return tryput(keyName, arbMachineid, true);
+ }
+
+ // Extract working variables
+ boolean seenliveslot = retTup.getSecond();
+ long seqn = retTup.getThird();
+
+ doArbitration(s);
+
+ NewKey newKey = new NewKey(s, keyName, arbMachineid);
+
+ boolean insertedNewKey = false;
+ if (s.hasSpace(newKey)) {
+ s.addEntry(newKey);
+ insertedNewKey = true;
+ }
+
+ doOptionalRescue(s, seenliveslot, seqn, resize);
+ Pair<Boolean, Slot[]> sendRetData = doSendSlots(s, insertedNewKey, resize, newsize);
+
+ if (sendRetData.getSecond().length != 0) {
+ // insert into the local block chain
+ validateandupdate(sendRetData.getSecond(), true);
+ }
+
+ return sendRetData.getFirst();
+ }
+
+ private void doRejectedMessages(Slot s) {
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn = rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn = rejectedmessagelist.lastElement();
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i = 0;
+ /* Go through list of missing messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn = curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ long machineid = s_msg.getMachineID();
+ RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+ }
+
+ private ThreeTuple<Boolean, Boolean, Long> doMandatoryResuce(Slot s, boolean resize) {
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; // smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
+
+
+ // Mandatory Rescue
+ for (; seqn < threshold; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ // Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.println("B"); //?
+ return new ThreeTuple<Boolean, Boolean, Long>(true, seenliveslot, seqn);
+ }
+ }
+ }
+ }
+
+ // Did not resize
+ return new ThreeTuple<Boolean, Boolean, Long>(false, seenliveslot, seqn);
+ }
+
+ private boolean doArbitration(Slot s) {
+
+ // flag whether we have finished all arbitration
+ boolean stillHasArbitration = false;
+
+ pendingCommitsToDelete.clear();
+
+ // First add queue commits
+ for (Commit commit : pendingCommitsList) {
+ if (s.hasSpace(commit)) {
+ s.addEntry(commit);
+ pendingCommitsToDelete.add(commit);
+ } else {
+ // Ran out of space so move on but still not done
+ stillHasArbitration = true;
+ return stillHasArbitration;
+ }
+ }
+
+ // Arbitrate
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
+ List<Long> transSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+ // Sort from oldest to newest
+ Collections.sort(transSeqNums);
+
+ for (Long transNum : transSeqNums) {
+ Transaction ut = uncommittedTransactionsMap.get(transNum);
+
+ // Check if this machine arbitrates for this transaction
+ if (ut.getArbitrator() != localmachineid ) {
+ continue;
+ }
+
+ // we did have something to arbitrate on
+ stillHasArbitration = true;
+
+ Entry newEntry = null;
+
+ if (ut.evaluateGuard(commitedTable, speculativeTableTmp)) {
+ // Guard evaluated as true
+
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+
+ // create the commit
+ newEntry = new Commit(s,
+ ut.getSequenceNumber(),
+ commitSequenceNumber,
+ ut.getArbitrator(),
+ ut.getkeyValueUpdateSet());
+ commitSequenceNumber = commitSequenceNumber + 1;
+ } else {
+ // Guard was false
+
+ // create the abort
+ newEntry = new Abort(s,
+ ut.getSequenceNumber(),
+ ut.getMachineID(),
+ ut.getArbitrator());
+ }
+
+ if ((newEntry != null) && s.hasSpace(newEntry)) {
+ s.addEntry(newEntry);
+ } else {
+ break;
+ }
+ }
+
+ return stillHasArbitration;
+ }
+
+ private void deletePendingCommits() {
+ for (Commit com : pendingCommitsToDelete) {
+ pendingCommitsList.remove(com);
+ }
+ pendingCommitsToDelete.clear();
+ }
+
+ private void doOptionalRescue(Slot s, boolean seenliveslot, long seqn, boolean resize) {
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount = 0;
+ long newestseqnum = buffer.getNewestSeqNum();
+ search:
+ for (; seqn <= newestseqnum; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+ }
+
+ private Pair<Boolean, Slot[]> doSendSlots(Slot s, boolean inserted, boolean resize, int newsize) throws ServerException {
+ int max = 0;
+ if (resize)
+ max = newsize;
+
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+
+ // Delete pending commits that were sent to the cloud
+ deletePendingCommits();
+ } else {
+ // if (array.length == 0)
+ // throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ inserted = false;
+ }
+
+ return new Pair<Boolean, Slot[]>(inserted, array);
+ }
+
+ private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
+ /* The cloud communication layer has checked slot HMACs already
+ before decoding */
+ if (newslots.length == 0) return;
+
+ // Reset the table status declared sizes
+ smallestTableStatusSeen = -1;
+ largestTableStatusSeen = -1;
+
+ long firstseqnum = newslots[0].getSequenceNumber();
+ if (firstseqnum <= sequencenumber) {
+ throw new Error("Server Error: Sent older slots!");
+ }
+
+ SlotIndexer indexer = new SlotIndexer(newslots, buffer);
+ checkHMACChain(indexer, newslots);
+
+ HashSet<Long> machineSet = new HashSet<Long>(lastmessagetable.keySet()); //
+
+ // initExpectedSize(firstseqnum);
+ for (Slot slot : newslots) {
+ processSlot(indexer, slot, acceptupdatestolocal, machineSet);
+ // updateExpectedSize();
+ }
+
+ /* If there is a gap, check to see if the server sent us everything. */
+ if (firstseqnum != (sequencenumber + 1)) {
+
+ // TODO: Check size
+ checkNumSlots(newslots.length);
+ if (!machineSet.isEmpty()) {
+ throw new Error("Missing record for machines: " + machineSet);
+ }
+ }
+
+
+ commitNewMaxSize();
+
+ /* Commit new to slots. */
+ for (Slot slot : newslots) {
+ buffer.putSlot(slot);
+ liveslotcount++;
+ }
+ sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
+
+ // Process all on key value pairs
+ boolean didCommitOrSpeculate = proccessAllNewCommits();
+
+ // Go through all uncommitted transactions and kill the ones that are dead
+ deleteDeadUncommittedTransactions();
+
+ // Speculate on key value pairs
+ didCommitOrSpeculate |= createSpeculativeTable();
+
+ createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+ }
+
+ private boolean proccessAllNewCommits() {
+ // Process only if there are commit
+ if (newCommitMap.keySet().size() == 0) {
+ return false;
+ }
+ boolean didProcessNewCommit = false;
+
+ for (Long arb : newCommitMap.keySet()) {
+
+ List<Long> commitSeqNums = new ArrayList<Long>(newCommitMap.get(arb).keySet());
+
+ // Sort from oldest to newest commit
+ Collections.sort(commitSeqNums);
+
+ // Go through each new commit one by one
+ for (Long entrySeqNum : commitSeqNums) {
+ Commit entry = newCommitMap.get(arb).get(entrySeqNum);
+
+ long lastCommitSeenSeqNum = -1;
+ if (lastCommitSeenSeqNumMap.get(entry.getTransArbitrator()) != null) {
+ lastCommitSeenSeqNum = lastCommitSeenSeqNumMap.get(entry.getTransArbitrator());
+ }
+
+ if (entry.getSequenceNumber() <= lastCommitSeenSeqNum) {
+ Map<Long, Commit> cm = commitMap.get(arb);
+ if (cm == null) {
+ cm = new HashMap<Long, Commit>();
+ }
+
+ Commit prevCommit = cm.put(entry.getSequenceNumber(), entry);
+ commitMap.put(arb, cm);
+
+ if (prevCommit != null) {
+ prevCommit.setDead();
+
+ for (KeyValue kv : prevCommit.getkeyValueUpdateSet()) {
+ committedMapByKey.put(kv.getKey(), entry);
+ }
+ }
+
+ continue;
+ }
+
+ Set<Commit> commitsToEditSet = new HashSet<Commit>();
+
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ commitsToEditSet.add(committedMapByKey.get(kv.getKey()));
+ }
+
+ commitsToEditSet.remove(null);
+
+ for (Commit prevCommit : commitsToEditSet) {
+
+ Set<KeyValue> deletedKV = prevCommit.updateLiveKeys(entry.getkeyValueUpdateSet());
+
+ if (!prevCommit.isLive()) {
+ Map<Long, Commit> cm = commitMap.get(arb);
+
+ // remove it from the map so that it can be set as dead
+ if (cm != null) {
+ cm.remove(prevCommit.getSequenceNumber());
+ commitMap.put(arb, cm);
+ }
+ }
+ }
+
+ // Add the new commit
+ Map<Long, Commit> cm = commitMap.get(arb);
+ if (cm == null) {
+ cm = new HashMap<Long, Commit>();
+ }
+ cm.put(entry.getSequenceNumber(), entry);
+ commitMap.put(arb, cm);
+
+ lastCommitSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getSequenceNumber());
+
+ // set the trans sequence number if we are able to
+ if (entry.getTransSequenceNumber() != -1) {
+ lastCommitSeenTransSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ }
+
+ didProcessNewCommit = true;
+
+ // Update the committed table list
+ for (KeyValue kv : entry.getkeyValueUpdateSet()) {
+ IoTString key = kv.getKey();
+ commitedTable.put(key, kv);
+ committedMapByKey.put(key, entry);
+ }
+ }
+ }
+ // Clear the new commits storage so we can use it later
+ newCommitMap.clear();
+
+
+ // go through all saved transactions and update the status of those that can be updated
+ for (Iterator<Map.Entry<Long, TransactionStatus>> i = transactionStatusMap.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<Long, TransactionStatus> entry = i.next();
+ long seqnum = entry.getKey();
+ TransactionStatus status = entry.getValue();
+
+ if (status.getSentTransaction()) {
+
+ Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(status.getArbitrator());
+ Long abortSeqNum = lastAbortSeenSeqNumMap.get(status.getArbitrator());
+
+ if (((commitSeqNum != null) && (seqnum <= commitSeqNum)) ||
+ ((abortSeqNum != null) && (seqnum <= abortSeqNum))) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ i.remove();
+ }
+ }
+ }
+
+ return didProcessNewCommit;
+ }
+
+ private void deleteDeadUncommittedTransactions() {
+ // Make dead the transactions
+ for (Iterator<Map.Entry<Long, Transaction>> i = uncommittedTransactionsMap.entrySet().iterator(); i.hasNext();) {
+ Transaction prevtrans = i.next().getValue();
+ long transArb = prevtrans.getArbitrator();
+
+ Long commitSeqNum = lastCommitSeenTransSeqNumMap.get(transArb);
+ Long abortSeqNum = lastAbortSeenSeqNumMap.get(transArb);
+
+ if (((commitSeqNum != null) && (prevtrans.getSequenceNumber() <= commitSeqNum)) ||
+ ((abortSeqNum != null) && (prevtrans.getSequenceNumber() <= abortSeqNum))) {
+ i.remove();
+ prevtrans.setDead();
+ }
+ }
+ }
+
+ private boolean createSpeculativeTable() {
+ if (uncommittedTransactionsMap.keySet().size() == 0) {
+ return false;
+ }
+
+ Map<IoTString, KeyValue> speculativeTableTmp = new HashMap<IoTString, KeyValue>();
+ List<Long> utSeqNums = new ArrayList<Long>(uncommittedTransactionsMap.keySet());
+
+ // Sort from oldest to newest commit
+ Collections.sort(utSeqNums);
+
+ if (utSeqNums.get(0) > (lastUncommittedTransaction)) {
+
+ speculativeTable.clear();
+ lastUncommittedTransaction = -1;
+
+ for (Long key : utSeqNums) {
+ Transaction trans = uncommittedTransactionsMap.get(key);
+
+ lastUncommittedTransaction = key;
+
+ if (trans.evaluateGuard(commitedTable, speculativeTableTmp)) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+ }
+
+ }
+ } else {
+ for (Long key : utSeqNums) {
+
+ if (key <= lastUncommittedTransaction) {
+ continue;
+ }
+
+ lastUncommittedTransaction = key;
+
+ Transaction trans = uncommittedTransactionsMap.get(key);
+
+ if (trans.evaluateGuard(speculativeTable, speculativeTableTmp)) {
+ for (KeyValue kv : trans.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+ }
+ }
+ }
+
+ for (IoTString key : speculativeTableTmp.keySet()) {
+ speculativeTable.put(key, speculativeTableTmp.get(key));
+ }
+
+ return true;
+ }
+
+ private void createPendingTransactionSpeculativeTable(boolean didCommitOrSpeculate) {
+
+ if (didCommitOrSpeculate) {
+ pendingTransSpeculativeTable.clear();
+ lastSeenPendingTransactionSpeculateIndex = 0;
+
+ int index = 0;
+ for (PendingTransaction pt : pendingTransQueue) {
+ if (pt.evaluateGuard(commitedTable, speculativeTable, pendingTransSpeculativeTable)) {
+
+ lastSeenPendingTransactionSpeculateIndex = index;
+ index++;
+
+ for (KeyValue kv : pt.getKVUpdates()) {
+ pendingTransSpeculativeTable.put(kv.getKey(), kv);
+ }
+
+ }
+ }
+ }
+ }
+
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+
+
+ // We only have 1 size so we must have this many slots
+ if (largestTableStatusSeen == smallestTableStatusSeen) {
+ if (numslots != smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
+ } else {
+ // We have more than 1
+ if (numslots < smallestTableStatusSeen) {
+ throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numslots);
+ }
+ }
+
+ // if (numslots != expectedsize) {
+ // throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numslots);
+ // }
+ }
+
+ private void initExpectedSize(long firstsequencenumber) {
+ long prevslots = firstsequencenumber;
+ expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
+ currmaxsize = numslots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize) {
+ expectedsize = currmaxsize;
+ }
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize = newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+
+ if (largestTableStatusSeen == -1) {
+ currmaxsize = numslots;
+ } else {
+ currmaxsize = largestTableStatusSeen;
+ }
+
+ if (numslots != currmaxsize) {
+ buffer.resize(currmaxsize);
+ }
+
+ numslots = currmaxsize;
+ setResizeThreshold();
+ }
+
+ private void processEntry(LastMessage entry, HashSet<Long> machineSet) {
+ updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
+ }
+
+ private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ long oldseqnum = entry.getOldSeqNum();
+ long newseqnum = entry.getNewSeqNum();
+ boolean isequal = entry.getEqual();
+ long machineid = entry.getMachineID();
+ for (long seqnum = oldseqnum; seqnum <= newseqnum; seqnum++) {
+ Slot slot = indexer.getSlot(seqnum);
+ if (slot != null) {
+ long slotmachineid = slot.getMachineID();
+ if (isequal != (slotmachineid == machineid)) {
+ throw new Error("Server Error: Trying to insert rejected message for slot " + seqnum);
+ }
+ }
+ }
+
+ HashSet<Long> watchset = new HashSet<Long>();
+ for (Map.Entry<Long, Pair<Long, Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
+ long entry_mid = lastmsg_entry.getKey();
+ /* We've seen it, don't need to continue to watch. Our next
+ * message will implicitly acknowledge it. */
+ if (entry_mid == localmachineid)
+ continue;
+ Pair<Long, Liveness> v = lastmsg_entry.getValue();
+ long entry_seqn = v.getFirst();
+ if (entry_seqn < newseqnum) {
+ addWatchList(entry_mid, entry);
+ watchset.add(entry_mid);
+ }
+ }
+ if (watchset.isEmpty())
+ entry.setDead();
+ else
+ entry.setWatchSet(watchset);
+ }
+
+ private void processEntry(NewKey entry) {
+ arbitratorTable.put(entry.getKey(), entry.getMachineID());
+
+ NewKey oldNewKey = newKeyTable.put(entry.getKey(), entry);
+
+ if (oldNewKey != null) {
+ oldNewKey.setDead();
+ }
+ }
+
+ private void processEntry(Transaction entry) {
+
+ long arb = entry.getArbitrator();
+ Long comLast = lastCommitSeenTransSeqNumMap.get(arb);
+ Long abLast = lastAbortSeenSeqNumMap.get(arb);
+
+ Transaction prevTrans = null;
+
+ if ((comLast != null) && (comLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else if ((abLast != null) && (abLast >= entry.getSequenceNumber())) {
+ prevTrans = uncommittedTransactionsMap.remove(entry.getSequenceNumber());
+ } else {
+ prevTrans = uncommittedTransactionsMap.put(entry.getSequenceNumber(), entry);
+ }
+
+ // Duplicate so delete old copy
+ if (prevTrans != null) {
+ prevTrans.setDead();
+ }
+ }
+
+ private void processEntry(Abort entry) {
+ if (lastmessagetable.get(entry.getMachineID()).getFirst() < entry.getTransSequenceNumber()) {
+ // Abort has not been seen yet so we need to keep track of it
+
+ Abort prevAbort = abortMap.put(entry.getTransSequenceNumber(), entry);
+ if (prevAbort != null) {
+ prevAbort.setDead(); // delete old version of the duplicate
+ }
+
+ if ((lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()) != null) && (entry.getTransSequenceNumber() > lastAbortSeenSeqNumMap.get(entry.getTransArbitrator()))) {
+ lastAbortSeenSeqNumMap.put(entry.getTransArbitrator(), entry.getTransSequenceNumber());
+ }
+ } else {
+ // The machine already saw this so it is dead
+ entry.setDead();
+ }
+
+ // Update the status of the transaction and remove it since we are done with this transaction
+ TransactionStatus status = transactionStatusMap.remove(entry.getTransSequenceNumber());
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+
+ private void processEntry(Commit entry) {
+ Map<Long, Commit> arbMap = newCommitMap.get(entry.getTransArbitrator());
+
+ if (arbMap == null) {
+ arbMap = new HashMap<Long, Commit>();
+ }
+
+ Commit prevCommit = arbMap.put(entry.getSequenceNumber(), entry);
+ newCommitMap.put(entry.getTransArbitrator(), arbMap);
+
+ if (prevCommit != null) {
+ prevCommit.setDead();
+ }
+ }
+
+ private void processEntry(TableStatus entry) {
+ int newnumslots = entry.getMaxSlots();
+ // updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
+
+ if ((smallestTableStatusSeen == -1) || (newnumslots < smallestTableStatusSeen)) {
+ smallestTableStatusSeen = newnumslots;
+ }
+
+ if ((largestTableStatusSeen == -1) || (newnumslots > largestTableStatusSeen)) {
+ largestTableStatusSeen = newnumslots;
+ }
+ }
+
+ private void addWatchList(long machineid, RejectedMessage entry) {
+ HashSet<RejectedMessage> entries = watchlist.get(machineid);
+ if (entries == null)
+ watchlist.put(machineid, entries = new HashSet<RejectedMessage>());
+ entries.add(entry);
+ }
+
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ machineSet.remove(machineid);
+
+ HashSet<RejectedMessage> watchset = watchlist.get(machineid);
+ if (watchset != null) {
+ for (Iterator<RejectedMessage> rmit = watchset.iterator(); rmit.hasNext(); ) {
+ RejectedMessage rm = rmit.next();
+ if (rm.getNewSeqNum() <= seqnum) {
+ /* Remove it from our watchlist */
+ rmit.remove();
+ /* Decrement machines that need to see this notification */
+ rm.removeWatcher(machineid);
+ }
+ }
+ }
+
+ if (machineid == localmachineid) {
+ /* Our own messages are immediately dead. */
+ if (liveness instanceof LastMessage) {
+ ((LastMessage)liveness).setDead();
+ } else if (liveness instanceof Slot) {
+ ((Slot)liveness).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+ // Set dead the abort
+ for (Iterator<Map.Entry<Long, Abort>> i = abortMap.entrySet().iterator(); i.hasNext();) {
+ Abort abort = i.next().getValue();
+
+ if ((abort.getMachineID() == machineid) && (abort.getTransSequenceNumber() <= seqnum)) {
+ abort.setDead();
+ i.remove();
+ }
+ }
+
+ Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
+ if (lastmsgentry == null)
+ return;
+
+ long lastmsgseqnum = lastmsgentry.getFirst();
+ Liveness lastentry = lastmsgentry.getSecond();
+ if (machineid != localmachineid) {
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).setDead();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+ if (machineid == localmachineid) {
+ if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqnum + " got: " + lastmsgseqnum);
+ } else {
+ if (lastmsgseqnum > seqnum)
+ throw new Error("Server Error: Rollback on remote machine sequence number");
+ }
+ }
+
+ private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
+ updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
+ for (Entry entry : slot.getEntries()) {
+ switch (entry.getType()) {
+
+ case Entry.TypeNewKey:
+ processEntry((NewKey)entry);
+ break;
+
+ case Entry.TypeCommit:
+ processEntry((Commit)entry);
+ break;
+
+ case Entry.TypeAbort:
+ processEntry((Abort)entry);
+ break;
+
+ case Entry.TypeTransaction:
+ processEntry((Transaction)entry);
+ break;
+
+ case Entry.TypeLastMessage:
+ processEntry((LastMessage)entry, machineSet);
+ break;
+
+ case Entry.TypeRejectedMessage:
+ processEntry((RejectedMessage)entry, indexer);
+ break;
+
+ case Entry.TypeTableStatus:
+ processEntry((TableStatus)entry);
+ break;
+
+ default:
+ throw new Error("Unrecognized type: " + entry.getType());
+ }
+ }
+ }
+
+ private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+ for (int i = 0; i < newslots.length; i++) {
+ Slot currslot = newslots[i];
+ Slot prevslot = indexer.getSlot(currslot.getSequenceNumber() - 1);
+ if (prevslot != null &&
+ !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
+ throw new Error("Server Error: Invalid HMAC Chain" + currslot + " " + prevslot);
+ }
+ }
+}
\ No newline at end of file