--- /dev/null
+#include "Table.h"
+#include "CloudComm.h"
+#include "SlotBuffer.h"
+#include "NewKey.h"
+#include "Slot.h"
+#include "KeyValue.h"
+#include "Error.h"
+#include "PendingTransaction.h"
+#include "TableStatus.h"
+#include "TransactionStatus.h"
+#include "Transaction.h"
+#include "LastMessage.h"
+#include "SecureRandom.h"
+#include "ByteBuffer.h"
+#include "Abort.h"
+#include "CommitPart.h"
+#include "ArbitrationRound.h"
+#include "TransactionPart.h"
+#include "Commit.h"
+#include "RejectedMessage.h"
+#include "SlotIndexer.h"
+#include <stdlib.h>
+
+int compareInt64(const void *a, const void *b) {
+ const int64_t *pa = (const int64_t *) a;
+ const int64_t *pb = (const int64_t *) b;
+ if (*pa < *pb)
+ return -1;
+ else if (*pa > *pb)
+ return 1;
+ else
+ return 0;
+}
+
+Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
+ buffer(NULL),
+ cloud(new CloudComm(this, baseurl, password, listeningPort)),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumver(1),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localSequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedsize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
+ init();
+}
+
+Table::Table(CloudComm *_cloud, int64_t _localMachineId) :
+ buffer(NULL),
+ cloud(_cloud),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumver(1),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localSequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedsize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
+ init();
+}
+
+Table::~Table() {
+ delete cloud;
+ delete random;
+ delete buffer;
+ // init data structs
+ delete committedKeyValueTable;
+ delete speculatedKeyValueTable;
+ delete pendingTransactionSpeculatedKeyValueTable;
+ delete liveNewKeyTable;
+ {
+ SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+ while (lmit->hasNext()) {
+ Pair<int64_t, Liveness *> * pair = lastMessageTable->get(lmit->next());
+ delete pair;
+ }
+ delete lmit;
+ delete lastMessageTable;
+ }
+ if (pendingTransactionBuilder != NULL)
+ delete pendingTransactionBuilder;
+ {
+ SetIterator<int64_t, Hashset<RejectedMessage *> *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable);
+ while(rmit->hasNext()) {
+ int64_t machineid = rmit->next();
+ Hashset<RejectedMessage *> * rmset = rejectedMessageWatchVectorTable->get(machineid);
+ SetIterator<RejectedMessage *, RejectedMessage *> * mit = rmset->iterator();
+ while (mit->hasNext()) {
+ RejectedMessage * rm = mit->next();
+ delete rm;
+ }
+ delete mit;
+ delete rmset;
+ }
+ delete rmit;
+ delete rejectedMessageWatchVectorTable;
+ }
+ delete arbitratorTable;
+ delete liveAbortTable;
+ {
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+ while (partsit->hasNext()) {
+ int64_t machineId = partsit->next();
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+ SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+ while(pit->hasNext()) {
+ Pair<int64_t, int32_t> * pair=pit->next();
+ pit->currVal()->releaseRef();
+ }
+ delete pit;
+
+ delete parts;
+ }
+ delete partsit;
+ delete newTransactionParts;
+ }
+ {
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
+ while (partsit->hasNext()) {
+ int64_t machineId = partsit->next();
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal();
+ SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts);
+ while(pit->hasNext()) {
+ Pair<int64_t, int32_t> * pair=pit->next();
+ pit->currVal()->releaseRef();
+ }
+ delete pit;
+ delete parts;
+ }
+ delete partsit;
+ delete newCommitParts;
+ }
+ delete lastArbitratedTransactionNumberByArbitratorTable;
+ delete liveTransactionBySequenceNumberTable;
+ delete liveTransactionByTransactionIdTable;
+ {
+ SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
+ while (liveit->hasNext()) {
+ int64_t arbitratorId = liveit->next();
+
+ // Get all the commits for a specific arbitrator
+ Hashtable<int64_t, Commit *> *commitForClientTable = liveit->currVal();
+ {
+ SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
+ while (clientit->hasNext()) {
+ int64_t id = clientit->next();
+ delete commitForClientTable->get(id);
+ }
+ delete clientit;
+ }
+
+ delete commitForClientTable;
+ }
+ delete liveit;
+ delete liveCommitsTable;
+ }
+ delete liveCommitsByKeyTable;
+ delete lastCommitSeenSequenceNumberByArbitratorTable;
+ delete rejectedSlotVector;
+ {
+ uint size = pendingTransactionQueue->size();
+ for (uint iter = 0; iter < size; iter++) {
+ delete pendingTransactionQueue->get(iter);
+ }
+ delete pendingTransactionQueue;
+ }
+ delete pendingSendArbitrationEntriesToDelete;
+ {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ delete trit->currVal();
+ }
+ delete trit;
+ delete transactionPartsSent;
+ }
+ delete outstandingTransactionStatus;
+ delete liveAbortsGeneratedByLocal;
+ delete offlineTransactionsCommittedAndAtServer;
+ delete localCommunicationTable;
+ delete lastTransactionSeenFromMachineFromServer;
+ {
+ for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) {
+ delete pendingSendArbitrationRounds->get(i);
+ }
+ delete pendingSendArbitrationRounds;
+ }
+ if (lastTransactionPartsSent != NULL)
+ delete lastTransactionPartsSent;
+ delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+ if (lastNewKey)
+ delete lastNewKey;
+}
+
+/**
+ * Init all the stuff needed for for table usage
+ */
+void Table::init() {
+ // Init helper objects
+ random = new SecureRandom();
+ buffer = new SlotBuffer();
+
+ // init data structs
+ committedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ speculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+ liveNewKeyTable = new Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals >();
+ lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> * >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
+ arbitratorTable = new Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals>();
+ liveAbortTable = new Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
+ newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
+ lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
+ liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
+ liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> * >();
+ liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals>();
+ lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
+ rejectedSlotVector = new Vector<int64_t>();
+ pendingTransactionQueue = new Vector<Transaction *>();
+ pendingSendArbitrationEntriesToDelete = new Vector<Entry *>();
+ transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
+ outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
+ liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
+ offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> *>();
+ lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
+ pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable<int64_t, int64_t>();
+
+ // Other init stuff
+ numberOfSlots = buffer->capacity();
+ setResizeThreshold();
+}
+
+/**
+ * Initialize the table by inserting a table status as the first entry
+ * into the table status also initialize the crypto stuff.
+ */
+void Table::initTable() {
+ cloud->initSecurity();
+
+ // Create the first insertion into the block chain which is the table status
+ Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber);
+ localSequenceNumber++;
+ TableStatus *status = new TableStatus(s, numberOfSlots);
+ s->addShallowEntry(status);
+ Array<Slot *> *array = cloud->putSlot(s, numberOfSlots);
+
+ if (array == NULL) {
+ array = new Array<Slot *>(1);
+ array->set(0, s);
+ // update local block chain
+ validateAndUpdate(array, true);
+ delete array;
+ } else if (array->length() == 1) {
+ // in case we did push the slot BUT we failed to init it
+ validateAndUpdate(array, true);
+ delete s;
+ delete array;
+ } else {
+ delete s;
+ delete array;
+ throw new Error("Error on initialization");
+ }
+}
+
+/**
+ * Rebuild the table from scratch by pulling the latest block chain
+ * from the server.
+ */
+void Table::rebuild() {
+ // Just pull the latest slots from the server
+ Array<Slot *> *newslots = cloud->getSlots(sequenceNumber + 1);
+ validateAndUpdate(newslots, true);
+ delete newslots;
+ sendToServer(NULL);
+ updateLiveTransactionsAndStatus();
+}
+
+void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) {
+ localCommunicationTable->put(arbitrator, new Pair<IoTString *, int32_t>(hostName, portNumber));
+}
+
+int64_t Table::getArbitrator(IoTString *key) {
+ return arbitratorTable->get(key);
+}
+
+void Table::close() {
+ cloud->closeCloud();
+}
+
+IoTString *Table::getCommitted(IoTString *key) {
+ KeyValue *kv = committedKeyValueTable->get(key);
+
+ if (kv != NULL) {
+ return new IoTString(kv->getValue());
+ } else {
+ return NULL;
+ }
+}
+
+IoTString *Table::getSpeculative(IoTString *key) {
+ KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
+
+ if (kv == NULL) {
+ kv = speculatedKeyValueTable->get(key);
+ }
+
+ if (kv == NULL) {
+ kv = committedKeyValueTable->get(key);
+ }
+
+ if (kv != NULL) {
+ return new IoTString(kv->getValue());
+ } else {
+ return NULL;
+ }
+}
+
+IoTString *Table::getCommittedAtomic(IoTString *key) {
+ KeyValue *kv = committedKeyValueTable->get(key);
+
+ if (!arbitratorTable->contains(key)) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ if (kv != NULL) {
+ pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
+ return new IoTString(kv->getValue());
+ } else {
+ pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
+ return NULL;
+ }
+}
+
+IoTString *Table::getSpeculativeAtomic(IoTString *key) {
+ if (!arbitratorTable->contains(key)) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key);
+
+ if (kv == NULL) {
+ kv = speculatedKeyValueTable->get(key);
+ }
+
+ if (kv == NULL) {
+ kv = committedKeyValueTable->get(key);
+ }
+
+ if (kv != NULL) {
+ pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue()));
+ return new IoTString(kv->getValue());
+ } else {
+ pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL));
+ return NULL;
+ }
+}
+
+bool Table::update() {
+ try {
+ Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
+ validateAndUpdate(newSlots, false);
+ delete newSlots;
+ sendToServer(NULL);
+ updateLiveTransactionsAndStatus();
+ return true;
+ } catch (Exception *e) {
+ SetIterator<int64_t, Pair<IoTString *, int32_t> *> *kit = getKeyIterator(localCommunicationTable);
+ while (kit->hasNext()) {
+ int64_t m = kit->next();
+ updateFromLocal(m);
+ }
+ delete kit;
+ }
+
+ return false;
+}
+
+bool Table::createNewKey(IoTString *keyName, int64_t machineId) {
+ while (true) {
+ if (arbitratorTable->contains(keyName)) {
+ // There is already an arbitrator
+ return false;
+ }
+ NewKey *newKey = new NewKey(NULL, keyName, machineId);
+
+ if (sendToServer(newKey)) {
+ // If successfully inserted
+ return true;
+ }
+ }
+}
+
+void Table::startTransaction() {
+ // Create a new transaction, invalidates any old pending transactions.
+ if (pendingTransactionBuilder != NULL)
+ delete pendingTransactionBuilder;
+ pendingTransactionBuilder = new PendingTransaction(localMachineId);
+}
+
+void Table::put(IoTString *key, IoTString *value) {
+ // Make sure it is a valid key
+ if (!arbitratorTable->contains(key)) {
+ throw new Error("Key not Found.");
+ }
+
+ // Make sure new key value pair matches the current arbitrator
+ if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) {
+ // TODO: Maybe not throw en error
+ throw new Error("Not all Key Values Match Arbitrator.");
+ }
+
+ // Add the key value to this transaction
+ KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value));
+ pendingTransactionBuilder->addKV(kv);
+}
+
+TransactionStatus *Table::commitTransaction() {
+ if (pendingTransactionBuilder->getKVUpdates()->size() == 0) {
+ // transaction with no updates will have no effect on the system
+ return new TransactionStatus(TransactionStatus_StatusNoEffect, -1);
+ }
+
+ // Set the local transaction sequence number and increment
+ pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber);
+ localTransactionSequenceNumber++;
+
+ // Create the transaction status
+ TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator());
+
+ // Create the new transaction
+ Transaction *newTransaction = pendingTransactionBuilder->createTransaction();
+ newTransaction->setTransactionStatus(transactionStatus);
+
+ if (pendingTransactionBuilder->getArbitrator() != localMachineId) {
+ // Add it to the queue and invalidate the builder for safety
+ pendingTransactionQueue->add(newTransaction);
+ } else {
+ arbitrateOnLocalTransaction(newTransaction);
+ delete newTransaction;
+ updateLiveStateFromLocal();
+ }
+ if (pendingTransactionBuilder != NULL)
+ delete pendingTransactionBuilder;
+
+ pendingTransactionBuilder = new PendingTransaction(localMachineId);
+
+ try {
+ sendToServer(NULL);
+ } catch (ServerException *e) {
+
+ Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
+ uint size = pendingTransactionQueue->size();
+ uint oldindex = 0;
+ for (uint iter = 0; iter < size; iter++) {
+ Transaction *transaction = pendingTransactionQueue->get(iter);
+ pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
+
+ if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
+ // Already contacted this client so ignore all attempts to contact this client
+ // to preserve ordering for arbitrator
+ continue;
+ }
+
+ Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
+
+ if (sendReturn.getFirst()) {
+ // Failed to contact over local
+ arbitratorTriedAndFailed->add(transaction->getArbitrator());
+ } else {
+ // Successful contact or should not contact
+
+ if (sendReturn.getSecond()) {
+ // did arbitrate
+ delete transaction;
+ oldindex--;
+ }
+ }
+ }
+ pendingTransactionQueue->setSize(oldindex);
+ }
+
+ updateLiveStateFromLocal();
+
+ return transactionStatus;
+}
+
+/**
+ * Recalculate the new resize threshold
+ */
+void Table::setResizeThreshold() {
+ int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots);
+ bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower);
+}
+
+int64_t Table::getLocalSequenceNumber() {
+ return localSequenceNumber;
+}
+
+void Table::processTransactionList(bool handlePartial) {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Update which transactions parts still need to be sent
+ transaction->removeSentParts(lastTransactionPartsSent->get(transaction));
+ // Add the transaction status to the outstanding list
+ outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus());
+
+ // Update the transaction status
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial);
+
+ // Check if all the transaction parts were successfully
+ // sent and if so then remove it from pending
+ if (transaction->didSendAllParts()) {
+ transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully);
+ pendingTransactionQueue->remove(transaction);
+ delete transaction;
+ } else if (handlePartial) {
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ }
+ delete trit;
+}
+
+NewKey * Table::handlePartialSend(NewKey * newKey) {
+ //Didn't receive acknowledgement for last send
+ //See if the server has received a newer slot
+
+ Array<Slot *> *newSlots = cloud->getSlots(sequenceNumber + 1);
+ if (newSlots->length() == 0) {
+ //Retry sending old slot
+ bool wasInserted = false;
+ bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots);
+
+ if (sendSlotsReturn) {
+ lastSlotAttemptedToSend = NULL;
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
+ newKey = NULL;
+ }
+ }
+ processTransactionList(false);
+ } else {
+ if (checkSend(newSlots, lastSlotAttemptedToSend)) {
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
+ newKey = NULL;
+ }
+ }
+ processTransactionList(true);
+ }
+ }
+
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+
+ if (newSlots->length() != 0) {
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ } else {
+ if (checkSend(newSlots, lastSlotAttemptedToSend)) {
+ if (newKey != NULL) {
+ if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
+ delete newKey;
+ newKey = NULL;
+ }
+ }
+
+ processTransactionList(true);
+ } else {
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(lastTransactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetServerFailure();
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+ }
+
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ delete newSlots;
+ return newKey;
+}
+
+void Table::clearSentParts() {
+ // Clear the sent data since we are trying again
+ pendingSendArbitrationEntriesToDelete->clear();
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ delete trit->currVal();
+ }
+ delete trit;
+ transactionPartsSent->clear();
+}
+
+bool Table::sendToServer(NewKey *newKey) {
+ if (hadPartialSendToServer) {
+ newKey = handlePartialSend(newKey);
+ }
+
+ try {
+ // While we have stuff that needs inserting into the block chain
+ while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) {
+ if (hadPartialSendToServer) {
+ throw new Error("Should Be error free");
+ }
+
+ // If there is a new key with same name then end
+ if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) {
+ delete newKey;
+ return false;
+ }
+
+ // Create the slot
+ Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array<char>(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber);
+ localSequenceNumber++;
+
+ // Try to fill the slot with data
+ int newSize = 0;
+ bool insertedNewKey = false;
+ bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
+
+ if (needsResize) {
+ // Reset which transaction to send
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetNextPartToSend();
+
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+
+ // Clear the sent data since we are trying again
+ clearSentParts();
+
+ // We needed a resize so try again
+ fillSlot(slot, true, newKey, newSize, insertedNewKey);
+ }
+ if (lastSlotAttemptedToSend != NULL)
+ delete lastSlotAttemptedToSend;
+
+ lastSlotAttemptedToSend = slot;
+ lastIsNewKey = (newKey != NULL);
+ lastInsertedNewKey = insertedNewKey;
+ lastNewSize = newSize;
+ if (( newKey != lastNewKey) && (lastNewKey != NULL))
+ delete lastNewKey;
+ lastNewKey = newKey;
+ if (lastTransactionPartsSent != NULL)
+ delete lastTransactionPartsSent;
+ lastTransactionPartsSent = transactionPartsSent->clone();
+
+ Array<Slot *> * newSlots = NULL;
+ bool wasInserted = false;
+ bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots);
+
+ if (sendSlotsReturn) {
+ lastSlotAttemptedToSend = NULL;
+ // Did insert into the block chain
+ if (insertedNewKey) {
+ // This slot was what was inserted not a previous slot
+ // New Key was successfully inserted into the block chain so dont want to insert it again
+ newKey = NULL;
+ }
+
+ // Remove the aborts and commit parts that were sent from the pending to send queue
+ uint size = pendingSendArbitrationRounds->size();
+ uint oldcount = 0;
+ for (uint i = 0; i < size; i++) {
+ ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
+ round->removeParts(pendingSendArbitrationEntriesToDelete);
+
+ if (!round->isDoneSending()) {
+ //Add part back in
+ pendingSendArbitrationRounds->set(oldcount++,
+ pendingSendArbitrationRounds->get(i));
+ } else
+ delete pendingSendArbitrationRounds->get(i);
+ }
+ pendingSendArbitrationRounds->setSize(oldcount);
+ processTransactionList(false);
+ } else {
+ // Reset which transaction to send
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetNextPartToSend();
+
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+ }
+
+ // Clear the sent data in preparation for next send
+ clearSentParts();
+
+ if (newSlots->length() != 0) {
+ // insert into the local block chain
+ validateAndUpdate(newSlots, true);
+ }
+ delete newSlots;
+ }
+ } catch (ServerException *e) {
+ if (e->getType() != ServerException_TypeInputTimeout) {
+ // Nothing was able to be sent to the server so just clear these data structures
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetNextPartToSend();
+
+ // Set the transaction sequence number back to nothing
+ if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) {
+ transaction->setSequenceNumber(-1);
+ }
+ }
+ delete trit;
+ } else {
+ // There was a partial send to the server
+ hadPartialSendToServer = true;
+
+ // Nothing was able to be sent to the server so just clear these data structures
+ SetIterator<Transaction *, Vector<int> *> *trit = getKeyIterator(transactionPartsSent);
+ while (trit->hasNext()) {
+ Transaction *transaction = trit->next();
+ transaction->resetNextPartToSend();
+ transaction->setServerFailure();
+ }
+ delete trit;
+ }
+
+ clearSentParts();
+
+ throw e;
+ }
+
+ return newKey == NULL;
+}
+
+bool Table::updateFromLocal(int64_t machineId) {
+ if (!localCommunicationTable->contains(machineId))
+ return false;
+
+ Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(machineId);
+
+ // Get the size of the send data
+ int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
+
+ int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId);
+ }
+
+ Array<char> *sendData = new Array<char>(sendDataSize);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
+
+ // Encode the data
+ bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode->putInt(0);
+
+ // Send by local
+ Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+ localSequenceNumber++;
+
+ if (returnData == NULL) {
+ // Could not contact server
+ return false;
+ }
+
+ // Decode the data
+ ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
+ int numberOfEntries = bbDecode->getInt();
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ char type = bbDecode->get();
+ if (type == TypeAbort) {
+ Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
+ processEntry(abort);
+ } else if (type == TypeCommitPart) {
+ CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ return true;
+}
+
+Pair<bool, bool> Table::sendTransactionToLocal(Transaction *transaction) {
+
+ // Get the devices local communications
+ if (!localCommunicationTable->contains(transaction->getArbitrator()))
+ return Pair<bool, bool>(true, false);
+
+ Pair<IoTString *, int32_t> *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator());
+
+ // Get the size of the send data
+ int sendDataSize = sizeof(int32_t) + sizeof(int64_t);
+ {
+ Vector<TransactionPart *> *tParts = transaction->getParts();
+ uint tPartsSize = tParts->size();
+ for (uint i = 0; i < tPartsSize; i++) {
+ TransactionPart *part = tParts->get(i);
+ sendDataSize += part->getSize();
+ }
+ }
+
+ int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator());
+ }
+
+ // Make the send data size
+ Array<char> *sendData = new Array<char>(sendDataSize);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(sendData);
+
+ // Encode the data
+ bbEncode->putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode->putInt(transaction->getParts()->size());
+ {
+ Vector<TransactionPart *> *tParts = transaction->getParts();
+ uint tPartsSize = tParts->size();
+ for (uint i = 0; i < tPartsSize; i++) {
+ TransactionPart *part = tParts->get(i);
+ part->encode(bbEncode);
+ }
+ }
+
+ // Send by local
+ Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+ localSequenceNumber++;
+
+ if (returnData == NULL) {
+ // Could not contact server
+ return Pair<bool, bool>(true, false);
+ }
+
+ // Decode the data
+ ByteBuffer *bbDecode = ByteBuffer_wrap(returnData);
+ bool didCommit = bbDecode->get() == 1;
+ bool couldArbitrate = bbDecode->get() == 1;
+ int numberOfEntries = bbDecode->getInt();
+ bool foundAbort = false;
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ char type = bbDecode->get();
+ if (type == TypeAbort) {
+ Abort *abort = (Abort *)Abort_decode(NULL, bbDecode);
+
+ if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
+ foundAbort = true;
+ }
+
+ processEntry(abort);
+ } else if (type == TypeCommitPart) {
+ CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ if (couldArbitrate) {
+ TransactionStatus *status = transaction->getTransactionStatus();
+ if (didCommit) {
+ status->setStatus(TransactionStatus_StatusCommitted);
+ } else {
+ status->setStatus(TransactionStatus_StatusAborted);
+ }
+ } else {
+ TransactionStatus *status = transaction->getTransactionStatus();
+ if (foundAbort) {
+ status->setStatus(TransactionStatus_StatusAborted);
+ } else {
+ status->setStatus(TransactionStatus_StatusCommitted);
+ }
+ }
+
+ return Pair<bool, bool>(false, true);
+}
+
+Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
+ // Decode the data
+ ByteBuffer *bbDecode = ByteBuffer_wrap(data);
+ int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong();
+ int numberOfParts = bbDecode->getInt();
+
+ // If we did commit a transaction or not
+ bool didCommit = false;
+ bool couldArbitrate = false;
+
+ if (numberOfParts != 0) {
+
+ // decode the transaction
+ Transaction *transaction = new Transaction();
+ for (int i = 0; i < numberOfParts; i++) {
+ bbDecode->get();
+ TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode);
+ transaction->addPartDecode(newPart);
+ }
+
+ // Arbitrate on transaction and pull relevant return data
+ Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ couldArbitrate = localArbitrateReturn.getFirst();
+ didCommit = localArbitrateReturn.getSecond();
+
+ updateLiveStateFromLocal();
+
+ // Transaction was sent to the server so keep track of it to prevent double commit
+ if (transaction->getSequenceNumber() != -1) {
+ offlineTransactionsCommittedAndAtServer->add(new Pair<int64_t, int64_t>(transaction->getId()));
+ }
+ }
+
+ // The data to send back
+ int returnDataSize = 0;
+ Vector<Entry *> *unseenArbitrations = new Vector<Entry *>();
+
+ // Get the aborts to send back
+ Vector<int64_t> *abortLocalSequenceNumbers = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Abort *> *abortit = getKeyIterator(liveAbortsGeneratedByLocal);
+ while (abortit->hasNext())
+ abortLocalSequenceNumbers->add(abortit->next());
+ delete abortit;
+ }
+
+ qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
+
+ uint asize = abortLocalSequenceNumbers->size();
+ for (uint i = 0; i < asize; i++) {
+ int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i);
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber);
+ unseenArbitrations->add(abort);
+ returnDataSize += abort->getSize();
+ }
+
+ // Get the commits to send back
+ Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(localMachineId);
+ if (commitForClientTable != NULL) {
+ Vector<int64_t> *commitLocalSequenceNumbers = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Commit *> *commitit = getKeyIterator(commitForClientTable);
+ while (commitit->hasNext())
+ commitLocalSequenceNumbers->add(commitit->next());
+ delete commitit;
+ }
+ qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64);
+
+ uint clsSize = commitLocalSequenceNumbers->size();
+ for (uint clsi = 0; clsi < clsSize; clsi++) {
+ int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi);
+ Commit *commit = commitForClientTable->get(localSequenceNumber);
+
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ {
+ Vector<CommitPart *> *parts = commit->getParts();
+ uint nParts = parts->size();
+ for (uint i = 0; i < nParts; i++) {
+ CommitPart *commitPart = parts->get(i);
+ unseenArbitrations->add(commitPart);
+ returnDataSize += commitPart->getSize();
+ }
+ }
+ }
+ }
+
+ // Number of arbitration entries to decode
+ returnDataSize += 2 * sizeof(int32_t);
+
+ // bool of did commit or not
+ if (numberOfParts != 0) {
+ returnDataSize += sizeof(char);
+ }
+
+ // Data to send Back
+ Array<char> *returnData = new Array<char>(returnDataSize);
+ ByteBuffer *bbEncode = ByteBuffer_wrap(returnData);
+
+ if (numberOfParts != 0) {
+ if (didCommit) {
+ bbEncode->put((char)1);
+ } else {
+ bbEncode->put((char)0);
+ }
+ if (couldArbitrate) {
+ bbEncode->put((char)1);
+ } else {
+ bbEncode->put((char)0);
+ }
+ }
+
+ bbEncode->putInt(unseenArbitrations->size());
+ uint size = unseenArbitrations->size();
+ for (uint i = 0; i < size; i++) {
+ Entry *entry = unseenArbitrations->get(i);
+ entry->encode(bbEncode);
+ }
+
+ localSequenceNumber++;
+ return returnData;
+}
+
+/** Checks whether a given slot was sent using new slots in
+ array. Returns true if sent and false otherwise. */
+
+bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
+ uint size = array->length();
+ for (uint i = 0; i < size; i++) {
+ Slot *s = array->get(i);
+ if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+ return true;
+ }
+ }
+
+ //Also need to see if other machines acknowledged our message
+ for (uint i = 0; i < size; i++) {
+ Slot *s = array->get(i);
+
+ // Process each entry in the slot
+ Vector<Entry *> *entries = s->getEntries();
+ uint eSize = entries->size();
+ for (uint ei = 0; ei < eSize; ei++) {
+ Entry *entry = entries->get(ei);
+
+ if (entry->getType() == TypeLastMessage) {
+ LastMessage *lastMessage = (LastMessage *)entry;
+
+ if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
+ return true;
+ }
+ }
+ }
+ }
+ //Not found
+ return false;
+}
+
+/** Method tries to send slot to server. Returns status in tuple.
+ isInserted returns whether last un-acked send (if any) was
+ successful. Returns whether send was confirmed.x
+ */
+
+bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array<Slot *> **array) {
+ attemptedToSendToServer = true;
+
+ *array = cloud->putSlot(slot, newSize);
+ if (*array == NULL) {
+ *array = new Array<Slot *>(1);
+ (*array)->set(0, slot);
+ rejectedSlotVector->clear();
+ *isInserted = false;
+ return true;
+ } else {
+ if ((*array)->length() == 0) {
+ throw new Error("Server Error: Did not send any slots");
+ }
+
+ if (hadPartialSendToServer) {
+ *isInserted = checkSend(*array, slot);
+
+ if (!(*isInserted)) {
+ rejectedSlotVector->add(slot->getSequenceNumber());
+ }
+
+ return false;
+ } else {
+ rejectedSlotVector->add(slot->getSequenceNumber());
+ *isInserted = false;
+ return false;
+ }
+ }
+}
+
+/**
+ * Returns true if a resize was needed but not done.
+ */
+bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
+ newSize = 0;//special value to indicate no resize
+ if (liveSlotCount > bufferResizeThreshold) {
+ resize = true;//Resize is forced
+ }
+
+ if (resize) {
+ newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE);
+ TableStatus *status = new TableStatus(slot, newSize);
+ slot->addShallowEntry(status);
+ }
+
+ // Fill with rejected slots first before doing anything else
+ doRejectedMessages(slot);
+
+ // Do mandatory rescue of entries
+ ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryRescue(slot, resize);
+
+ // Extract working variables
+ bool needsResize = mandatoryRescueReturn.getFirst();
+ bool seenLiveSlot = mandatoryRescueReturn.getSecond();
+ int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
+
+ if (needsResize && !resize) {
+ // We need to resize but we are not resizing so return true to force on retry
+ return true;
+ }
+
+ insertedKey = false;
+ if (newKeyEntry != NULL) {
+ newKeyEntry->setSlot(slot);
+ if (slot->hasSpace(newKeyEntry)) {
+ slot->addEntry(newKeyEntry);
+ insertedKey = true;
+ }
+ }
+
+ // Clear the transactions, aborts and commits that were sent previously
+ clearSentParts();
+ uint size = pendingSendArbitrationRounds->size();
+ for (uint i = 0; i < size; i++) {
+ ArbitrationRound *round = pendingSendArbitrationRounds->get(i);
+ bool isFull = false;
+ round->generateParts();
+ Vector<Entry *> *parts = round->getParts();
+
+ // Insert pending arbitration data
+ uint vsize = parts->size();
+ for (uint vi = 0; vi < vsize; vi++) {
+ Entry *arbitrationData = parts->get(vi);
+
+ // If it is an abort then we need to set some information
+ if (arbitrationData->getType() == TypeAbort) {
+ ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber());
+ }
+
+ if (!slot->hasSpace(arbitrationData)) {
+ // No space so cant do anything else with these data entries
+ isFull = true;
+ break;
+ }
+
+ // Add to this current slot and add it to entries to delete
+ slot->addEntry(arbitrationData);
+ pendingSendArbitrationEntriesToDelete->add(arbitrationData);
+ }
+
+ if (isFull) {
+ break;
+ }
+ }
+
+ if (pendingTransactionQueue->size() > 0) {
+ Transaction *transaction = pendingTransactionQueue->get(0);
+ // Set the transaction sequence number if it has yet to be inserted into the block chain
+ if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) {
+ transaction->setSequenceNumber(slot->getSequenceNumber());
+ }
+
+ while (true) {
+ TransactionPart *part = transaction->getNextPartToSend();
+ if (part == NULL) {
+ // Ran out of parts to send for this transaction so move on
+ break;
+ }
+
+ if (slot->hasSpace(part)) {
+ slot->addEntry(part);
+ Vector<int32_t> *partsSent = transactionPartsSent->get(transaction);
+ if (partsSent == NULL) {
+ partsSent = new Vector<int32_t>();
+ transactionPartsSent->put(transaction, partsSent);
+ }
+ partsSent->add(part->getPartNumber());
+ transactionPartsSent->put(transaction, partsSent);
+ } else {
+ break;
+ }
+ }
+ }
+
+ // Fill the remainder of the slot with rescue data
+ doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
+
+ return false;
+}
+
+void Table::doRejectedMessages(Slot *s) {
+ if (!rejectedSlotVector->isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e->g->,
+ * equalsto value of true and same sequence number)-> */
+
+ int64_t old_seqn = rejectedSlotVector->get(0);
+ if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) {
+ int64_t new_seqn = rejectedSlotVector->lastElement();
+ RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false);
+ s->addShallowEntry(rm);
+ } else {
+ int64_t prev_seqn = -1;
+ uint i = 0;
+ /* Go through list of missing messages */
+ for (; i < rejectedSlotVector->size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector->get(i);
+ Slot *s_msg = buffer->getSlot(curr_seqn);
+ if (s_msg != NULL)
+ break;
+ prev_seqn = curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false);
+ s->addShallowEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for (; i < rejectedSlotVector->size(); i++) {
+ int64_t curr_seqn = rejectedSlotVector->get(i);
+ Slot *s_msg = buffer->getSlot(curr_seqn);
+ int64_t machineid = s_msg->getMachineID();
+ RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true);
+ s->addShallowEntry(rm);
+ }
+ }
+ }
+}
+
+ThreeTuple<bool, bool, int64_t> Table::doMandatoryRescue(Slot *slot, bool resize) {
+ int64_t newestSequenceNumber = buffer->getNewestSeqNum();
+ int64_t oldestSequenceNumber = buffer->getOldestSeqNum();
+ if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) {
+ oldestLiveSlotSequenceNumver = oldestSequenceNumber;
+ }
+
+ int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver;
+ bool seenLiveSlot = false;
+ int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full
+ int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point
+
+
+ // Mandatory Rescue
+ for (; currentSequenceNumber < threshold; currentSequenceNumber++) {
+ Slot *previousSlot = buffer->getSlot(currentSequenceNumber);
+ // Push slot number forward
+ if (!seenLiveSlot) {
+ oldestLiveSlotSequenceNumver = currentSequenceNumber;
+ }
+
+ if (!previousSlot->isLive()) {
+ continue;
+ }
+
+ // We have seen a live slot
+ seenLiveSlot = true;
+
+ // Get all the live entries for a slot
+ Vector<Entry *> *liveEntries = previousSlot->getLiveEntries(resize);
+
+ // Iterate over all the live entries and try to rescue them
+ uint lESize = liveEntries->size();
+ for (uint i = 0; i < lESize; i++) {
+ Entry *liveEntry = liveEntries->get(i);
+ if (slot->hasSpace(liveEntry)) {
+ // Enough space to rescue the entry
+ slot->addEntry(liveEntry);
+ } else if (currentSequenceNumber == firstIfFull) {
+ //if there's no space but the entry is about to fall off the queue
+ return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
+ }
+ }
+ }
+
+ // Did not resize
+ return ThreeTuple<bool, bool, int64_t>(false, seenLiveSlot, currentSequenceNumber);
+}
+
+void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) {
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount = 0;
+ int64_t newestseqnum = buffer->getNewestSeqNum();
+ for (; seqn <= newestseqnum; seqn++) {
+ Slot *prevslot = buffer->getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ oldestLiveSlotSequenceNumver = seqn;
+
+ if (!prevslot->isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry *> *liveentries = prevslot->getLiveEntries(resize);
+ uint lESize = liveentries->size();
+ for (uint i = 0; i < lESize; i++) {
+ Entry *liveentry = liveentries->get(i);
+ if (s->hasSpace(liveentry))
+ s->addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > Table_SKIP_THRESHOLD) {
+ delete liveentries;
+ goto donesearch;
+ }
+ }
+ }
+ delete liveentries;
+ }
+donesearch:
+ ;
+}
+
+/**
+ * Checks for malicious activity and updates the local copy of the block chain->
+ */
+void Table::validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal) {
+ // The cloud communication layer has checked slot HMACs already
+ // before decoding
+ if (newSlots->length() == 0) {
+ return;
+ }
+
+ // Make sure all slots are newer than the last largest slot this
+ // client has seen
+ int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber();
+ if (firstSeqNum <= sequenceNumber) {
+ throw new Error("Server Error: Sent older slots!");
+ }
+
+ // Create an object that can access both new slots and slots in our
+ // local chain without committing slots to our local chain
+ SlotIndexer *indexer = new SlotIndexer(newSlots, buffer);
+
+ // Check that the HMAC chain is not broken
+ checkHMACChain(indexer, newSlots);
+
+ // Set to keep track of messages from clients
+ Hashset<int64_t> *machineSet = new Hashset<int64_t>();
+ {
+ SetIterator<int64_t, Pair<int64_t, Liveness *> *> *lmit = getKeyIterator(lastMessageTable);
+ while (lmit->hasNext())
+ machineSet->add(lmit->next());
+ delete lmit;
+ }
+
+ // Process each slots data
+ {
+ uint numSlots = newSlots->length();
+ for (uint i = 0; i < numSlots; i++) {
+ Slot *slot = newSlots->get(i);
+ processSlot(indexer, slot, acceptUpdatesToLocal, machineSet);
+ updateExpectedSize();
+ }
+ }
+ delete indexer;
+
+ // If there is a gap, check to see if the server sent us
+ // everything->
+ if (firstSeqNum != (sequenceNumber + 1)) {
+
+ // Check the size of the slots that were sent down by the server->
+ // Can only check the size if there was a gap
+ checkNumSlots(newSlots->length());
+
+ // Since there was a gap every machine must have pushed a slot or
+ // must have a last message message-> If not then the server is
+ // hiding slots
+ if (!machineSet->isEmpty()) {
+ delete machineSet;
+ throw new Error("Missing record for machines: ");
+ }
+ }
+ delete machineSet;
+ // Update the size of our local block chain->
+ commitNewMaxSize();
+
+ // Commit new to slots to the local block chain->
+ {
+ uint numSlots = newSlots->length();
+ for (uint i = 0; i < numSlots; i++) {
+ Slot *slot = newSlots->get(i);
+
+ // Insert this slot into our local block chain copy->
+ buffer->putSlot(slot);
+
+ // Keep track of how many slots are currently live (have live data
+ // in them)->
+ liveSlotCount++;
+ }
+ }
+ // Get the sequence number of the latest slot in the system
+ sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber();
+ updateLiveStateFromServer();
+
+ // No Need to remember after we pulled from the server
+ offlineTransactionsCommittedAndAtServer->clear();
+
+ // This is invalidated now
+ hadPartialSendToServer = false;
+}
+
+void Table::updateLiveStateFromServer() {
+ // Process the new transaction parts
+ processNewTransactionParts();
+
+ // Do arbitration on new transactions that were received
+ arbitrateFromServer();
+
+ // Update all the committed keys
+ bool didCommitOrSpeculate = updateCommittedTable();
+
+ // Delete the transactions that are now dead
+ updateLiveTransactionsAndStatus();
+
+ // Do speculations
+ didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
+ updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
+}
+
+void Table::updateLiveStateFromLocal() {
+ // Update all the committed keys
+ bool didCommitOrSpeculate = updateCommittedTable();
+
+ // Delete the transactions that are now dead
+ updateLiveTransactionsAndStatus();
+
+ // Do speculations
+ didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate);
+ updatePendingTransactionSpeculativeTable(didCommitOrSpeculate);
+}
+
+void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) {
+ int64_t prevslots = firstSequenceNumber;
+
+ if (didFindTableStatus) {
+ } else {
+ expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots;
+ }
+
+ didFindTableStatus = true;
+ currMaxSize = numberOfSlots;
+}
+
+void Table::updateExpectedSize() {
+ expectedsize++;
+
+ if (expectedsize > currMaxSize) {
+ expectedsize = currMaxSize;
+ }
+}
+
+
+/**
+ * Check the size of the block chain to make sure there are enough
+ * slots sent back by the server-> This is only called when we have a
+ * gap between the slots that we have locally and the slots sent by
+ * the server therefore in the slots sent by the server there will be
+ * at least 1 Table status message
+ */
+void Table::checkNumSlots(int numberOfSlots) {
+ if (numberOfSlots != expectedsize) {
+ throw new Error("Server Error: Server did not send all slots-> Expected: ");
+ }
+}
+
+/**
+ * Update the size of of the local buffer if it is needed->
+ */
+void Table::commitNewMaxSize() {
+ didFindTableStatus = false;
+
+ // Resize the local slot buffer
+ if (numberOfSlots != currMaxSize) {
+ buffer->resize((int32_t)currMaxSize);
+ }
+
+ // Change the number of local slots to the new size
+ numberOfSlots = (int32_t)currMaxSize;
+
+ // Recalculate the resize threshold since the size of the local
+ // buffer has changed
+ setResizeThreshold();
+}
+
+/**
+ * Process the new transaction parts from this latest round of slots
+ * received from the server
+ */
+void Table::processNewTransactionParts() {
+
+ if (newTransactionParts->size() == 0) {
+ // Nothing new to process
+ return;
+ }
+
+ // Iterate through all the machine Ids that we received new parts
+ // for
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts);
+ while (tpit->hasNext()) {
+ int64_t machineId = tpit->next();
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal();
+
+ SetIterator<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts);
+ // Iterate through all the parts for that machine Id
+ while (ptit->hasNext()) {
+ Pair<int64_t, int32_t> *partId = ptit->next();
+ TransactionPart *part = parts->get(partId);
+
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) {
+ int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId());
+ if (lastTransactionNumber >= part->getSequenceNumber()) {
+ // Set dead the transaction part
+ part->setDead();
+ part->releaseRef();
+ continue;
+ }
+ }
+
+ // Get the transaction object for that sequence number
+ Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber());
+
+ if (transaction == NULL) {
+ // This is a new transaction that we dont have so make a new one
+ transaction = new Transaction();
+
+ // Add that part to the transaction
+ transaction->addPartDecode(part);
+
+ // Insert this new transaction into the live tables
+ liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction);
+ liveTransactionByTransactionIdTable->put(transaction->getId(), transaction);
+ }
+ part->releaseRef();
+ }
+ delete ptit;
+ }
+ delete tpit;
+ // Clear all the new transaction parts in preparation for the next
+ // time the server sends slots
+ {
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts);
+ while (partsit->hasNext()) {
+ int64_t machineId = partsit->next();
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId);
+ delete parts;
+ }
+ delete partsit;
+ newTransactionParts->clear();
+ }
+}
+
+void Table::arbitrateFromServer() {
+ if (liveTransactionBySequenceNumberTable->size() == 0) {
+ // Nothing to arbitrate on so move on
+ return;
+ }
+
+ // Get the transaction sequence numbers and sort from oldest to newest
+ Vector<int64_t> *transactionSequenceNumbers = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
+ while (trit->hasNext())
+ transactionSequenceNumbers->add(trit->next());
+ delete trit;
+ }
+ qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64);
+
+ // Collection of key value pairs that are
+ Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculativeTableTmp = new Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals>();
+
+ // The last transaction arbitrated on
+ int64_t lastTransactionCommitted = -1;
+ Hashset<Abort *> *generatedAborts = new Hashset<Abort *>();
+ uint tsnSize = transactionSequenceNumbers->size();
+ for (uint i = 0; i < tsnSize; i++) {
+ int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i);
+ Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
+
+ // Check if this machine arbitrates for this transaction if not
+ // then we cant arbitrate this transaction
+ if (transaction->getArbitrator() != localMachineId) {
+ continue;
+ }
+
+ if (transactionSequenceNumber < lastSeqNumArbOn) {
+ continue;
+ }
+
+ if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) {
+ // We have seen this already locally so dont commit again
+ continue;
+ }
+
+ if (!transaction->isComplete()) {
+ // Will arbitrate in incorrect order if we continue so just break
+ // Most likely this
+ break;
+ }
+
+ // update the largest transaction seen by arbitrator from server
+ if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
+ lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
+ } else {
+ int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId());
+ if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
+ lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber());
+ }
+ }
+
+ if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) {
+ // Guard evaluated as true
+ // Update the local changes so we can make the commit
+ SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ speculativeTableTmp->put(kv->getKey(), kv);
+ }
+ delete kvit;
+
+ // Update what the last transaction committed was for use in batch commit
+ lastTransactionCommitted = transactionSequenceNumber;
+ } else {
+ // Guard evaluated was false so create abort
+ // Create the abort
+ Abort *newAbort = new Abort(NULL,
+ transaction->getClientLocalSequenceNumber(),
+ transaction->getSequenceNumber(),
+ transaction->getMachineId(),
+ transaction->getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+ generatedAborts->add(newAbort);
+
+ // Insert the abort so we can process
+ processEntry(newAbort);
+ }
+
+ lastSeqNumArbOn = transactionSequenceNumber;
+ }
+
+ delete transactionSequenceNumbers;
+
+ Commit *newCommit = NULL;
+
+ // If there is something to commit
+ if (speculativeTableTmp->size() != 0) {
+ // Create the commit and increment the commit sequence number
+ newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
+ localArbitrationSequenceNumber++;
+
+ // Add all the new keys to the commit
+ SetIterator<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *spit = getKeyIterator(speculativeTableTmp);
+ while (spit->hasNext()) {
+ IoTString *string = spit->next();
+ KeyValue *kv = speculativeTableTmp->get(string);
+ newCommit->addKV(kv);
+ }
+ delete spit;
+
+ // create the commit parts
+ newCommit->createCommitParts();
+
+ // Append all the commit parts to the end of the pending queue
+ // waiting for sending to the server
+ // Insert the commit so we can process it
+ Vector<CommitPart *> *parts = newCommit->getParts();
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ CommitPart *commitPart = parts->get(i);
+ processEntry(commitPart);
+ }
+ }
+ delete speculativeTableTmp;
+
+ if ((newCommit != NULL) || (generatedAborts->size() > 0)) {
+ ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
+ pendingSendArbitrationRounds->add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
+ if (newArbitrationRound->getCommit() != NULL) {
+ Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ CommitPart *commitPart = parts->get(i);
+ processEntry(commitPart);
+ }
+ }
+ }
+ } else {
+ delete generatedAborts;
+ }
+}
+
+Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction *transaction) {
+
+ // Check if this machine arbitrates for this transaction if not then
+ // we cant arbitrate this transaction
+ if (transaction->getArbitrator() != localMachineId) {
+ return Pair<bool, bool>(false, false);
+ }
+
+ if (!transaction->isComplete()) {
+ // Will arbitrate in incorrect order if we continue so just break
+ // Most likely this
+ return Pair<bool, bool>(false, false);
+ }
+
+ if (transaction->getMachineId() != localMachineId) {
+ // dont do this check for local transactions
+ if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) {
+ if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) {
+ // We've have already seen this from the server
+ return Pair<bool, bool>(false, false);
+ }
+ }
+ }
+
+ if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) {
+ // Guard evaluated as true Create the commit and increment the
+ // commit sequence number
+ Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
+ localArbitrationSequenceNumber++;
+
+ // Update the local changes so we can make the commit
+ SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ newCommit->addKV(kv);
+ }
+ delete kvit;
+
+ // create the commit parts
+ newCommit->createCommitParts();
+
+ // Append all the commit parts to the end of the pending queue
+ // waiting for sending to the server
+ ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
+ pendingSendArbitrationRounds->add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
+ Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ CommitPart *commitPart = parts->get(i);
+ processEntry(commitPart);
+ }
+ } else {
+ // Insert the commit so we can process it
+ Vector<CommitPart *> *parts = newCommit->getParts();
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ CommitPart *commitPart = parts->get(i);
+ processEntry(commitPart);
+ }
+ }
+
+ if (transaction->getMachineId() == localMachineId) {
+ TransactionStatus *status = transaction->getTransactionStatus();
+ if (status != NULL) {
+ status->setStatus(TransactionStatus_StatusCommitted);
+ }
+ }
+
+ updateLiveStateFromLocal();
+ return Pair<bool, bool>(true, true);
+ } else {
+ if (transaction->getMachineId() == localMachineId) {
+ // For locally created messages update the status
+ // Guard evaluated was false so create abort
+ TransactionStatus *status = transaction->getTransactionStatus();
+ if (status != NULL) {
+ status->setStatus(TransactionStatus_StatusAborted);
+ }
+ } else {
+ Hashset<Abort *> *addAbortSet = new Hashset<Abort * >();
+
+ // Create the abort
+ Abort *newAbort = new Abort(NULL,
+ transaction->getClientLocalSequenceNumber(),
+ -1,
+ transaction->getMachineId(),
+ transaction->getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+ addAbortSet->add(newAbort);
+
+ // Append all the commit parts to the end of the pending queue
+ // waiting for sending to the server
+ ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet);
+ pendingSendArbitrationRounds->add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
+
+ Vector<CommitPart *> *parts = newArbitrationRound->getCommit()->getParts();
+ uint partsSize = parts->size();
+ for (uint i = 0; i < partsSize; i++) {
+ CommitPart *commitPart = parts->get(i);
+ processEntry(commitPart);
+ }
+ }
+ }
+
+ updateLiveStateFromLocal();
+ return Pair<bool, bool>(true, false);
+ }
+}
+
+/**
+ * Compacts the arbitration data by merging commits and aggregating
+ * aborts so that a single large push of commits can be done instead
+ * of many small updates
+ */
+bool Table::compactArbitrationData() {
+ if (pendingSendArbitrationRounds->size() < 2) {
+ // Nothing to compact so do nothing
+ return false;
+ }
+
+ ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1);
+ if (lastRound->getDidSendPart()) {
+ return false;
+ }
+
+ bool hadCommit = (lastRound->getCommit() == NULL);
+ bool gotNewCommit = false;
+
+ uint numberToDelete = 1;
+
+ while (numberToDelete < pendingSendArbitrationRounds->size()) {
+ ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1);
+
+ if (round->isFull() || round->getDidSendPart()) {
+ // Stop since there is a part that cannot be compacted and we
+ // need to compact in order
+ break;
+ }
+
+ if (round->getCommit() == NULL) {
+ // Try compacting aborts only
+ int newSize = round->getCurrentSize() + lastRound->getAbortsCount();
+ if (newSize > ArbitrationRound_MAX_PARTS) {
+ // Cant compact since it would be too large
+ break;
+ }
+ lastRound->addAborts(round->getAborts());
+ } else {
+ // Create a new larger commit
+ Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+
+ // Create the commit parts so that we can count them
+ newCommit->createCommitParts();
+
+ // Calculate the new size of the parts
+ int newSize = newCommit->getNumberOfParts();
+ newSize += lastRound->getAbortsCount();
+ newSize += round->getAbortsCount();
+
+ if (newSize > ArbitrationRound_MAX_PARTS) {
+ // Can't compact since it would be too large
+ if (lastRound->getCommit() != newCommit &&
+ round->getCommit() != newCommit)
+ delete newCommit;
+ break;
+ }
+ // Set the new compacted part
+ if (lastRound->getCommit() == newCommit)
+ lastRound->setCommit(NULL);
+ if (round->getCommit() == newCommit)
+ round->setCommit(NULL);
+
+ if (lastRound->getCommit() != NULL) {
+ Commit * oldcommit = lastRound->getCommit();
+ lastRound->setCommit(NULL);
+ delete oldcommit;
+ }
+ lastRound->setCommit(newCommit);
+ lastRound->addAborts(round->getAborts());
+ gotNewCommit = true;
+ }
+
+ numberToDelete++;
+ }
+
+ if (numberToDelete != 1) {
+ // If there is a compaction
+ // Delete the previous pieces that are now in the new compacted piece
+ for (uint i = 2; i <= numberToDelete; i++) {
+ delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i);
+ }
+ pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete);
+
+ pendingSendArbitrationRounds->add(lastRound);
+
+ // Should reinsert into the commit processor
+ if (hadCommit && gotNewCommit) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/**
+ * Update all the commits and the committed tables, sets dead the dead
+ * transactions
+ */
+bool Table::updateCommittedTable() {
+ if (newCommitParts->size() == 0) {
+ // Nothing new to process
+ return false;
+ }
+
+ // Iterate through all the machine Ids that we received new parts for
+ SetIterator<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts);
+ while (partsit->hasNext()) {
+ int64_t machineId = partsit->next();
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId);
+
+ // Iterate through all the parts for that machine Id
+ SetIterator<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts);
+ while (pairit->hasNext()) {
+ Pair<int64_t, int32_t> *partId = pairit->next();
+ CommitPart *part = pairit->currVal();
+
+ // Get the transaction object for that sequence number
+ Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(part->getMachineId());
+
+ if (commitForClientTable == NULL) {
+ // This is the first commit from this device
+ commitForClientTable = new Hashtable<int64_t, Commit *>();
+ liveCommitsTable->put(part->getMachineId(), commitForClientTable);
+ }
+
+ Commit *commit = commitForClientTable->get(part->getSequenceNumber());
+
+ if (commit == NULL) {
+ // This is a new commit that we dont have so make a new one
+ commit = new Commit();
+
+ // Insert this new commit into the live tables
+ commitForClientTable->put(part->getSequenceNumber(), commit);
+ }
+
+ // Add that part to the commit
+ commit->addPartDecode(part);
+ part->releaseRef();
+ }
+ delete pairit;
+ delete parts;
+ }
+ delete partsit;
+
+ // Clear all the new commits parts in preparation for the next time
+ // the server sends slots
+ newCommitParts->clear();
+
+ // If we process a new commit keep track of it for future use
+ bool didProcessANewCommit = false;
+
+ // Process the commits one by one
+ SetIterator<int64_t, Hashtable<int64_t, Commit *> *> *liveit = getKeyIterator(liveCommitsTable);
+ while (liveit->hasNext()) {
+ int64_t arbitratorId = liveit->next();
+ // Get all the commits for a specific arbitrator
+ Hashtable<int64_t, Commit *> *commitForClientTable = liveCommitsTable->get(arbitratorId);
+
+ // Sort the commits in order
+ Vector<int64_t> *commitSequenceNumbers = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Commit *> *clientit = getKeyIterator(commitForClientTable);
+ while (clientit->hasNext())
+ commitSequenceNumbers->add(clientit->next());
+ delete clientit;
+ }
+
+ qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64);
+
+ // Get the last commit seen from this arbitrator
+ int64_t lastCommitSeenSequenceNumber = -1;
+ if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) {
+ lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId);
+ }
+
+ // Go through each new commit one by one
+ for (uint i = 0; i < commitSequenceNumbers->size(); i++) {
+ int64_t commitSequenceNumber = commitSequenceNumbers->get(i);
+ Commit *commit = commitForClientTable->get(commitSequenceNumber);
+ // Special processing if a commit is not complete
+ if (!commit->isComplete()) {
+ if (i == (commitSequenceNumbers->size() - 1)) {
+ // If there is an incomplete commit and this commit is the
+ // latest one seen then this commit cannot be processed and
+ // there are no other commits
+ break;
+ } else {
+ // This is a commit that was already dead but parts of it
+ // are still in the block chain (not flushed out yet)->
+ // Delete it and move on
+ commit->setDead();
+ commitForClientTable->remove(commit->getSequenceNumber());
+ delete commit;
+ continue;
+ }
+ }
+
+ // Update the last transaction that was updated if we can
+ if (commit->getTransactionSequenceNumber() != -1) {
+ // Update the last transaction sequence number that the arbitrator arbitrated on1
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
+ lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
+ }
+ }
+
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) {
+ int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId());
+ if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
+ }
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber());
+ }
+
+ // We have already seen this commit before so need to do the
+ // full processing on this commit
+ if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) {
+ // Update the last transaction that was updated if we can
+ if (commit->getTransactionSequenceNumber() != -1) {
+ int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId());
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) ||
+ lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) {
+ lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber());
+ }
+ }
+ continue;
+ }
+
+ // If we got here then this is a brand new commit and needs full
+ // processing
+ // Get what commits should be edited, these are the commits that
+ // have live values for their keys
+ Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
+ {
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
+ if (commit != NULL)
+ commitsToEdit->add(commit);
+ }
+ delete kvit;
+ }
+
+ // Update each previous commit that needs to be updated
+ SetIterator<Commit *, Commit *> *commitit = commitsToEdit->iterator();
+ while (commitit->hasNext()) {
+ Commit *previousCommit = commitit->next();
+
+ // Only bother with live commits (TODO: Maybe remove this check)
+ if (previousCommit->isLive()) {
+
+ // Update which keys in the old commits are still live
+ {
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ previousCommit->invalidateKey(kv->getKey());
+ }
+ delete kvit;
+ }
+
+ // if the commit is now dead then remove it
+ if (!previousCommit->isLive()) {
+ commitForClientTable->remove(previousCommit->getSequenceNumber());
+ delete previousCommit;
+ }
+ }
+ }
+ delete commitit;
+ delete commitsToEdit;
+
+ // Update the last seen sequence number from this arbitrator
+ if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) {
+ if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) {
+ lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
+ }
+ } else {
+ lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber());
+ }
+
+ // We processed a new commit that we havent seen before
+ didProcessANewCommit = true;
+
+ // Update the committed table of keys and which commit is using which key
+ {
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ committedKeyValueTable->put(kv->getKey(), kv);
+ liveCommitsByKeyTable->put(kv->getKey(), commit);
+ }
+ delete kvit;
+ }
+ }
+ delete commitSequenceNumbers;
+ }
+ delete liveit;
+
+ return didProcessANewCommit;
+}
+
+/**
+ * Create the speculative table from transactions that are still live
+ * and have come from the cloud
+ */
+bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
+ if (liveTransactionBySequenceNumberTable->size() == 0) {
+ // There is nothing to speculate on
+ return false;
+ }
+
+ // Create a list of the transaction sequence numbers and sort them
+ // from oldest to newest
+ Vector<int64_t> *transactionSequenceNumbersSorted = new Vector<int64_t>();
+ {
+ SetIterator<int64_t, Transaction *> *trit = getKeyIterator(liveTransactionBySequenceNumberTable);
+ while (trit->hasNext())
+ transactionSequenceNumbersSorted->add(trit->next());
+ delete trit;
+ }
+
+ qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64);
+
+ bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn;
+
+
+ if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) {
+ // If there is a gap in the transaction sequence numbers then
+ // there was a commit or an abort of a transaction OR there was a
+ // new commit (Could be from offline commit) so a redo the
+ // speculation from scratch
+
+ // Start from scratch
+ speculatedKeyValueTable->clear();
+ lastTransactionSequenceNumberSpeculatedOn = -1;
+ oldestTransactionSequenceNumberSpeculatedOn = -1;
+ }
+
+ // Remember the front of the transaction list
+ oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0);
+
+ // Find where to start arbitration from
+ uint startIndex = 0;
+
+ for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++)
+ if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn)
+ break;
+ startIndex++;
+
+ if (startIndex >= transactionSequenceNumbersSorted->size()) {
+ // Make sure we are not out of bounds
+ delete transactionSequenceNumbersSorted;
+ return false; // did not speculate
+ }
+
+ Hashset<int64_t> *incompleteTransactionArbitrator = new Hashset<int64_t>();
+ bool didSkip = true;
+
+ for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) {
+ int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i);
+ Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber);
+
+ if (!transaction->isComplete()) {
+ // If there is an incomplete transaction then there is nothing
+ // we can do add this transactions arbitrator to the list of
+ // arbitrators we should ignore
+ incompleteTransactionArbitrator->add(transaction->getArbitrator());
+ didSkip = true;
+ continue;
+ }
+
+ if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) {
+ continue;
+ }
+
+ lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber;
+
+ if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) {
+ // Guard evaluated to true so update the speculative table
+ {
+ SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ speculatedKeyValueTable->put(kv->getKey(), kv);
+ }
+ delete kvit;
+ }
+ }
+ }
+
+ delete transactionSequenceNumbersSorted;
+
+ if (didSkip) {
+ // Since there was a skip we need to redo the speculation next time around
+ lastTransactionSequenceNumberSpeculatedOn = -1;
+ oldestTransactionSequenceNumberSpeculatedOn = -1;
+ }
+
+ // We did some speculation
+ return true;
+}
+
+/**
+ * Create the pending transaction speculative table from transactions
+ * that are still in the pending transaction buffer
+ */
+void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) {
+ if (pendingTransactionQueue->size() == 0) {
+ // There is nothing to speculate on
+ return;
+ }
+
+ if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) {
+ // need to reset on the pending speculation
+ lastPendingTransactionSpeculatedOn = NULL;
+ firstPendingTransaction = pendingTransactionQueue->get(0);
+ pendingTransactionSpeculatedKeyValueTable->clear();
+ }
+
+ // Find where to start arbitration from
+ uint startIndex = 0;
+
+ for (; startIndex < pendingTransactionQueue->size(); startIndex++)
+ if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction)
+ break;
+
+ if (startIndex >= pendingTransactionQueue->size()) {
+ // Make sure we are not out of bounds
+ return;
+ }
+
+ for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) {
+ Transaction *transaction = pendingTransactionQueue->get(i);
+
+ lastPendingTransactionSpeculatedOn = transaction;
+
+ if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) {
+ // Guard evaluated to true so update the speculative table
+ SetIterator<KeyValue *, KeyValue *> *kvit = transaction->getKeyValueUpdateSet()->iterator();
+ while (kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv);
+ }
+ delete kvit;
+ }
+ }
+}
+
+/**
+ * Set dead and remove from the live transaction tables the
+ * transactions that are dead
+ */
+void Table::updateLiveTransactionsAndStatus() {
+ // Go through each of the transactions
+ {
+ SetIterator<int64_t, Transaction *> *iter = getKeyIterator(liveTransactionBySequenceNumberTable);
+ while (iter->hasNext()) {
+ int64_t key = iter->next();
+ Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
+
+ // Check if the transaction is dead
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
+ && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
+ // Set dead the transaction
+ transaction->setDead();
+
+ // Remove the transaction from the live table
+ iter->remove();
+ liveTransactionByTransactionIdTable->remove(transaction->getId());
+ delete transaction;
+ }
+ }
+ delete iter;
+ }
+
+ // Go through each of the transactions
+ {
+ SetIterator<int64_t, TransactionStatus *> *iter = getKeyIterator(outstandingTransactionStatus);
+ while (iter->hasNext()) {
+ int64_t key = iter->next();
+ TransactionStatus *status = outstandingTransactionStatus->get(key);
+
+ // Check if the transaction is dead
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
+ && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
+ // Set committed
+ status->setStatus(TransactionStatus_StatusCommitted);
+
+ // Remove
+ iter->remove();
+ }
+ }
+ delete iter;
+ }
+}
+
+/**
+ * Process this slot, entry by entry-> Also update the latest message sent by slot
+ */
+void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
+
+ // Update the last message seen
+ updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
+
+ // Process each entry in the slot
+ Vector<Entry *> *entries = slot->getEntries();
+ uint eSize = entries->size();
+ for (uint ei = 0; ei < eSize; ei++) {
+ Entry *entry = entries->get(ei);
+ switch (entry->getType()) {
+ case TypeCommitPart:
+ processEntry((CommitPart *)entry);
+ break;
+ case TypeAbort:
+ processEntry((Abort *)entry);
+ break;
+ case TypeTransactionPart:
+ processEntry((TransactionPart *)entry);
+ break;
+ case TypeNewKey:
+ processEntry((NewKey *)entry);
+ break;
+ case TypeLastMessage:
+ processEntry((LastMessage *)entry, machineSet);
+ break;
+ case TypeRejectedMessage:
+ processEntry((RejectedMessage *)entry, indexer);
+ break;
+ case TypeTableStatus:
+ processEntry((TableStatus *)entry, slot->getSequenceNumber());
+ break;
+ default:
+ throw new Error("Unrecognized type: ");
+ }
+ }
+}
+
+/**
+ * Update the last message that was sent for a machine Id
+ */
+void Table::processEntry(LastMessage *entry, Hashset<int64_t> *machineSet) {
+ // Update what the last message received by a machine was
+ updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet);
+}
+
+/**
+ * Add the new key to the arbitrators table and update the set of live
+ * new keys (in case of a rescued new key message)
+ */
+void Table::processEntry(NewKey *entry) {
+ // Update the arbitrator table with the new key information
+ arbitratorTable->put(entry->getKey(), entry->getMachineID());
+
+ // Update what the latest live new key is
+ NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry);
+ if (oldNewKey != NULL) {
+ // Delete the old new key messages
+ oldNewKey->setDead();
+ }
+}
+
+/**
+ * Process new table status entries and set dead the old ones as new
+ * ones come in-> keeps track of the largest and smallest table status
+ * seen in this current round of updating the local copy of the block
+ * chain
+ */
+void Table::processEntry(TableStatus *entry, int64_t seq) {
+ int newNumSlots = entry->getMaxSlots();
+ updateCurrMaxSize(newNumSlots);
+ initExpectedSize(seq, newNumSlots);
+
+ if (liveTableStatus != NULL) {
+ // We have a larger table status so the old table status is no
+ // int64_ter alive
+ liveTableStatus->setDead();
+ }
+
+ // Make this new table status the latest alive table status
+ liveTableStatus = entry;
+}
+
+/**
+ * Check old messages to see if there is a block chain violation->
+ * Also
+ */
+void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) {
+ int64_t oldSeqNum = entry->getOldSeqNum();
+ int64_t newSeqNum = entry->getNewSeqNum();
+ bool isequal = entry->getEqual();
+ int64_t machineId = entry->getMachineID();
+ int64_t seq = entry->getSequenceNumber();
+
+ // Check if we have messages that were supposed to be rejected in
+ // our local block chain
+ for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) {
+ // Get the slot
+ Slot *slot = indexer->getSlot(seqNum);
+
+ if (slot != NULL) {
+ // If we have this slot make sure that it was not supposed to be
+ // a rejected slot
+ int64_t slotMachineId = slot->getMachineID();
+ if (isequal != (slotMachineId == machineId)) {
+ throw new Error("Server Error: Trying to insert rejected message for slot ");
+ }
+ }
+ }
+
+ // Create a list of clients to watch until they see this rejected
+ // message entry->
+ Hashset<int64_t> *deviceWatchSet = new Hashset<int64_t>();
+ SetIterator<int64_t, Pair<int64_t, Liveness *> *> *iter = getKeyIterator(lastMessageTable);
+ while (iter->hasNext()) {
+ // Machine ID for the last message entry
+ int64_t lastMessageEntryMachineId = iter->next();
+
+ // We've seen it, don't need to continue to watch-> Our next
+ // message will implicitly acknowledge it->
+ if (lastMessageEntryMachineId == localMachineId) {
+ continue;
+ }
+
+ Pair<int64_t, Liveness *> *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId);
+ int64_t entrySequenceNumber = lastMessageValue->getFirst();
+
+ if (entrySequenceNumber < seq) {
+ // Add this rejected message to the set of messages that this
+ // machine ID did not see yet
+ addWatchVector(lastMessageEntryMachineId, entry);
+ // This client did not see this rejected message yet so add it
+ // to the watch set to monitor
+ deviceWatchSet->add(lastMessageEntryMachineId);
+ }
+ }
+ delete iter;
+
+ if (deviceWatchSet->isEmpty()) {
+ // This rejected message has been seen by all the clients so
+ entry->setDead();
+ delete deviceWatchSet;
+ } else {
+ // We need to watch this rejected message
+ entry->setWatchSet(deviceWatchSet);
+ }
+}
+
+/**
+ * Check if this abort is live, if not then save it so we can kill it
+ * later-> update the last transaction number that was arbitrated on->
+ */
+void Table::processEntry(Abort *entry) {
+ if (entry->getTransactionSequenceNumber() != -1) {
+ // update the transaction status if it was sent to the server
+ TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber());
+ if (status != NULL) {
+ status->setStatus(TransactionStatus_StatusAborted);
+ }
+ }
+
+ // Abort has not been seen by the client it is for yet so we need to
+ // keep track of it
+
+ Abort *previouslySeenAbort = liveAbortTable->put(new Pair<int64_t, int64_t>(entry->getAbortId()), entry);
+ if (previouslySeenAbort != NULL) {
+ previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version
+ }
+
+ if (entry->getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
+ }
+
+ if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
+ // The machine already saw this so it is dead
+ entry->setDead();
+ Pair<int64_t, int64_t> abortid = entry->getAbortId();
+ liveAbortTable->remove(&abortid);
+
+ if (entry->getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber());
+ }
+ return;
+ }
+
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) {
+ int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator());
+ if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
+ }
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber());
+ }
+
+ // Set dead a transaction if we can
+ Pair<int64_t, int64_t> deadPair = Pair<int64_t, int64_t>(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber());
+
+ Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair);
+ if (transactionToSetDead != NULL) {
+ liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber());
+ }
+
+ // Update the last transaction sequence number that the arbitrator
+ // arbitrated on
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) ||
+ (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) {
+ // Is a valid one
+ if (entry->getTransactionSequenceNumber() != -1) {
+ lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber());
+ }
+ }
+}
+
+/**
+ * Set dead the transaction part if that transaction is dead and keep
+ * track of all new parts
+ */
+void Table::processEntry(TransactionPart *entry) {
+ // Check if we have already seen this transaction and set it dead OR
+ // if it is not alive
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) {
+ // This transaction is dead, it was already committed or aborted
+ entry->setDead();
+ return;
+ }
+
+ // This part is still alive
+ Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
+
+ if (transactionPart == NULL) {
+ // Dont have a table for this machine Id yet so make one
+ transactionPart = new Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ newTransactionParts->put(entry->getMachineId(), transactionPart);
+ }
+
+ // Update the part and set dead ones we have already seen (got a
+ // rescued version)
+ entry->acquireRef();
+ TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry);
+ if (previouslySeenPart != NULL) {
+ previouslySeenPart->releaseRef();
+ previouslySeenPart->setDead();
+ }
+}
+
+/**
+ * Process new commit entries and save them for future use-> Delete duplicates
+ */
+void Table::processEntry(CommitPart *entry) {
+ // Update the last transaction that was updated if we can
+ if (entry->getTransactionSequenceNumber() != -1) {
+ if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) ||
+ lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) {
+ lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber());
+ }
+ }
+
+ Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId());
+ if (commitPart == NULL) {
+ // Don't have a table for this machine Id yet so make one
+ commitPart = new Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ newCommitParts->put(entry->getMachineId(), commitPart);
+ }
+ // Update the part and set dead ones we have already seen (got a
+ // rescued version)
+ entry->acquireRef();
+ CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry);
+ if (previouslySeenPart != NULL) {
+ previouslySeenPart->setDead();
+ previouslySeenPart->releaseRef();
+ }
+}
+
+/**
+ * Update the last message seen table-> Update and set dead the
+ * appropriate RejectedMessages as clients see them-> Updates the live
+ * aborts, removes those that are dead and sets them dead-> Check that
+ * the last message seen is correct and that there is no mismatch of
+ * our own last message or that other clients have not had a rollback
+ * on the last message->
+ */
+void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet) {
+ // We have seen this machine ID
+ machineSet->remove(machineId);
+
+ // Get the set of rejected messages that this machine Id is has not seen yet
+ Hashset<RejectedMessage *> *watchset = rejectedMessageWatchVectorTable->get(machineId);
+ // If there is a rejected message that this machine Id has not seen yet
+ if (watchset != NULL) {
+ // Go through each rejected message that this machine Id has not
+ // seen yet
+
+ SetIterator<RejectedMessage *, RejectedMessage *> *rmit = watchset->iterator();
+ while (rmit->hasNext()) {
+ RejectedMessage *rm = rmit->next();
+ // If this machine Id has seen this rejected message->->->
+ if (rm->getSequenceNumber() <= seqNum) {
+ // Remove it from our watchlist
+ rmit->remove();
+ // Decrement machines that need to see this notification
+ rm->removeWatcher(machineId);
+ }
+ }
+ delete rmit;
+ }
+
+ // Set dead the abort
+ SetIterator<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable);
+
+ while (abortit->hasNext()) {
+ Pair<int64_t, int64_t> *key = abortit->next();
+ Abort *abort = liveAbortTable->get(key);
+ if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) {
+ abort->setDead();
+ abortit->remove();
+ if (abort->getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber());
+ }
+ }
+ }
+ delete abortit;
+ if (machineId == localMachineId) {
+ // Our own messages are immediately dead->
+ char livenessType = liveness->getType();
+ if (livenessType == TypeLastMessage) {
+ ((LastMessage *)liveness)->setDead();
+ } else if (livenessType == TypeSlot) {
+ ((Slot *)liveness)->setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+ // Get the old last message for this device
+ Pair<int64_t, Liveness *> *lastMessageEntry = lastMessageTable->put(machineId, new Pair<int64_t, Liveness *>(seqNum, liveness));
+ if (lastMessageEntry == NULL) {
+ // If no last message then there is nothing else to process
+ return;
+ }
+
+ int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
+ Liveness *lastEntry = lastMessageEntry->getSecond();
+ delete lastMessageEntry;
+
+ // If it is not our machine Id since we already set ours to dead
+ if (machineId != localMachineId) {
+ char lastEntryType = lastEntry->getType();
+
+ if (lastEntryType == TypeLastMessage) {
+ ((LastMessage *)lastEntry)->setDead();
+ } else if (lastEntryType == TypeSlot) {
+ ((Slot *)lastEntry)->setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+ // Make sure the server is not playing any games
+ if (machineId == localMachineId) {
+ if (hadPartialSendToServer) {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
+ }
+ } else {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
+ }
+ }
+ } else {
+ if (lastMessageSeqNum > seqNum) {
+ throw new Error("Server Error: Rollback on remote machine sequence number");
+ }
+ }
+}
+
+/**
+ * Add a rejected message entry to the watch set to keep track of
+ * which clients have seen that rejected message entry and which have
+ * not.
+ */
+void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) {
+ Hashset<RejectedMessage *> *entries = rejectedMessageWatchVectorTable->get(machineId);
+ if (entries == NULL) {
+ // There is no set for this machine ID yet so create one
+ entries = new Hashset<RejectedMessage *>();
+ rejectedMessageWatchVectorTable->put(machineId, entries);
+ }
+ entries->add(entry);
+}
+
+/**
+ * Check if the HMAC chain is not violated
+ */
+void Table::checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots) {
+ for (uint i = 0; i < newSlots->length(); i++) {
+ Slot *currSlot = newSlots->get(i);
+ Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1);
+ if (prevSlot != NULL &&
+ !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC()))
+ throw new Error("Server Error: Invalid HMAC Chain");
+ }
+}