X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cpp;fp=version2%2Fsrc%2FC%2FTable.cpp;h=255ba3cf35bb3da7a02df7e59a9366fe88b472a0;hb=786e40250f31eff04eec25bbcaae3cd916fedb14;hp=0000000000000000000000000000000000000000;hpb=3f24bffc82ebfe2730308b63100af08645316577;p=iotcloud.git diff --git a/version2/src/C/Table.cpp b/version2/src/C/Table.cpp new file mode 100644 index 0000000..255ba3c --- /dev/null +++ b/version2/src/C/Table.cpp @@ -0,0 +1,2863 @@ +#include "Table.h" +#include "CloudComm.h" +#include "SlotBuffer.h" +#include "NewKey.h" +#include "Slot.h" +#include "KeyValue.h" +#include "Error.h" +#include "PendingTransaction.h" +#include "TableStatus.h" +#include "TransactionStatus.h" +#include "Transaction.h" +#include "LastMessage.h" +#include "SecureRandom.h" +#include "ByteBuffer.h" +#include "Abort.h" +#include "CommitPart.h" +#include "ArbitrationRound.h" +#include "TransactionPart.h" +#include "Commit.h" +#include "RejectedMessage.h" +#include "SlotIndexer.h" +#include + +int compareInt64(const void *a, const void *b) { + const int64_t *pa = (const int64_t *) a; + const int64_t *pb = (const int64_t *) b; + if (*pa < *pb) + return -1; + else if (*pa > *pb) + return 1; + else + return 0; +} + +Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : + buffer(NULL), + cloud(new CloudComm(this, baseurl, password, listeningPort)), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumver(1), + localMachineId(_localMachineId), + sequenceNumber(0), + localSequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedsize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ + init(); +} + +Table::Table(CloudComm *_cloud, int64_t _localMachineId) : + buffer(NULL), + cloud(_cloud), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumver(1), + localMachineId(_localMachineId), + sequenceNumber(0), + localSequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedsize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ + init(); +} + +Table::~Table() { + delete cloud; + delete random; + delete buffer; + // init data structs + delete committedKeyValueTable; + delete speculatedKeyValueTable; + delete pendingTransactionSpeculatedKeyValueTable; + delete liveNewKeyTable; + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) { + Pair * pair = lastMessageTable->get(lmit->next()); + delete pair; + } + delete lmit; + delete lastMessageTable; + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + { + SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); + while(rmit->hasNext()) { + int64_t machineid = rmit->next(); + Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); + SetIterator * mit = rmset->iterator(); + while (mit->hasNext()) { + RejectedMessage * rm = mit->next(); + delete rm; + } + delete mit; + delete rmset; + } + delete rmit; + delete rejectedMessageWatchVectorTable; + } + delete arbitratorTable; + delete liveAbortTable; + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); + while(pit->hasNext()) { + Pair * pair=pit->next(); + pit->currVal()->releaseRef(); + } + delete pit; + + delete parts; + } + delete partsit; + delete newTransactionParts; + } + { + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); + while(pit->hasNext()) { + Pair * pair=pit->next(); + pit->currVal()->releaseRef(); + } + delete pit; + delete parts; + } + delete partsit; + delete newCommitParts; + } + delete lastArbitratedTransactionNumberByArbitratorTable; + delete liveTransactionBySequenceNumberTable; + delete liveTransactionByTransactionIdTable; + { + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveit->currVal(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) { + int64_t id = clientit->next(); + delete commitForClientTable->get(id); + } + delete clientit; + } + + delete commitForClientTable; + } + delete liveit; + delete liveCommitsTable; + } + delete liveCommitsByKeyTable; + delete lastCommitSeenSequenceNumberByArbitratorTable; + delete rejectedSlotVector; + { + uint size = pendingTransactionQueue->size(); + for (uint iter = 0; iter < size; iter++) { + delete pendingTransactionQueue->get(iter); + } + delete pendingTransactionQueue; + } + delete pendingSendArbitrationEntriesToDelete; + { + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + delete transactionPartsSent; + } + delete outstandingTransactionStatus; + delete liveAbortsGeneratedByLocal; + delete offlineTransactionsCommittedAndAtServer; + delete localCommunicationTable; + delete lastTransactionSeenFromMachineFromServer; + { + for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { + delete pendingSendArbitrationRounds->get(i); + } + delete pendingSendArbitrationRounds; + } + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; + delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; + if (lastNewKey) + delete lastNewKey; +} + +/** + * Init all the stuff needed for for table usage + */ +void Table::init() { + // Init helper objects + random = new SecureRandom(); + buffer = new SlotBuffer(); + + // init data structs + committedKeyValueTable = new Hashtable(); + speculatedKeyValueTable = new Hashtable(); + pendingTransactionSpeculatedKeyValueTable = new Hashtable(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable * >(); + rejectedMessageWatchVectorTable = new Hashtable * >(); + arbitratorTable = new Hashtable(); + liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); + liveTransactionBySequenceNumberTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); + liveCommitsTable = new Hashtable * >(); + liveCommitsByKeyTable = new Hashtable(); + lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); + rejectedSlotVector = new Vector(); + pendingTransactionQueue = new Vector(); + pendingSendArbitrationEntriesToDelete = new Vector(); + transactionPartsSent = new Hashtable *>(); + outstandingTransactionStatus = new Hashtable(); + liveAbortsGeneratedByLocal = new Hashtable(); + offlineTransactionsCommittedAndAtServer = new Hashset *, uintptr_t, 0, pairHashFunction, pairEquals>(); + localCommunicationTable = new Hashtable *>(); + lastTransactionSeenFromMachineFromServer = new Hashtable(); + pendingSendArbitrationRounds = new Vector(); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); + + // Other init stuff + numberOfSlots = buffer->capacity(); + setResizeThreshold(); +} + +/** + * Initialize the table by inserting a table status as the first entry + * into the table status also initialize the crypto stuff. + */ +void Table::initTable() { + cloud->initSecurity(); + + // Create the first insertion into the block chain which is the table status + Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); + localSequenceNumber++; + TableStatus *status = new TableStatus(s, numberOfSlots); + s->addShallowEntry(status); + Array *array = cloud->putSlot(s, numberOfSlots); + + if (array == NULL) { + array = new Array(1); + array->set(0, s); + // update local block chain + validateAndUpdate(array, true); + delete array; + } else if (array->length() == 1) { + // in case we did push the slot BUT we failed to init it + validateAndUpdate(array, true); + delete s; + delete array; + } else { + delete s; + delete array; + throw new Error("Error on initialization"); + } +} + +/** + * Rebuild the table from scratch by pulling the latest block chain + * from the server. + */ +void Table::rebuild() { + // Just pull the latest slots from the server + Array *newslots = cloud->getSlots(sequenceNumber + 1); + validateAndUpdate(newslots, true); + delete newslots; + sendToServer(NULL); + updateLiveTransactionsAndStatus(); +} + +void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { + localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); +} + +int64_t Table::getArbitrator(IoTString *key) { + return arbitratorTable->get(key); +} + +void Table::close() { + cloud->closeCloud(); +} + +IoTString *Table::getCommitted(IoTString *key) { + KeyValue *kv = committedKeyValueTable->get(key); + + if (kv != NULL) { + return new IoTString(kv->getValue()); + } else { + return NULL; + } +} + +IoTString *Table::getSpeculative(IoTString *key) { + KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); + + if (kv == NULL) { + kv = speculatedKeyValueTable->get(key); + } + + if (kv == NULL) { + kv = committedKeyValueTable->get(key); + } + + if (kv != NULL) { + return new IoTString(kv->getValue()); + } else { + return NULL; + } +} + +IoTString *Table::getCommittedAtomic(IoTString *key) { + KeyValue *kv = committedKeyValueTable->get(key); + + if (!arbitratorTable->contains(key)) { + throw new Error("Key not Found."); + } + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } + + if (kv != NULL) { + pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); + return new IoTString(kv->getValue()); + } else { + pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); + return NULL; + } +} + +IoTString *Table::getSpeculativeAtomic(IoTString *key) { + if (!arbitratorTable->contains(key)) { + throw new Error("Key not Found."); + } + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } + + KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); + + if (kv == NULL) { + kv = speculatedKeyValueTable->get(key); + } + + if (kv == NULL) { + kv = committedKeyValueTable->get(key); + } + + if (kv != NULL) { + pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); + return new IoTString(kv->getValue()); + } else { + pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); + return NULL; + } +} + +bool Table::update() { + try { + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + validateAndUpdate(newSlots, false); + delete newSlots; + sendToServer(NULL); + updateLiveTransactionsAndStatus(); + return true; + } catch (Exception *e) { + SetIterator *> *kit = getKeyIterator(localCommunicationTable); + while (kit->hasNext()) { + int64_t m = kit->next(); + updateFromLocal(m); + } + delete kit; + } + + return false; +} + +bool Table::createNewKey(IoTString *keyName, int64_t machineId) { + while (true) { + if (arbitratorTable->contains(keyName)) { + // There is already an arbitrator + return false; + } + NewKey *newKey = new NewKey(NULL, keyName, machineId); + + if (sendToServer(newKey)) { + // If successfully inserted + return true; + } + } +} + +void Table::startTransaction() { + // Create a new transaction, invalidates any old pending transactions. + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + pendingTransactionBuilder = new PendingTransaction(localMachineId); +} + +void Table::put(IoTString *key, IoTString *value) { + // Make sure it is a valid key + if (!arbitratorTable->contains(key)) { + throw new Error("Key not Found."); + } + + // Make sure new key value pair matches the current arbitrator + if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { + // TODO: Maybe not throw en error + throw new Error("Not all Key Values Match Arbitrator."); + } + + // Add the key value to this transaction + KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); + pendingTransactionBuilder->addKV(kv); +} + +TransactionStatus *Table::commitTransaction() { + if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { + // transaction with no updates will have no effect on the system + return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); + } + + // Set the local transaction sequence number and increment + pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber); + localTransactionSequenceNumber++; + + // Create the transaction status + TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator()); + + // Create the new transaction + Transaction *newTransaction = pendingTransactionBuilder->createTransaction(); + newTransaction->setTransactionStatus(transactionStatus); + + if (pendingTransactionBuilder->getArbitrator() != localMachineId) { + // Add it to the queue and invalidate the builder for safety + pendingTransactionQueue->add(newTransaction); + } else { + arbitrateOnLocalTransaction(newTransaction); + delete newTransaction; + updateLiveStateFromLocal(); + } + if (pendingTransactionBuilder != NULL) + delete pendingTransactionBuilder; + + pendingTransactionBuilder = new PendingTransaction(localMachineId); + + try { + sendToServer(NULL); + } catch (ServerException *e) { + + Hashset *arbitratorTriedAndFailed = new Hashset(); + uint size = pendingTransactionQueue->size(); + uint oldindex = 0; + for (uint iter = 0; iter < size; iter++) { + Transaction *transaction = pendingTransactionQueue->get(iter); + pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); + + if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { + // Already contacted this client so ignore all attempts to contact this client + // to preserve ordering for arbitrator + continue; + } + + Pair sendReturn = sendTransactionToLocal(transaction); + + if (sendReturn.getFirst()) { + // Failed to contact over local + arbitratorTriedAndFailed->add(transaction->getArbitrator()); + } else { + // Successful contact or should not contact + + if (sendReturn.getSecond()) { + // did arbitrate + delete transaction; + oldindex--; + } + } + } + pendingTransactionQueue->setSize(oldindex); + } + + updateLiveStateFromLocal(); + + return transactionStatus; +} + +/** + * Recalculate the new resize threshold + */ +void Table::setResizeThreshold() { + int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots); + bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower); +} + +int64_t Table::getLocalSequenceNumber() { + return localSequenceNumber; +} + +void Table::processTransactionList(bool handlePartial) { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Update which transactions parts still need to be sent + transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); + // Add the transaction status to the outstanding list + outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); + + // Update the transaction status + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); + + // Check if all the transaction parts were successfully + // sent and if so then remove it from pending + if (transaction->didSendAllParts()) { + transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); + pendingTransactionQueue->remove(transaction); + delete transaction; + } else if (handlePartial) { + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + } + delete trit; +} + +NewKey * Table::handlePartialSend(NewKey * newKey) { + //Didn't receive acknowledgement for last send + //See if the server has received a newer slot + + Array *newSlots = cloud->getSlots(sequenceNumber + 1); + if (newSlots->length() == 0) { + //Retry sending old slot + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); + + if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; + newKey = NULL; + } + } + processTransactionList(false); + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; + newKey = NULL; + } + } + processTransactionList(true); + } + } + + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + + if (newSlots->length() != 0) { + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + } else { + if (checkSend(newSlots, lastSlotAttemptedToSend)) { + if (newKey != NULL) { + if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { + delete newKey; + newKey = NULL; + } + } + + processTransactionList(true); + } else { + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetServerFailure(); + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + } + + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + delete newSlots; + return newKey; +} + +void Table::clearSentParts() { + // Clear the sent data since we are trying again + pendingSendArbitrationEntriesToDelete->clear(); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + delete trit->currVal(); + } + delete trit; + transactionPartsSent->clear(); +} + +bool Table::sendToServer(NewKey *newKey) { + if (hadPartialSendToServer) { + newKey = handlePartialSend(newKey); + } + + try { + // While we have stuff that needs inserting into the block chain + while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { + if (hadPartialSendToServer) { + throw new Error("Should Be error free"); + } + + // If there is a new key with same name then end + if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { + delete newKey; + return false; + } + + // Create the slot + Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); + localSequenceNumber++; + + // Try to fill the slot with data + int newSize = 0; + bool insertedNewKey = false; + bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); + + if (needsResize) { + // Reset which transaction to send + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetNextPartToSend(); + + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + + // Clear the sent data since we are trying again + clearSentParts(); + + // We needed a resize so try again + fillSlot(slot, true, newKey, newSize, insertedNewKey); + } + if (lastSlotAttemptedToSend != NULL) + delete lastSlotAttemptedToSend; + + lastSlotAttemptedToSend = slot; + lastIsNewKey = (newKey != NULL); + lastInsertedNewKey = insertedNewKey; + lastNewSize = newSize; + if (( newKey != lastNewKey) && (lastNewKey != NULL)) + delete lastNewKey; + lastNewKey = newKey; + if (lastTransactionPartsSent != NULL) + delete lastTransactionPartsSent; + lastTransactionPartsSent = transactionPartsSent->clone(); + + Array * newSlots = NULL; + bool wasInserted = false; + bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); + + if (sendSlotsReturn) { + lastSlotAttemptedToSend = NULL; + // Did insert into the block chain + if (insertedNewKey) { + // This slot was what was inserted not a previous slot + // New Key was successfully inserted into the block chain so dont want to insert it again + newKey = NULL; + } + + // Remove the aborts and commit parts that were sent from the pending to send queue + uint size = pendingSendArbitrationRounds->size(); + uint oldcount = 0; + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); + round->removeParts(pendingSendArbitrationEntriesToDelete); + + if (!round->isDoneSending()) { + //Add part back in + pendingSendArbitrationRounds->set(oldcount++, + pendingSendArbitrationRounds->get(i)); + } else + delete pendingSendArbitrationRounds->get(i); + } + pendingSendArbitrationRounds->setSize(oldcount); + processTransactionList(false); + } else { + // Reset which transaction to send + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetNextPartToSend(); + + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + } + + // Clear the sent data in preparation for next send + clearSentParts(); + + if (newSlots->length() != 0) { + // insert into the local block chain + validateAndUpdate(newSlots, true); + } + delete newSlots; + } + } catch (ServerException *e) { + if (e->getType() != ServerException_TypeInputTimeout) { + // Nothing was able to be sent to the server so just clear these data structures + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetNextPartToSend(); + + // Set the transaction sequence number back to nothing + if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { + transaction->setSequenceNumber(-1); + } + } + delete trit; + } else { + // There was a partial send to the server + hadPartialSendToServer = true; + + // Nothing was able to be sent to the server so just clear these data structures + SetIterator *> *trit = getKeyIterator(transactionPartsSent); + while (trit->hasNext()) { + Transaction *transaction = trit->next(); + transaction->resetNextPartToSend(); + transaction->setServerFailure(); + } + delete trit; + } + + clearSentParts(); + + throw e; + } + + return newKey == NULL; +} + +bool Table::updateFromLocal(int64_t machineId) { + if (!localCommunicationTable->contains(machineId)) + return false; + + Pair *localCommunicationInformation = localCommunicationTable->get(machineId); + + // Get the size of the send data + int sendDataSize = sizeof(int32_t) + sizeof(int64_t); + + int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); + } + + Array *sendData = new Array(sendDataSize); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); + + // Encode the data + bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode->putInt(0); + + // Send by local + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); + localSequenceNumber++; + + if (returnData == NULL) { + // Could not contact server + return false; + } + + // Decode the data + ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); + int numberOfEntries = bbDecode->getInt(); + + for (int i = 0; i < numberOfEntries; i++) { + char type = bbDecode->get(); + if (type == TypeAbort) { + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); + processEntry(abort); + } else if (type == TypeCommitPart) { + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + return true; +} + +Pair Table::sendTransactionToLocal(Transaction *transaction) { + + // Get the devices local communications + if (!localCommunicationTable->contains(transaction->getArbitrator())) + return Pair(true, false); + + Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + + // Get the size of the send data + int sendDataSize = sizeof(int32_t) + sizeof(int64_t); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + sendDataSize += part->getSize(); + } + } + + int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); + } + + // Make the send data size + Array *sendData = new Array(sendDataSize); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); + + // Encode the data + bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode->putInt(transaction->getParts()->size()); + { + Vector *tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart *part = tParts->get(i); + part->encode(bbEncode); + } + } + + // Send by local + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); + localSequenceNumber++; + + if (returnData == NULL) { + // Could not contact server + return Pair(true, false); + } + + // Decode the data + ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); + bool didCommit = bbDecode->get() == 1; + bool couldArbitrate = bbDecode->get() == 1; + int numberOfEntries = bbDecode->getInt(); + bool foundAbort = false; + + for (int i = 0; i < numberOfEntries; i++) { + char type = bbDecode->get(); + if (type == TypeAbort) { + Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); + + if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { + foundAbort = true; + } + + processEntry(abort); + } else if (type == TypeCommitPart) { + CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + if (couldArbitrate) { + TransactionStatus *status = transaction->getTransactionStatus(); + if (didCommit) { + status->setStatus(TransactionStatus_StatusCommitted); + } else { + status->setStatus(TransactionStatus_StatusAborted); + } + } else { + TransactionStatus *status = transaction->getTransactionStatus(); + if (foundAbort) { + status->setStatus(TransactionStatus_StatusAborted); + } else { + status->setStatus(TransactionStatus_StatusCommitted); + } + } + + return Pair(false, true); +} + +Array *Table::acceptDataFromLocal(Array *data) { + // Decode the data + ByteBuffer *bbDecode = ByteBuffer_wrap(data); + int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); + int numberOfParts = bbDecode->getInt(); + + // If we did commit a transaction or not + bool didCommit = false; + bool couldArbitrate = false; + + if (numberOfParts != 0) { + + // decode the transaction + Transaction *transaction = new Transaction(); + for (int i = 0; i < numberOfParts; i++) { + bbDecode->get(); + TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode); + transaction->addPartDecode(newPart); + } + + // Arbitrate on transaction and pull relevant return data + Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); + + updateLiveStateFromLocal(); + + // Transaction was sent to the server so keep track of it to prevent double commit + if (transaction->getSequenceNumber() != -1) { + offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); + } + } + + // The data to send back + int returnDataSize = 0; + Vector *unseenArbitrations = new Vector(); + + // Get the aborts to send back + Vector *abortLocalSequenceNumbers = new Vector(); + { + SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); + while (abortit->hasNext()) + abortLocalSequenceNumbers->add(abortit->next()); + delete abortit; + } + + qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint asize = abortLocalSequenceNumbers->size(); + for (uint i = 0; i < asize; i++) { + int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i); + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); + unseenArbitrations->add(abort); + returnDataSize += abort->getSize(); + } + + // Get the commits to send back + Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); + if (commitForClientTable != NULL) { + Vector *commitLocalSequenceNumbers = new Vector(); + { + SetIterator *commitit = getKeyIterator(commitForClientTable); + while (commitit->hasNext()) + commitLocalSequenceNumbers->add(commitit->next()); + delete commitit; + } + qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + uint clsSize = commitLocalSequenceNumbers->size(); + for (uint clsi = 0; clsi < clsSize; clsi++) { + int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); + Commit *commit = commitForClientTable->get(localSequenceNumber); + + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + { + Vector *parts = commit->getParts(); + uint nParts = parts->size(); + for (uint i = 0; i < nParts; i++) { + CommitPart *commitPart = parts->get(i); + unseenArbitrations->add(commitPart); + returnDataSize += commitPart->getSize(); + } + } + } + } + + // Number of arbitration entries to decode + returnDataSize += 2 * sizeof(int32_t); + + // bool of did commit or not + if (numberOfParts != 0) { + returnDataSize += sizeof(char); + } + + // Data to send Back + Array *returnData = new Array(returnDataSize); + ByteBuffer *bbEncode = ByteBuffer_wrap(returnData); + + if (numberOfParts != 0) { + if (didCommit) { + bbEncode->put((char)1); + } else { + bbEncode->put((char)0); + } + if (couldArbitrate) { + bbEncode->put((char)1); + } else { + bbEncode->put((char)0); + } + } + + bbEncode->putInt(unseenArbitrations->size()); + uint size = unseenArbitrations->size(); + for (uint i = 0; i < size; i++) { + Entry *entry = unseenArbitrations->get(i); + entry->encode(bbEncode); + } + + localSequenceNumber++; + return returnData; +} + +/** Checks whether a given slot was sent using new slots in + array. Returns true if sent and false otherwise. */ + +bool Table::checkSend(Array * array, Slot *checkSlot) { + uint size = array->length(); + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { + return true; + } + } + + //Also need to see if other machines acknowledged our message + for (uint i = 0; i < size; i++) { + Slot *s = array->get(i); + + // Process each entry in the slot + Vector *entries = s->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + + if (entry->getType() == TypeLastMessage) { + LastMessage *lastMessage = (LastMessage *)entry; + + if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { + return true; + } + } + } + } + //Not found + return false; +} + +/** Method tries to send slot to server. Returns status in tuple. + isInserted returns whether last un-acked send (if any) was + successful. Returns whether send was confirmed.x + */ + +bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array **array) { + attemptedToSendToServer = true; + + *array = cloud->putSlot(slot, newSize); + if (*array == NULL) { + *array = new Array(1); + (*array)->set(0, slot); + rejectedSlotVector->clear(); + *isInserted = false; + return true; + } else { + if ((*array)->length() == 0) { + throw new Error("Server Error: Did not send any slots"); + } + + if (hadPartialSendToServer) { + *isInserted = checkSend(*array, slot); + + if (!(*isInserted)) { + rejectedSlotVector->add(slot->getSequenceNumber()); + } + + return false; + } else { + rejectedSlotVector->add(slot->getSequenceNumber()); + *isInserted = false; + return false; + } + } +} + +/** + * Returns true if a resize was needed but not done. + */ +bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { + newSize = 0;//special value to indicate no resize + if (liveSlotCount > bufferResizeThreshold) { + resize = true;//Resize is forced + } + + if (resize) { + newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); + TableStatus *status = new TableStatus(slot, newSize); + slot->addShallowEntry(status); + } + + // Fill with rejected slots first before doing anything else + doRejectedMessages(slot); + + // Do mandatory rescue of entries + ThreeTuple mandatoryRescueReturn = doMandatoryRescue(slot, resize); + + // Extract working variables + bool needsResize = mandatoryRescueReturn.getFirst(); + bool seenLiveSlot = mandatoryRescueReturn.getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); + + if (needsResize && !resize) { + // We need to resize but we are not resizing so return true to force on retry + return true; + } + + insertedKey = false; + if (newKeyEntry != NULL) { + newKeyEntry->setSlot(slot); + if (slot->hasSpace(newKeyEntry)) { + slot->addEntry(newKeyEntry); + insertedKey = true; + } + } + + // Clear the transactions, aborts and commits that were sent previously + clearSentParts(); + uint size = pendingSendArbitrationRounds->size(); + for (uint i = 0; i < size; i++) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(i); + bool isFull = false; + round->generateParts(); + Vector *parts = round->getParts(); + + // Insert pending arbitration data + uint vsize = parts->size(); + for (uint vi = 0; vi < vsize; vi++) { + Entry *arbitrationData = parts->get(vi); + + // If it is an abort then we need to set some information + if (arbitrationData->getType() == TypeAbort) { + ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); + } + + if (!slot->hasSpace(arbitrationData)) { + // No space so cant do anything else with these data entries + isFull = true; + break; + } + + // Add to this current slot and add it to entries to delete + slot->addEntry(arbitrationData); + pendingSendArbitrationEntriesToDelete->add(arbitrationData); + } + + if (isFull) { + break; + } + } + + if (pendingTransactionQueue->size() > 0) { + Transaction *transaction = pendingTransactionQueue->get(0); + // Set the transaction sequence number if it has yet to be inserted into the block chain + if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) { + transaction->setSequenceNumber(slot->getSequenceNumber()); + } + + while (true) { + TransactionPart *part = transaction->getNextPartToSend(); + if (part == NULL) { + // Ran out of parts to send for this transaction so move on + break; + } + + if (slot->hasSpace(part)) { + slot->addEntry(part); + Vector *partsSent = transactionPartsSent->get(transaction); + if (partsSent == NULL) { + partsSent = new Vector(); + transactionPartsSent->put(transaction, partsSent); + } + partsSent->add(part->getPartNumber()); + transactionPartsSent->put(transaction, partsSent); + } else { + break; + } + } + } + + // Fill the remainder of the slot with rescue data + doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); + + return false; +} + +void Table::doRejectedMessages(Slot *s) { + if (!rejectedSlotVector->isEmpty()) { + /* TODO: We should avoid generating a rejected message entry if + * there is already a sufficient entry in the queue (e->g->, + * equalsto value of true and same sequence number)-> */ + + int64_t old_seqn = rejectedSlotVector->get(0); + if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { + int64_t new_seqn = rejectedSlotVector->lastElement(); + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); + s->addShallowEntry(rm); + } else { + int64_t prev_seqn = -1; + uint i = 0; + /* Go through list of missing messages */ + for (; i < rejectedSlotVector->size(); i++) { + int64_t curr_seqn = rejectedSlotVector->get(i); + Slot *s_msg = buffer->getSlot(curr_seqn); + if (s_msg != NULL) + break; + prev_seqn = curr_seqn; + } + /* Generate rejected message entry for missing messages */ + if (prev_seqn != -1) { + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); + s->addShallowEntry(rm); + } + /* Generate rejected message entries for present messages */ + for (; i < rejectedSlotVector->size(); i++) { + int64_t curr_seqn = rejectedSlotVector->get(i); + Slot *s_msg = buffer->getSlot(curr_seqn); + int64_t machineid = s_msg->getMachineID(); + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); + s->addShallowEntry(rm); + } + } + } +} + +ThreeTuple Table::doMandatoryRescue(Slot *slot, bool resize) { + int64_t newestSequenceNumber = buffer->getNewestSeqNum(); + int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); + if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { + oldestLiveSlotSequenceNumver = oldestSequenceNumber; + } + + int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; + bool seenLiveSlot = false; + int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full + int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point + + + // Mandatory Rescue + for (; currentSequenceNumber < threshold; currentSequenceNumber++) { + Slot *previousSlot = buffer->getSlot(currentSequenceNumber); + // Push slot number forward + if (!seenLiveSlot) { + oldestLiveSlotSequenceNumver = currentSequenceNumber; + } + + if (!previousSlot->isLive()) { + continue; + } + + // We have seen a live slot + seenLiveSlot = true; + + // Get all the live entries for a slot + Vector *liveEntries = previousSlot->getLiveEntries(resize); + + // Iterate over all the live entries and try to rescue them + uint lESize = liveEntries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveEntry = liveEntries->get(i); + if (slot->hasSpace(liveEntry)) { + // Enough space to rescue the entry + slot->addEntry(liveEntry); + } else if (currentSequenceNumber == firstIfFull) { + //if there's no space but the entry is about to fall off the queue + return ThreeTuple(true, seenLiveSlot, currentSequenceNumber); + } + } + } + + // Did not resize + return ThreeTuple(false, seenLiveSlot, currentSequenceNumber); +} + +void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) { + /* now go through live entries from least to greatest sequence number until + * either all live slots added, or the slot doesn't have enough room + * for SKIP_THRESHOLD consecutive entries*/ + int skipcount = 0; + int64_t newestseqnum = buffer->getNewestSeqNum(); + for (; seqn <= newestseqnum; seqn++) { + Slot *prevslot = buffer->getSlot(seqn); + //Push slot number forward + if (!seenliveslot) + oldestLiveSlotSequenceNumver = seqn; + + if (!prevslot->isLive()) + continue; + seenliveslot = true; + Vector *liveentries = prevslot->getLiveEntries(resize); + uint lESize = liveentries->size(); + for (uint i = 0; i < lESize; i++) { + Entry *liveentry = liveentries->get(i); + if (s->hasSpace(liveentry)) + s->addEntry(liveentry); + else { + skipcount++; + if (skipcount > Table_SKIP_THRESHOLD) { + delete liveentries; + goto donesearch; + } + } + } + delete liveentries; + } +donesearch: + ; +} + +/** + * Checks for malicious activity and updates the local copy of the block chain-> + */ +void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { + // The cloud communication layer has checked slot HMACs already + // before decoding + if (newSlots->length() == 0) { + return; + } + + // Make sure all slots are newer than the last largest slot this + // client has seen + int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber(); + if (firstSeqNum <= sequenceNumber) { + throw new Error("Server Error: Sent older slots!"); + } + + // Create an object that can access both new slots and slots in our + // local chain without committing slots to our local chain + SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); + + // Check that the HMAC chain is not broken + checkHMACChain(indexer, newSlots); + + // Set to keep track of messages from clients + Hashset *machineSet = new Hashset(); + { + SetIterator *> *lmit = getKeyIterator(lastMessageTable); + while (lmit->hasNext()) + machineSet->add(lmit->next()); + delete lmit; + } + + // Process each slots data + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); + processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); + } + } + delete indexer; + + // If there is a gap, check to see if the server sent us + // everything-> + if (firstSeqNum != (sequenceNumber + 1)) { + + // Check the size of the slots that were sent down by the server-> + // Can only check the size if there was a gap + checkNumSlots(newSlots->length()); + + // Since there was a gap every machine must have pushed a slot or + // must have a last message message-> If not then the server is + // hiding slots + if (!machineSet->isEmpty()) { + delete machineSet; + throw new Error("Missing record for machines: "); + } + } + delete machineSet; + // Update the size of our local block chain-> + commitNewMaxSize(); + + // Commit new to slots to the local block chain-> + { + uint numSlots = newSlots->length(); + for (uint i = 0; i < numSlots; i++) { + Slot *slot = newSlots->get(i); + + // Insert this slot into our local block chain copy-> + buffer->putSlot(slot); + + // Keep track of how many slots are currently live (have live data + // in them)-> + liveSlotCount++; + } + } + // Get the sequence number of the latest slot in the system + sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); + updateLiveStateFromServer(); + + // No Need to remember after we pulled from the server + offlineTransactionsCommittedAndAtServer->clear(); + + // This is invalidated now + hadPartialSendToServer = false; +} + +void Table::updateLiveStateFromServer() { + // Process the new transaction parts + processNewTransactionParts(); + + // Do arbitration on new transactions that were received + arbitrateFromServer(); + + // Update all the committed keys + bool didCommitOrSpeculate = updateCommittedTable(); + + // Delete the transactions that are now dead + updateLiveTransactionsAndStatus(); + + // Do speculations + didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); + updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); +} + +void Table::updateLiveStateFromLocal() { + // Update all the committed keys + bool didCommitOrSpeculate = updateCommittedTable(); + + // Delete the transactions that are now dead + updateLiveTransactionsAndStatus(); + + // Do speculations + didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); + updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); +} + +void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { + int64_t prevslots = firstSequenceNumber; + + if (didFindTableStatus) { + } else { + expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; + } + + didFindTableStatus = true; + currMaxSize = numberOfSlots; +} + +void Table::updateExpectedSize() { + expectedsize++; + + if (expectedsize > currMaxSize) { + expectedsize = currMaxSize; + } +} + + +/** + * Check the size of the block chain to make sure there are enough + * slots sent back by the server-> This is only called when we have a + * gap between the slots that we have locally and the slots sent by + * the server therefore in the slots sent by the server there will be + * at least 1 Table status message + */ +void Table::checkNumSlots(int numberOfSlots) { + if (numberOfSlots != expectedsize) { + throw new Error("Server Error: Server did not send all slots-> Expected: "); + } +} + +/** + * Update the size of of the local buffer if it is needed-> + */ +void Table::commitNewMaxSize() { + didFindTableStatus = false; + + // Resize the local slot buffer + if (numberOfSlots != currMaxSize) { + buffer->resize((int32_t)currMaxSize); + } + + // Change the number of local slots to the new size + numberOfSlots = (int32_t)currMaxSize; + + // Recalculate the resize threshold since the size of the local + // buffer has changed + setResizeThreshold(); +} + +/** + * Process the new transaction parts from this latest round of slots + * received from the server + */ +void Table::processNewTransactionParts() { + + if (newTransactionParts->size() == 0) { + // Nothing new to process + return; + } + + // Iterate through all the machine Ids that we received new parts + // for + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); + while (tpit->hasNext()) { + int64_t machineId = tpit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal(); + + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); + // Iterate through all the parts for that machine Id + while (ptit->hasNext()) { + Pair *partId = ptit->next(); + TransactionPart *part = parts->get(partId); + + if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); + if (lastTransactionNumber >= part->getSequenceNumber()) { + // Set dead the transaction part + part->setDead(); + part->releaseRef(); + continue; + } + } + + // Get the transaction object for that sequence number + Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); + + if (transaction == NULL) { + // This is a new transaction that we dont have so make a new one + transaction = new Transaction(); + + // Add that part to the transaction + transaction->addPartDecode(part); + + // Insert this new transaction into the live tables + liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); + liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); + } + part->releaseRef(); + } + delete ptit; + } + delete tpit; + // Clear all the new transaction parts in preparation for the next + // time the server sends slots + { + SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); + delete parts; + } + delete partsit; + newTransactionParts->clear(); + } +} + +void Table::arbitrateFromServer() { + if (liveTransactionBySequenceNumberTable->size() == 0) { + // Nothing to arbitrate on so move on + return; + } + + // Get the transaction sequence numbers and sort from oldest to newest + Vector *transactionSequenceNumbers = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbers->add(trit->next()); + delete trit; + } + qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + // Collection of key value pairs that are + Hashtable *speculativeTableTmp = new Hashtable(); + + // The last transaction arbitrated on + int64_t lastTransactionCommitted = -1; + Hashset *generatedAborts = new Hashset(); + uint tsnSize = transactionSequenceNumbers->size(); + for (uint i = 0; i < tsnSize; i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); + + // Check if this machine arbitrates for this transaction if not + // then we cant arbitrate this transaction + if (transaction->getArbitrator() != localMachineId) { + continue; + } + + if (transactionSequenceNumber < lastSeqNumArbOn) { + continue; + } + + if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) { + // We have seen this already locally so dont commit again + continue; + } + + if (!transaction->isComplete()) { + // Will arbitrate in incorrect order if we continue so just break + // Most likely this + break; + } + + // update the largest transaction seen by arbitrator from server + if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { + lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); + } else { + int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); + if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { + lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); + } + } + + if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { + // Guard evaluated as true + // Update the local changes so we can make the commit + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculativeTableTmp->put(kv->getKey(), kv); + } + delete kvit; + + // Update what the last transaction committed was for use in batch commit + lastTransactionCommitted = transactionSequenceNumber; + } else { + // Guard evaluated was false so create abort + // Create the abort + Abort *newAbort = new Abort(NULL, + transaction->getClientLocalSequenceNumber(), + transaction->getSequenceNumber(), + transaction->getMachineId(), + transaction->getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + generatedAborts->add(newAbort); + + // Insert the abort so we can process + processEntry(newAbort); + } + + lastSeqNumArbOn = transactionSequenceNumber; + } + + delete transactionSequenceNumbers; + + Commit *newCommit = NULL; + + // If there is something to commit + if (speculativeTableTmp->size() != 0) { + // Create the commit and increment the commit sequence number + newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); + localArbitrationSequenceNumber++; + + // Add all the new keys to the commit + SetIterator *spit = getKeyIterator(speculativeTableTmp); + while (spit->hasNext()) { + IoTString *string = spit->next(); + KeyValue *kv = speculativeTableTmp->get(string); + newCommit->addKV(kv); + } + delete spit; + + // create the commit parts + newCommit->createCommitParts(); + + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server + // Insert the commit so we can process it + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); + processEntry(commitPart); + } + } + delete speculativeTableTmp; + + if ((newCommit != NULL) || (generatedAborts->size() > 0)) { + ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); + pendingSendArbitrationRounds->add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + if (newArbitrationRound->getCommit() != NULL) { + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); + processEntry(commitPart); + } + } + } + } else { + delete generatedAborts; + } +} + +Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { + + // Check if this machine arbitrates for this transaction if not then + // we cant arbitrate this transaction + if (transaction->getArbitrator() != localMachineId) { + return Pair(false, false); + } + + if (!transaction->isComplete()) { + // Will arbitrate in incorrect order if we continue so just break + // Most likely this + return Pair(false, false); + } + + if (transaction->getMachineId() != localMachineId) { + // dont do this check for local transactions + if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { + if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { + // We've have already seen this from the server + return Pair(false, false); + } + } + } + + if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { + // Guard evaluated as true Create the commit and increment the + // commit sequence number + Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); + localArbitrationSequenceNumber++; + + // Update the local changes so we can make the commit + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + newCommit->addKV(kv); + } + delete kvit; + + // create the commit parts + newCommit->createCommitParts(); + + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server + ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); + pendingSendArbitrationRounds->add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); + processEntry(commitPart); + } + } else { + // Insert the commit so we can process it + Vector *parts = newCommit->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); + processEntry(commitPart); + } + } + + if (transaction->getMachineId() == localMachineId) { + TransactionStatus *status = transaction->getTransactionStatus(); + if (status != NULL) { + status->setStatus(TransactionStatus_StatusCommitted); + } + } + + updateLiveStateFromLocal(); + return Pair(true, true); + } else { + if (transaction->getMachineId() == localMachineId) { + // For locally created messages update the status + // Guard evaluated was false so create abort + TransactionStatus *status = transaction->getTransactionStatus(); + if (status != NULL) { + status->setStatus(TransactionStatus_StatusAborted); + } + } else { + Hashset *addAbortSet = new Hashset(); + + // Create the abort + Abort *newAbort = new Abort(NULL, + transaction->getClientLocalSequenceNumber(), + -1, + transaction->getMachineId(), + transaction->getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + addAbortSet->add(newAbort); + + // Append all the commit parts to the end of the pending queue + // waiting for sending to the server + ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); + pendingSendArbitrationRounds->add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + + Vector *parts = newArbitrationRound->getCommit()->getParts(); + uint partsSize = parts->size(); + for (uint i = 0; i < partsSize; i++) { + CommitPart *commitPart = parts->get(i); + processEntry(commitPart); + } + } + } + + updateLiveStateFromLocal(); + return Pair(true, false); + } +} + +/** + * Compacts the arbitration data by merging commits and aggregating + * aborts so that a single large push of commits can be done instead + * of many small updates + */ +bool Table::compactArbitrationData() { + if (pendingSendArbitrationRounds->size() < 2) { + // Nothing to compact so do nothing + return false; + } + + ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + if (lastRound->getDidSendPart()) { + return false; + } + + bool hadCommit = (lastRound->getCommit() == NULL); + bool gotNewCommit = false; + + uint numberToDelete = 1; + + while (numberToDelete < pendingSendArbitrationRounds->size()) { + ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + + if (round->isFull() || round->getDidSendPart()) { + // Stop since there is a part that cannot be compacted and we + // need to compact in order + break; + } + + if (round->getCommit() == NULL) { + // Try compacting aborts only + int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); + if (newSize > ArbitrationRound_MAX_PARTS) { + // Cant compact since it would be too large + break; + } + lastRound->addAborts(round->getAborts()); + } else { + // Create a new larger commit + Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + // Create the commit parts so that we can count them + newCommit->createCommitParts(); + + // Calculate the new size of the parts + int newSize = newCommit->getNumberOfParts(); + newSize += lastRound->getAbortsCount(); + newSize += round->getAbortsCount(); + + if (newSize > ArbitrationRound_MAX_PARTS) { + // Can't compact since it would be too large + if (lastRound->getCommit() != newCommit && + round->getCommit() != newCommit) + delete newCommit; + break; + } + // Set the new compacted part + if (lastRound->getCommit() == newCommit) + lastRound->setCommit(NULL); + if (round->getCommit() == newCommit) + round->setCommit(NULL); + + if (lastRound->getCommit() != NULL) { + Commit * oldcommit = lastRound->getCommit(); + lastRound->setCommit(NULL); + delete oldcommit; + } + lastRound->setCommit(newCommit); + lastRound->addAborts(round->getAborts()); + gotNewCommit = true; + } + + numberToDelete++; + } + + if (numberToDelete != 1) { + // If there is a compaction + // Delete the previous pieces that are now in the new compacted piece + for (uint i = 2; i <= numberToDelete; i++) { + delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); + } + pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); + + pendingSendArbitrationRounds->add(lastRound); + + // Should reinsert into the commit processor + if (hadCommit && gotNewCommit) { + return true; + } + } + + return false; +} + +/** + * Update all the commits and the committed tables, sets dead the dead + * transactions + */ +bool Table::updateCommittedTable() { + if (newCommitParts->size() == 0) { + // Nothing new to process + return false; + } + + // Iterate through all the machine Ids that we received new parts for + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); + while (partsit->hasNext()) { + int64_t machineId = partsit->next(); + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); + + // Iterate through all the parts for that machine Id + SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); + while (pairit->hasNext()) { + Pair *partId = pairit->next(); + CommitPart *part = pairit->currVal(); + + // Get the transaction object for that sequence number + Hashtable *commitForClientTable = liveCommitsTable->get(part->getMachineId()); + + if (commitForClientTable == NULL) { + // This is the first commit from this device + commitForClientTable = new Hashtable(); + liveCommitsTable->put(part->getMachineId(), commitForClientTable); + } + + Commit *commit = commitForClientTable->get(part->getSequenceNumber()); + + if (commit == NULL) { + // This is a new commit that we dont have so make a new one + commit = new Commit(); + + // Insert this new commit into the live tables + commitForClientTable->put(part->getSequenceNumber(), commit); + } + + // Add that part to the commit + commit->addPartDecode(part); + part->releaseRef(); + } + delete pairit; + delete parts; + } + delete partsit; + + // Clear all the new commits parts in preparation for the next time + // the server sends slots + newCommitParts->clear(); + + // If we process a new commit keep track of it for future use + bool didProcessANewCommit = false; + + // Process the commits one by one + SetIterator *> *liveit = getKeyIterator(liveCommitsTable); + while (liveit->hasNext()) { + int64_t arbitratorId = liveit->next(); + // Get all the commits for a specific arbitrator + Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); + + // Sort the commits in order + Vector *commitSequenceNumbers = new Vector(); + { + SetIterator *clientit = getKeyIterator(commitForClientTable); + while (clientit->hasNext()) + commitSequenceNumbers->add(clientit->next()); + delete clientit; + } + + qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); + + // Get the last commit seen from this arbitrator + int64_t lastCommitSeenSequenceNumber = -1; + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { + lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); + } + + // Go through each new commit one by one + for (uint i = 0; i < commitSequenceNumbers->size(); i++) { + int64_t commitSequenceNumber = commitSequenceNumbers->get(i); + Commit *commit = commitForClientTable->get(commitSequenceNumber); + // Special processing if a commit is not complete + if (!commit->isComplete()) { + if (i == (commitSequenceNumbers->size() - 1)) { + // If there is an incomplete commit and this commit is the + // latest one seen then this commit cannot be processed and + // there are no other commits + break; + } else { + // This is a commit that was already dead but parts of it + // are still in the block chain (not flushed out yet)-> + // Delete it and move on + commit->setDead(); + commitForClientTable->remove(commit->getSequenceNumber()); + delete commit; + continue; + } + } + + // Update the last transaction that was updated if we can + if (commit->getTransactionSequenceNumber() != -1) { + // Update the last transaction sequence number that the arbitrator arbitrated on1 + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { + lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); + } + } + + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) { + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); + if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); + } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); + } + + // We have already seen this commit before so need to do the + // full processing on this commit + if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { + // Update the last transaction that was updated if we can + if (commit->getTransactionSequenceNumber() != -1) { + int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { + lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); + } + } + continue; + } + + // If we got here then this is a brand new commit and needs full + // processing + // Get what commits should be edited, these are the commits that + // have live values for their keys + Hashset *commitsToEdit = new Hashset(); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); + if (commit != NULL) + commitsToEdit->add(commit); + } + delete kvit; + } + + // Update each previous commit that needs to be updated + SetIterator *commitit = commitsToEdit->iterator(); + while (commitit->hasNext()) { + Commit *previousCommit = commitit->next(); + + // Only bother with live commits (TODO: Maybe remove this check) + if (previousCommit->isLive()) { + + // Update which keys in the old commits are still live + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + previousCommit->invalidateKey(kv->getKey()); + } + delete kvit; + } + + // if the commit is now dead then remove it + if (!previousCommit->isLive()) { + commitForClientTable->remove(previousCommit->getSequenceNumber()); + delete previousCommit; + } + } + } + delete commitit; + delete commitsToEdit; + + // Update the last seen sequence number from this arbitrator + if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { + if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { + lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); + } + } else { + lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); + } + + // We processed a new commit that we havent seen before + didProcessANewCommit = true; + + // Update the committed table of keys and which commit is using which key + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); + } + delete kvit; + } + } + delete commitSequenceNumbers; + } + delete liveit; + + return didProcessANewCommit; +} + +/** + * Create the speculative table from transactions that are still live + * and have come from the cloud + */ +bool Table::updateSpeculativeTable(bool didProcessNewCommits) { + if (liveTransactionBySequenceNumberTable->size() == 0) { + // There is nothing to speculate on + return false; + } + + // Create a list of the transaction sequence numbers and sort them + // from oldest to newest + Vector *transactionSequenceNumbersSorted = new Vector(); + { + SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); + while (trit->hasNext()) + transactionSequenceNumbersSorted->add(trit->next()); + delete trit; + } + + qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); + + bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; + + + if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { + // If there is a gap in the transaction sequence numbers then + // there was a commit or an abort of a transaction OR there was a + // new commit (Could be from offline commit) so a redo the + // speculation from scratch + + // Start from scratch + speculatedKeyValueTable->clear(); + lastTransactionSequenceNumberSpeculatedOn = -1; + oldestTransactionSequenceNumberSpeculatedOn = -1; + } + + // Remember the front of the transaction list + oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); + + // Find where to start arbitration from + uint startIndex = 0; + + for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) + if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) + break; + startIndex++; + + if (startIndex >= transactionSequenceNumbersSorted->size()) { + // Make sure we are not out of bounds + delete transactionSequenceNumbersSorted; + return false; // did not speculate + } + + Hashset *incompleteTransactionArbitrator = new Hashset(); + bool didSkip = true; + + for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { + int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); + + if (!transaction->isComplete()) { + // If there is an incomplete transaction then there is nothing + // we can do add this transactions arbitrator to the list of + // arbitrators we should ignore + incompleteTransactionArbitrator->add(transaction->getArbitrator()); + didSkip = true; + continue; + } + + if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) { + continue; + } + + lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber; + + if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { + // Guard evaluated to true so update the speculative table + { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; + } + } + } + + delete transactionSequenceNumbersSorted; + + if (didSkip) { + // Since there was a skip we need to redo the speculation next time around + lastTransactionSequenceNumberSpeculatedOn = -1; + oldestTransactionSequenceNumberSpeculatedOn = -1; + } + + // We did some speculation + return true; +} + +/** + * Create the pending transaction speculative table from transactions + * that are still in the pending transaction buffer + */ +void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { + if (pendingTransactionQueue->size() == 0) { + // There is nothing to speculate on + return; + } + + if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) { + // need to reset on the pending speculation + lastPendingTransactionSpeculatedOn = NULL; + firstPendingTransaction = pendingTransactionQueue->get(0); + pendingTransactionSpeculatedKeyValueTable->clear(); + } + + // Find where to start arbitration from + uint startIndex = 0; + + for (; startIndex < pendingTransactionQueue->size(); startIndex++) + if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) + break; + + if (startIndex >= pendingTransactionQueue->size()) { + // Make sure we are not out of bounds + return; + } + + for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) { + Transaction *transaction = pendingTransactionQueue->get(i); + + lastPendingTransactionSpeculatedOn = transaction; + + if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { + // Guard evaluated to true so update the speculative table + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; + } + } +} + +/** + * Set dead and remove from the live transaction tables the + * transactions that are dead + */ +void Table::updateLiveTransactionsAndStatus() { + // Go through each of the transactions + { + SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); + while (iter->hasNext()) { + int64_t key = iter->next(); + Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) + && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { + // Set dead the transaction + transaction->setDead(); + + // Remove the transaction from the live table + iter->remove(); + liveTransactionByTransactionIdTable->remove(transaction->getId()); + delete transaction; + } + } + delete iter; + } + + // Go through each of the transactions + { + SetIterator *iter = getKeyIterator(outstandingTransactionStatus); + while (iter->hasNext()) { + int64_t key = iter->next(); + TransactionStatus *status = outstandingTransactionStatus->get(key); + + // Check if the transaction is dead + if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) + && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { + // Set committed + status->setStatus(TransactionStatus_StatusCommitted); + + // Remove + iter->remove(); + } + } + delete iter; + } +} + +/** + * Process this slot, entry by entry-> Also update the latest message sent by slot + */ +void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet) { + + // Update the last message seen + updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); + + // Process each entry in the slot + Vector *entries = slot->getEntries(); + uint eSize = entries->size(); + for (uint ei = 0; ei < eSize; ei++) { + Entry *entry = entries->get(ei); + switch (entry->getType()) { + case TypeCommitPart: + processEntry((CommitPart *)entry); + break; + case TypeAbort: + processEntry((Abort *)entry); + break; + case TypeTransactionPart: + processEntry((TransactionPart *)entry); + break; + case TypeNewKey: + processEntry((NewKey *)entry); + break; + case TypeLastMessage: + processEntry((LastMessage *)entry, machineSet); + break; + case TypeRejectedMessage: + processEntry((RejectedMessage *)entry, indexer); + break; + case TypeTableStatus: + processEntry((TableStatus *)entry, slot->getSequenceNumber()); + break; + default: + throw new Error("Unrecognized type: "); + } + } +} + +/** + * Update the last message that was sent for a machine Id + */ +void Table::processEntry(LastMessage *entry, Hashset *machineSet) { + // Update what the last message received by a machine was + updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet); +} + +/** + * Add the new key to the arbitrators table and update the set of live + * new keys (in case of a rescued new key message) + */ +void Table::processEntry(NewKey *entry) { + // Update the arbitrator table with the new key information + arbitratorTable->put(entry->getKey(), entry->getMachineID()); + + // Update what the latest live new key is + NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry); + if (oldNewKey != NULL) { + // Delete the old new key messages + oldNewKey->setDead(); + } +} + +/** + * Process new table status entries and set dead the old ones as new + * ones come in-> keeps track of the largest and smallest table status + * seen in this current round of updating the local copy of the block + * chain + */ +void Table::processEntry(TableStatus *entry, int64_t seq) { + int newNumSlots = entry->getMaxSlots(); + updateCurrMaxSize(newNumSlots); + initExpectedSize(seq, newNumSlots); + + if (liveTableStatus != NULL) { + // We have a larger table status so the old table status is no + // int64_ter alive + liveTableStatus->setDead(); + } + + // Make this new table status the latest alive table status + liveTableStatus = entry; +} + +/** + * Check old messages to see if there is a block chain violation-> + * Also + */ +void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { + int64_t oldSeqNum = entry->getOldSeqNum(); + int64_t newSeqNum = entry->getNewSeqNum(); + bool isequal = entry->getEqual(); + int64_t machineId = entry->getMachineID(); + int64_t seq = entry->getSequenceNumber(); + + // Check if we have messages that were supposed to be rejected in + // our local block chain + for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { + // Get the slot + Slot *slot = indexer->getSlot(seqNum); + + if (slot != NULL) { + // If we have this slot make sure that it was not supposed to be + // a rejected slot + int64_t slotMachineId = slot->getMachineID(); + if (isequal != (slotMachineId == machineId)) { + throw new Error("Server Error: Trying to insert rejected message for slot "); + } + } + } + + // Create a list of clients to watch until they see this rejected + // message entry-> + Hashset *deviceWatchSet = new Hashset(); + SetIterator *> *iter = getKeyIterator(lastMessageTable); + while (iter->hasNext()) { + // Machine ID for the last message entry + int64_t lastMessageEntryMachineId = iter->next(); + + // We've seen it, don't need to continue to watch-> Our next + // message will implicitly acknowledge it-> + if (lastMessageEntryMachineId == localMachineId) { + continue; + } + + Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); + int64_t entrySequenceNumber = lastMessageValue->getFirst(); + + if (entrySequenceNumber < seq) { + // Add this rejected message to the set of messages that this + // machine ID did not see yet + addWatchVector(lastMessageEntryMachineId, entry); + // This client did not see this rejected message yet so add it + // to the watch set to monitor + deviceWatchSet->add(lastMessageEntryMachineId); + } + } + delete iter; + + if (deviceWatchSet->isEmpty()) { + // This rejected message has been seen by all the clients so + entry->setDead(); + delete deviceWatchSet; + } else { + // We need to watch this rejected message + entry->setWatchSet(deviceWatchSet); + } +} + +/** + * Check if this abort is live, if not then save it so we can kill it + * later-> update the last transaction number that was arbitrated on-> + */ +void Table::processEntry(Abort *entry) { + if (entry->getTransactionSequenceNumber() != -1) { + // update the transaction status if it was sent to the server + TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); + if (status != NULL) { + status->setStatus(TransactionStatus_StatusAborted); + } + } + + // Abort has not been seen by the client it is for yet so we need to + // keep track of it + + Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); + if (previouslySeenAbort != NULL) { + previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version + } + + if (entry->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); + } + + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { + // The machine already saw this so it is dead + entry->setDead(); + Pair abortid = entry->getAbortId(); + liveAbortTable->remove(&abortid); + + if (entry->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); + } + return; + } + + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { + int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); + if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); + } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); + } + + // Set dead a transaction if we can + Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); + + Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); + if (transactionToSetDead != NULL) { + liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); + } + + // Update the last transaction sequence number that the arbitrator + // arbitrated on + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || + (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { + // Is a valid one + if (entry->getTransactionSequenceNumber() != -1) { + lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); + } + } +} + +/** + * Set dead the transaction part if that transaction is dead and keep + * track of all new parts + */ +void Table::processEntry(TransactionPart *entry) { + // Check if we have already seen this transaction and set it dead OR + // if it is not alive + if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { + // This transaction is dead, it was already committed or aborted + entry->setDead(); + return; + } + + // This part is still alive + Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); + + if (transactionPart == NULL) { + // Dont have a table for this machine Id yet so make one + transactionPart = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts->put(entry->getMachineId(), transactionPart); + } + + // Update the part and set dead ones we have already seen (got a + // rescued version) + entry->acquireRef(); + TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); + if (previouslySeenPart != NULL) { + previouslySeenPart->releaseRef(); + previouslySeenPart->setDead(); + } +} + +/** + * Process new commit entries and save them for future use-> Delete duplicates + */ +void Table::processEntry(CommitPart *entry) { + // Update the last transaction that was updated if we can + if (entry->getTransactionSequenceNumber() != -1) { + if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || + lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { + lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); + } + } + + Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); + if (commitPart == NULL) { + // Don't have a table for this machine Id yet so make one + commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newCommitParts->put(entry->getMachineId(), commitPart); + } + // Update the part and set dead ones we have already seen (got a + // rescued version) + entry->acquireRef(); + CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); + if (previouslySeenPart != NULL) { + previouslySeenPart->setDead(); + previouslySeenPart->releaseRef(); + } +} + +/** + * Update the last message seen table-> Update and set dead the + * appropriate RejectedMessages as clients see them-> Updates the live + * aborts, removes those that are dead and sets them dead-> Check that + * the last message seen is correct and that there is no mismatch of + * our own last message or that other clients have not had a rollback + * on the last message-> + */ +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { + // We have seen this machine ID + machineSet->remove(machineId); + + // Get the set of rejected messages that this machine Id is has not seen yet + Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); + // If there is a rejected message that this machine Id has not seen yet + if (watchset != NULL) { + // Go through each rejected message that this machine Id has not + // seen yet + + SetIterator *rmit = watchset->iterator(); + while (rmit->hasNext()) { + RejectedMessage *rm = rmit->next(); + // If this machine Id has seen this rejected message->->-> + if (rm->getSequenceNumber() <= seqNum) { + // Remove it from our watchlist + rmit->remove(); + // Decrement machines that need to see this notification + rm->removeWatcher(machineId); + } + } + delete rmit; + } + + // Set dead the abort + SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); + + while (abortit->hasNext()) { + Pair *key = abortit->next(); + Abort *abort = liveAbortTable->get(key); + if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { + abort->setDead(); + abortit->remove(); + if (abort->getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); + } + } + } + delete abortit; + if (machineId == localMachineId) { + // Our own messages are immediately dead-> + char livenessType = liveness->getType(); + if (livenessType == TypeLastMessage) { + ((LastMessage *)liveness)->setDead(); + } else if (livenessType == TypeSlot) { + ((Slot *)liveness)->setDead(); + } else { + throw new Error("Unrecognized type"); + } + } + // Get the old last message for this device + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); + if (lastMessageEntry == NULL) { + // If no last message then there is nothing else to process + return; + } + + int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); + Liveness *lastEntry = lastMessageEntry->getSecond(); + delete lastMessageEntry; + + // If it is not our machine Id since we already set ours to dead + if (machineId != localMachineId) { + char lastEntryType = lastEntry->getType(); + + if (lastEntryType == TypeLastMessage) { + ((LastMessage *)lastEntry)->setDead(); + } else if (lastEntryType == TypeSlot) { + ((Slot *)lastEntry)->setDead(); + } else { + throw new Error("Unrecognized type"); + } + } + // Make sure the server is not playing any games + if (machineId == localMachineId) { + if (hadPartialSendToServer) { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: "); + } + } else { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed: "); + } + } + } else { + if (lastMessageSeqNum > seqNum) { + throw new Error("Server Error: Rollback on remote machine sequence number"); + } + } +} + +/** + * Add a rejected message entry to the watch set to keep track of + * which clients have seen that rejected message entry and which have + * not. + */ +void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { + Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); + if (entries == NULL) { + // There is no set for this machine ID yet so create one + entries = new Hashset(); + rejectedMessageWatchVectorTable->put(machineId, entries); + } + entries->add(entry); +} + +/** + * Check if the HMAC chain is not violated + */ +void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { + for (uint i = 0; i < newSlots->length(); i++) { + Slot *currSlot = newSlots->get(i); + Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); + if (prevSlot != NULL && + !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) + throw new Error("Server Error: Invalid HMAC Chain"); + } +}