X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=version2%2Fsrc%2FC%2FTable.cc;fp=version2%2Fsrc%2FC%2FTable.cc;h=0000000000000000000000000000000000000000;hb=786e40250f31eff04eec25bbcaae3cd916fedb14;hp=255ba3cf35bb3da7a02df7e59a9366fe88b472a0;hpb=3f24bffc82ebfe2730308b63100af08645316577;p=iotcloud.git diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc deleted file mode 100644 index 255ba3c..0000000 --- a/version2/src/C/Table.cc +++ /dev/null @@ -1,2863 +0,0 @@ -#include "Table.h" -#include "CloudComm.h" -#include "SlotBuffer.h" -#include "NewKey.h" -#include "Slot.h" -#include "KeyValue.h" -#include "Error.h" -#include "PendingTransaction.h" -#include "TableStatus.h" -#include "TransactionStatus.h" -#include "Transaction.h" -#include "LastMessage.h" -#include "SecureRandom.h" -#include "ByteBuffer.h" -#include "Abort.h" -#include "CommitPart.h" -#include "ArbitrationRound.h" -#include "TransactionPart.h" -#include "Commit.h" -#include "RejectedMessage.h" -#include "SlotIndexer.h" -#include - -int compareInt64(const void *a, const void *b) { - const int64_t *pa = (const int64_t *) a; - const int64_t *pb = (const int64_t *) b; - if (*pa < *pb) - return -1; - else if (*pa > *pb) - return 1; - else - return 0; -} - -Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : - buffer(NULL), - cloud(new CloudComm(this, baseurl, password, listeningPort)), - random(NULL), - liveTableStatus(NULL), - pendingTransactionBuilder(NULL), - lastPendingTransactionSpeculatedOn(NULL), - firstPendingTransaction(NULL), - numberOfSlots(0), - bufferResizeThreshold(0), - liveSlotCount(0), - oldestLiveSlotSequenceNumver(1), - localMachineId(_localMachineId), - sequenceNumber(0), - localSequenceNumber(0), - localTransactionSequenceNumber(0), - lastTransactionSequenceNumberSpeculatedOn(0), - oldestTransactionSequenceNumberSpeculatedOn(0), - localArbitrationSequenceNumber(0), - hadPartialSendToServer(false), - attemptedToSendToServer(false), - expectedsize(0), - didFindTableStatus(false), - currMaxSize(0), - lastSlotAttemptedToSend(NULL), - lastIsNewKey(false), - lastNewSize(0), - lastTransactionPartsSent(NULL), - lastNewKey(NULL), - committedKeyValueTable(NULL), - speculatedKeyValueTable(NULL), - pendingTransactionSpeculatedKeyValueTable(NULL), - liveNewKeyTable(NULL), - lastMessageTable(NULL), - rejectedMessageWatchVectorTable(NULL), - arbitratorTable(NULL), - liveAbortTable(NULL), - newTransactionParts(NULL), - newCommitParts(NULL), - lastArbitratedTransactionNumberByArbitratorTable(NULL), - liveTransactionBySequenceNumberTable(NULL), - liveTransactionByTransactionIdTable(NULL), - liveCommitsTable(NULL), - liveCommitsByKeyTable(NULL), - lastCommitSeenSequenceNumberByArbitratorTable(NULL), - rejectedSlotVector(NULL), - pendingTransactionQueue(NULL), - pendingSendArbitrationRounds(NULL), - pendingSendArbitrationEntriesToDelete(NULL), - transactionPartsSent(NULL), - outstandingTransactionStatus(NULL), - liveAbortsGeneratedByLocal(NULL), - offlineTransactionsCommittedAndAtServer(NULL), - localCommunicationTable(NULL), - lastTransactionSeenFromMachineFromServer(NULL), - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), - lastInsertedNewKey(false), - lastSeqNumArbOn(0) -{ - init(); -} - -Table::Table(CloudComm *_cloud, int64_t _localMachineId) : - buffer(NULL), - cloud(_cloud), - random(NULL), - liveTableStatus(NULL), - pendingTransactionBuilder(NULL), - lastPendingTransactionSpeculatedOn(NULL), - firstPendingTransaction(NULL), - numberOfSlots(0), - bufferResizeThreshold(0), - liveSlotCount(0), - oldestLiveSlotSequenceNumver(1), - localMachineId(_localMachineId), - sequenceNumber(0), - localSequenceNumber(0), - localTransactionSequenceNumber(0), - lastTransactionSequenceNumberSpeculatedOn(0), - oldestTransactionSequenceNumberSpeculatedOn(0), - localArbitrationSequenceNumber(0), - hadPartialSendToServer(false), - attemptedToSendToServer(false), - expectedsize(0), - didFindTableStatus(false), - currMaxSize(0), - lastSlotAttemptedToSend(NULL), - lastIsNewKey(false), - lastNewSize(0), - lastTransactionPartsSent(NULL), - lastNewKey(NULL), - committedKeyValueTable(NULL), - speculatedKeyValueTable(NULL), - pendingTransactionSpeculatedKeyValueTable(NULL), - liveNewKeyTable(NULL), - lastMessageTable(NULL), - rejectedMessageWatchVectorTable(NULL), - arbitratorTable(NULL), - liveAbortTable(NULL), - newTransactionParts(NULL), - newCommitParts(NULL), - lastArbitratedTransactionNumberByArbitratorTable(NULL), - liveTransactionBySequenceNumberTable(NULL), - liveTransactionByTransactionIdTable(NULL), - liveCommitsTable(NULL), - liveCommitsByKeyTable(NULL), - lastCommitSeenSequenceNumberByArbitratorTable(NULL), - rejectedSlotVector(NULL), - pendingTransactionQueue(NULL), - pendingSendArbitrationRounds(NULL), - pendingSendArbitrationEntriesToDelete(NULL), - transactionPartsSent(NULL), - outstandingTransactionStatus(NULL), - liveAbortsGeneratedByLocal(NULL), - offlineTransactionsCommittedAndAtServer(NULL), - localCommunicationTable(NULL), - lastTransactionSeenFromMachineFromServer(NULL), - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), - lastInsertedNewKey(false), - lastSeqNumArbOn(0) -{ - init(); -} - -Table::~Table() { - delete cloud; - delete random; - delete buffer; - // init data structs - delete committedKeyValueTable; - delete speculatedKeyValueTable; - delete pendingTransactionSpeculatedKeyValueTable; - delete liveNewKeyTable; - { - SetIterator *> *lmit = getKeyIterator(lastMessageTable); - while (lmit->hasNext()) { - Pair * pair = lastMessageTable->get(lmit->next()); - delete pair; - } - delete lmit; - delete lastMessageTable; - } - if (pendingTransactionBuilder != NULL) - delete pendingTransactionBuilder; - { - SetIterator *> *rmit = getKeyIterator(rejectedMessageWatchVectorTable); - while(rmit->hasNext()) { - int64_t machineid = rmit->next(); - Hashset * rmset = rejectedMessageWatchVectorTable->get(machineid); - SetIterator * mit = rmset->iterator(); - while (mit->hasNext()) { - RejectedMessage * rm = mit->next(); - delete rm; - } - delete mit; - delete rmset; - } - delete rmit; - delete rejectedMessageWatchVectorTable; - } - delete arbitratorTable; - delete liveAbortTable; - { - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); - while (partsit->hasNext()) { - int64_t machineId = partsit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); - while(pit->hasNext()) { - Pair * pair=pit->next(); - pit->currVal()->releaseRef(); - } - delete pit; - - delete parts; - } - delete partsit; - delete newTransactionParts; - } - { - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); - while (partsit->hasNext()) { - int64_t machineId = partsit->next(); - Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = partsit->currVal(); - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pit = getKeyIterator(parts); - while(pit->hasNext()) { - Pair * pair=pit->next(); - pit->currVal()->releaseRef(); - } - delete pit; - delete parts; - } - delete partsit; - delete newCommitParts; - } - delete lastArbitratedTransactionNumberByArbitratorTable; - delete liveTransactionBySequenceNumberTable; - delete liveTransactionByTransactionIdTable; - { - SetIterator *> *liveit = getKeyIterator(liveCommitsTable); - while (liveit->hasNext()) { - int64_t arbitratorId = liveit->next(); - - // Get all the commits for a specific arbitrator - Hashtable *commitForClientTable = liveit->currVal(); - { - SetIterator *clientit = getKeyIterator(commitForClientTable); - while (clientit->hasNext()) { - int64_t id = clientit->next(); - delete commitForClientTable->get(id); - } - delete clientit; - } - - delete commitForClientTable; - } - delete liveit; - delete liveCommitsTable; - } - delete liveCommitsByKeyTable; - delete lastCommitSeenSequenceNumberByArbitratorTable; - delete rejectedSlotVector; - { - uint size = pendingTransactionQueue->size(); - for (uint iter = 0; iter < size; iter++) { - delete pendingTransactionQueue->get(iter); - } - delete pendingTransactionQueue; - } - delete pendingSendArbitrationEntriesToDelete; - { - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - delete trit->currVal(); - } - delete trit; - delete transactionPartsSent; - } - delete outstandingTransactionStatus; - delete liveAbortsGeneratedByLocal; - delete offlineTransactionsCommittedAndAtServer; - delete localCommunicationTable; - delete lastTransactionSeenFromMachineFromServer; - { - for(uint i = 0; i < pendingSendArbitrationRounds->size(); i++) { - delete pendingSendArbitrationRounds->get(i); - } - delete pendingSendArbitrationRounds; - } - if (lastTransactionPartsSent != NULL) - delete lastTransactionPartsSent; - delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; - if (lastNewKey) - delete lastNewKey; -} - -/** - * Init all the stuff needed for for table usage - */ -void Table::init() { - // Init helper objects - random = new SecureRandom(); - buffer = new SlotBuffer(); - - // init data structs - committedKeyValueTable = new Hashtable(); - speculatedKeyValueTable = new Hashtable(); - pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable * >(); - rejectedMessageWatchVectorTable = new Hashtable * >(); - arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); - newTransactionParts = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); - newCommitParts = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); - lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); - liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); - liveCommitsTable = new Hashtable * >(); - liveCommitsByKeyTable = new Hashtable(); - lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); - rejectedSlotVector = new Vector(); - pendingTransactionQueue = new Vector(); - pendingSendArbitrationEntriesToDelete = new Vector(); - transactionPartsSent = new Hashtable *>(); - outstandingTransactionStatus = new Hashtable(); - liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset *, uintptr_t, 0, pairHashFunction, pairEquals>(); - localCommunicationTable = new Hashtable *>(); - lastTransactionSeenFromMachineFromServer = new Hashtable(); - pendingSendArbitrationRounds = new Vector(); - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); - - // Other init stuff - numberOfSlots = buffer->capacity(); - setResizeThreshold(); -} - -/** - * Initialize the table by inserting a table status as the first entry - * into the table status also initialize the crypto stuff. - */ -void Table::initTable() { - cloud->initSecurity(); - - // Create the first insertion into the block chain which is the table status - Slot *s = new Slot(this, 1, localMachineId, localSequenceNumber); - localSequenceNumber++; - TableStatus *status = new TableStatus(s, numberOfSlots); - s->addShallowEntry(status); - Array *array = cloud->putSlot(s, numberOfSlots); - - if (array == NULL) { - array = new Array(1); - array->set(0, s); - // update local block chain - validateAndUpdate(array, true); - delete array; - } else if (array->length() == 1) { - // in case we did push the slot BUT we failed to init it - validateAndUpdate(array, true); - delete s; - delete array; - } else { - delete s; - delete array; - throw new Error("Error on initialization"); - } -} - -/** - * Rebuild the table from scratch by pulling the latest block chain - * from the server. - */ -void Table::rebuild() { - // Just pull the latest slots from the server - Array *newslots = cloud->getSlots(sequenceNumber + 1); - validateAndUpdate(newslots, true); - delete newslots; - sendToServer(NULL); - updateLiveTransactionsAndStatus(); -} - -void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { - localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); -} - -int64_t Table::getArbitrator(IoTString *key) { - return arbitratorTable->get(key); -} - -void Table::close() { - cloud->closeCloud(); -} - -IoTString *Table::getCommitted(IoTString *key) { - KeyValue *kv = committedKeyValueTable->get(key); - - if (kv != NULL) { - return new IoTString(kv->getValue()); - } else { - return NULL; - } -} - -IoTString *Table::getSpeculative(IoTString *key) { - KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); - - if (kv == NULL) { - kv = speculatedKeyValueTable->get(key); - } - - if (kv == NULL) { - kv = committedKeyValueTable->get(key); - } - - if (kv != NULL) { - return new IoTString(kv->getValue()); - } else { - return NULL; - } -} - -IoTString *Table::getCommittedAtomic(IoTString *key) { - KeyValue *kv = committedKeyValueTable->get(key); - - if (!arbitratorTable->contains(key)) { - throw new Error("Key not Found."); - } - - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } - - if (kv != NULL) { - pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return new IoTString(kv->getValue()); - } else { - pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); - return NULL; - } -} - -IoTString *Table::getSpeculativeAtomic(IoTString *key) { - if (!arbitratorTable->contains(key)) { - throw new Error("Key not Found."); - } - - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } - - KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); - - if (kv == NULL) { - kv = speculatedKeyValueTable->get(key); - } - - if (kv == NULL) { - kv = committedKeyValueTable->get(key); - } - - if (kv != NULL) { - pendingTransactionBuilder->addKVGuard(new KeyValue(key, kv->getValue())); - return new IoTString(kv->getValue()); - } else { - pendingTransactionBuilder->addKVGuard(new KeyValue(key, NULL)); - return NULL; - } -} - -bool Table::update() { - try { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); - validateAndUpdate(newSlots, false); - delete newSlots; - sendToServer(NULL); - updateLiveTransactionsAndStatus(); - return true; - } catch (Exception *e) { - SetIterator *> *kit = getKeyIterator(localCommunicationTable); - while (kit->hasNext()) { - int64_t m = kit->next(); - updateFromLocal(m); - } - delete kit; - } - - return false; -} - -bool Table::createNewKey(IoTString *keyName, int64_t machineId) { - while (true) { - if (arbitratorTable->contains(keyName)) { - // There is already an arbitrator - return false; - } - NewKey *newKey = new NewKey(NULL, keyName, machineId); - - if (sendToServer(newKey)) { - // If successfully inserted - return true; - } - } -} - -void Table::startTransaction() { - // Create a new transaction, invalidates any old pending transactions. - if (pendingTransactionBuilder != NULL) - delete pendingTransactionBuilder; - pendingTransactionBuilder = new PendingTransaction(localMachineId); -} - -void Table::put(IoTString *key, IoTString *value) { - // Make sure it is a valid key - if (!arbitratorTable->contains(key)) { - throw new Error("Key not Found."); - } - - // Make sure new key value pair matches the current arbitrator - if (!pendingTransactionBuilder->checkArbitrator(arbitratorTable->get(key))) { - // TODO: Maybe not throw en error - throw new Error("Not all Key Values Match Arbitrator."); - } - - // Add the key value to this transaction - KeyValue *kv = new KeyValue(new IoTString(key), new IoTString(value)); - pendingTransactionBuilder->addKV(kv); -} - -TransactionStatus *Table::commitTransaction() { - if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { - // transaction with no updates will have no effect on the system - return new TransactionStatus(TransactionStatus_StatusNoEffect, -1); - } - - // Set the local transaction sequence number and increment - pendingTransactionBuilder->setClientLocalSequenceNumber(localTransactionSequenceNumber); - localTransactionSequenceNumber++; - - // Create the transaction status - TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator()); - - // Create the new transaction - Transaction *newTransaction = pendingTransactionBuilder->createTransaction(); - newTransaction->setTransactionStatus(transactionStatus); - - if (pendingTransactionBuilder->getArbitrator() != localMachineId) { - // Add it to the queue and invalidate the builder for safety - pendingTransactionQueue->add(newTransaction); - } else { - arbitrateOnLocalTransaction(newTransaction); - delete newTransaction; - updateLiveStateFromLocal(); - } - if (pendingTransactionBuilder != NULL) - delete pendingTransactionBuilder; - - pendingTransactionBuilder = new PendingTransaction(localMachineId); - - try { - sendToServer(NULL); - } catch (ServerException *e) { - - Hashset *arbitratorTriedAndFailed = new Hashset(); - uint size = pendingTransactionQueue->size(); - uint oldindex = 0; - for (uint iter = 0; iter < size; iter++) { - Transaction *transaction = pendingTransactionQueue->get(iter); - pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); - - if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { - // Already contacted this client so ignore all attempts to contact this client - // to preserve ordering for arbitrator - continue; - } - - Pair sendReturn = sendTransactionToLocal(transaction); - - if (sendReturn.getFirst()) { - // Failed to contact over local - arbitratorTriedAndFailed->add(transaction->getArbitrator()); - } else { - // Successful contact or should not contact - - if (sendReturn.getSecond()) { - // did arbitrate - delete transaction; - oldindex--; - } - } - } - pendingTransactionQueue->setSize(oldindex); - } - - updateLiveStateFromLocal(); - - return transactionStatus; -} - -/** - * Recalculate the new resize threshold - */ -void Table::setResizeThreshold() { - int resizeLower = (int) (Table_RESIZE_THRESHOLD * numberOfSlots); - bufferResizeThreshold = resizeLower - 1 + random->nextInt(numberOfSlots - resizeLower); -} - -int64_t Table::getLocalSequenceNumber() { - return localSequenceNumber; -} - -void Table::processTransactionList(bool handlePartial) { - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Update which transactions parts still need to be sent - transaction->removeSentParts(lastTransactionPartsSent->get(transaction)); - // Add the transaction status to the outstanding list - outstandingTransactionStatus->put(transaction->getSequenceNumber(), transaction->getTransactionStatus()); - - // Update the transaction status - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentPartial); - - // Check if all the transaction parts were successfully - // sent and if so then remove it from pending - if (transaction->didSendAllParts()) { - transaction->getTransactionStatus()->setStatus(TransactionStatus_StatusSentFully); - pendingTransactionQueue->remove(transaction); - delete transaction; - } else if (handlePartial) { - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - } - delete trit; -} - -NewKey * Table::handlePartialSend(NewKey * newKey) { - //Didn't receive acknowledgement for last send - //See if the server has received a newer slot - - Array *newSlots = cloud->getSlots(sequenceNumber + 1); - if (newSlots->length() == 0) { - //Retry sending old slot - bool wasInserted = false; - bool sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey, &wasInserted, &newSlots); - - if (sendSlotsReturn) { - lastSlotAttemptedToSend = NULL; - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - delete newKey; - newKey = NULL; - } - } - processTransactionList(false); - } else { - if (checkSend(newSlots, lastSlotAttemptedToSend)) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - delete newKey; - newKey = NULL; - } - } - processTransactionList(true); - } - } - - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - - if (newSlots->length() != 0) { - // insert into the local block chain - validateAndUpdate(newSlots, true); - } - } else { - if (checkSend(newSlots, lastSlotAttemptedToSend)) { - if (newKey != NULL) { - if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { - delete newKey; - newKey = NULL; - } - } - - processTransactionList(true); - } else { - SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetServerFailure(); - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - } - - // insert into the local block chain - validateAndUpdate(newSlots, true); - } - delete newSlots; - return newKey; -} - -void Table::clearSentParts() { - // Clear the sent data since we are trying again - pendingSendArbitrationEntriesToDelete->clear(); - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - delete trit->currVal(); - } - delete trit; - transactionPartsSent->clear(); -} - -bool Table::sendToServer(NewKey *newKey) { - if (hadPartialSendToServer) { - newKey = handlePartialSend(newKey); - } - - try { - // While we have stuff that needs inserting into the block chain - while ((pendingTransactionQueue->size() > 0) || (pendingSendArbitrationRounds->size() > 0) || (newKey != NULL)) { - if (hadPartialSendToServer) { - throw new Error("Should Be error free"); - } - - // If there is a new key with same name then end - if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { - delete newKey; - return false; - } - - // Create the slot - Slot *slot = new Slot(this, sequenceNumber + 1, localMachineId, new Array(buffer->getSlot(sequenceNumber)->getHMAC()), localSequenceNumber); - localSequenceNumber++; - - // Try to fill the slot with data - int newSize = 0; - bool insertedNewKey = false; - bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey); - - if (needsResize) { - // Reset which transaction to send - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetNextPartToSend(); - - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - - // Clear the sent data since we are trying again - clearSentParts(); - - // We needed a resize so try again - fillSlot(slot, true, newKey, newSize, insertedNewKey); - } - if (lastSlotAttemptedToSend != NULL) - delete lastSlotAttemptedToSend; - - lastSlotAttemptedToSend = slot; - lastIsNewKey = (newKey != NULL); - lastInsertedNewKey = insertedNewKey; - lastNewSize = newSize; - if (( newKey != lastNewKey) && (lastNewKey != NULL)) - delete lastNewKey; - lastNewKey = newKey; - if (lastTransactionPartsSent != NULL) - delete lastTransactionPartsSent; - lastTransactionPartsSent = transactionPartsSent->clone(); - - Array * newSlots = NULL; - bool wasInserted = false; - bool sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL, &wasInserted, &newSlots); - - if (sendSlotsReturn) { - lastSlotAttemptedToSend = NULL; - // Did insert into the block chain - if (insertedNewKey) { - // This slot was what was inserted not a previous slot - // New Key was successfully inserted into the block chain so dont want to insert it again - newKey = NULL; - } - - // Remove the aborts and commit parts that were sent from the pending to send queue - uint size = pendingSendArbitrationRounds->size(); - uint oldcount = 0; - for (uint i = 0; i < size; i++) { - ArbitrationRound *round = pendingSendArbitrationRounds->get(i); - round->removeParts(pendingSendArbitrationEntriesToDelete); - - if (!round->isDoneSending()) { - //Add part back in - pendingSendArbitrationRounds->set(oldcount++, - pendingSendArbitrationRounds->get(i)); - } else - delete pendingSendArbitrationRounds->get(i); - } - pendingSendArbitrationRounds->setSize(oldcount); - processTransactionList(false); - } else { - // Reset which transaction to send - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetNextPartToSend(); - - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - } - - // Clear the sent data in preparation for next send - clearSentParts(); - - if (newSlots->length() != 0) { - // insert into the local block chain - validateAndUpdate(newSlots, true); - } - delete newSlots; - } - } catch (ServerException *e) { - if (e->getType() != ServerException_TypeInputTimeout) { - // Nothing was able to be sent to the server so just clear these data structures - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetNextPartToSend(); - - // Set the transaction sequence number back to nothing - if (!transaction->didSendAPartToServer() && !transaction->getServerFailure()) { - transaction->setSequenceNumber(-1); - } - } - delete trit; - } else { - // There was a partial send to the server - hadPartialSendToServer = true; - - // Nothing was able to be sent to the server so just clear these data structures - SetIterator *> *trit = getKeyIterator(transactionPartsSent); - while (trit->hasNext()) { - Transaction *transaction = trit->next(); - transaction->resetNextPartToSend(); - transaction->setServerFailure(); - } - delete trit; - } - - clearSentParts(); - - throw e; - } - - return newKey == NULL; -} - -bool Table::updateFromLocal(int64_t machineId) { - if (!localCommunicationTable->contains(machineId)) - return false; - - Pair *localCommunicationInformation = localCommunicationTable->get(machineId); - - // Get the size of the send data - int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - - int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); - } - - Array *sendData = new Array(sendDataSize); - ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); - - // Encode the data - bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode->putInt(0); - - // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); - localSequenceNumber++; - - if (returnData == NULL) { - // Could not contact server - return false; - } - - // Decode the data - ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); - int numberOfEntries = bbDecode->getInt(); - - for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode->get(); - if (type == TypeAbort) { - Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); - processEntry(abort); - } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); - processEntry(commitPart); - } - } - - updateLiveStateFromLocal(); - - return true; -} - -Pair Table::sendTransactionToLocal(Transaction *transaction) { - - // Get the devices local communications - if (!localCommunicationTable->contains(transaction->getArbitrator())) - return Pair(true, false); - - Pair *localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); - - // Get the size of the send data - int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - { - Vector *tParts = transaction->getParts(); - uint tPartsSize = tParts->size(); - for (uint i = 0; i < tPartsSize; i++) { - TransactionPart *part = tParts->get(i); - sendDataSize += part->getSize(); - } - } - - int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { - lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); - } - - // Make the send data size - Array *sendData = new Array(sendDataSize); - ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); - - // Encode the data - bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); - bbEncode->putInt(transaction->getParts()->size()); - { - Vector *tParts = transaction->getParts(); - uint tPartsSize = tParts->size(); - for (uint i = 0; i < tPartsSize; i++) { - TransactionPart *part = tParts->get(i); - part->encode(bbEncode); - } - } - - // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); - localSequenceNumber++; - - if (returnData == NULL) { - // Could not contact server - return Pair(true, false); - } - - // Decode the data - ByteBuffer *bbDecode = ByteBuffer_wrap(returnData); - bool didCommit = bbDecode->get() == 1; - bool couldArbitrate = bbDecode->get() == 1; - int numberOfEntries = bbDecode->getInt(); - bool foundAbort = false; - - for (int i = 0; i < numberOfEntries; i++) { - char type = bbDecode->get(); - if (type == TypeAbort) { - Abort *abort = (Abort *)Abort_decode(NULL, bbDecode); - - if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { - foundAbort = true; - } - - processEntry(abort); - } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart *)CommitPart_decode(NULL, bbDecode); - processEntry(commitPart); - } - } - - updateLiveStateFromLocal(); - - if (couldArbitrate) { - TransactionStatus *status = transaction->getTransactionStatus(); - if (didCommit) { - status->setStatus(TransactionStatus_StatusCommitted); - } else { - status->setStatus(TransactionStatus_StatusAborted); - } - } else { - TransactionStatus *status = transaction->getTransactionStatus(); - if (foundAbort) { - status->setStatus(TransactionStatus_StatusAborted); - } else { - status->setStatus(TransactionStatus_StatusCommitted); - } - } - - return Pair(false, true); -} - -Array *Table::acceptDataFromLocal(Array *data) { - // Decode the data - ByteBuffer *bbDecode = ByteBuffer_wrap(data); - int64_t lastArbitratedSequenceNumberSeen = bbDecode->getLong(); - int numberOfParts = bbDecode->getInt(); - - // If we did commit a transaction or not - bool didCommit = false; - bool couldArbitrate = false; - - if (numberOfParts != 0) { - - // decode the transaction - Transaction *transaction = new Transaction(); - for (int i = 0; i < numberOfParts; i++) { - bbDecode->get(); - TransactionPart *newPart = (TransactionPart *)TransactionPart_decode(NULL, bbDecode); - transaction->addPartDecode(newPart); - } - - // Arbitrate on transaction and pull relevant return data - Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn.getFirst(); - didCommit = localArbitrateReturn.getSecond(); - - updateLiveStateFromLocal(); - - // Transaction was sent to the server so keep track of it to prevent double commit - if (transaction->getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); - } - } - - // The data to send back - int returnDataSize = 0; - Vector *unseenArbitrations = new Vector(); - - // Get the aborts to send back - Vector *abortLocalSequenceNumbers = new Vector(); - { - SetIterator *abortit = getKeyIterator(liveAbortsGeneratedByLocal); - while (abortit->hasNext()) - abortLocalSequenceNumbers->add(abortit->next()); - delete abortit; - } - - qsort(abortLocalSequenceNumbers->expose(), abortLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); - - uint asize = abortLocalSequenceNumbers->size(); - for (uint i = 0; i < asize; i++) { - int64_t localSequenceNumber = abortLocalSequenceNumbers->get(i); - if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { - continue; - } - - Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); - unseenArbitrations->add(abort); - returnDataSize += abort->getSize(); - } - - // Get the commits to send back - Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); - if (commitForClientTable != NULL) { - Vector *commitLocalSequenceNumbers = new Vector(); - { - SetIterator *commitit = getKeyIterator(commitForClientTable); - while (commitit->hasNext()) - commitLocalSequenceNumbers->add(commitit->next()); - delete commitit; - } - qsort(commitLocalSequenceNumbers->expose(), commitLocalSequenceNumbers->size(), sizeof(int64_t), compareInt64); - - uint clsSize = commitLocalSequenceNumbers->size(); - for (uint clsi = 0; clsi < clsSize; clsi++) { - int64_t localSequenceNumber = commitLocalSequenceNumbers->get(clsi); - Commit *commit = commitForClientTable->get(localSequenceNumber); - - if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { - continue; - } - - { - Vector *parts = commit->getParts(); - uint nParts = parts->size(); - for (uint i = 0; i < nParts; i++) { - CommitPart *commitPart = parts->get(i); - unseenArbitrations->add(commitPart); - returnDataSize += commitPart->getSize(); - } - } - } - } - - // Number of arbitration entries to decode - returnDataSize += 2 * sizeof(int32_t); - - // bool of did commit or not - if (numberOfParts != 0) { - returnDataSize += sizeof(char); - } - - // Data to send Back - Array *returnData = new Array(returnDataSize); - ByteBuffer *bbEncode = ByteBuffer_wrap(returnData); - - if (numberOfParts != 0) { - if (didCommit) { - bbEncode->put((char)1); - } else { - bbEncode->put((char)0); - } - if (couldArbitrate) { - bbEncode->put((char)1); - } else { - bbEncode->put((char)0); - } - } - - bbEncode->putInt(unseenArbitrations->size()); - uint size = unseenArbitrations->size(); - for (uint i = 0; i < size; i++) { - Entry *entry = unseenArbitrations->get(i); - entry->encode(bbEncode); - } - - localSequenceNumber++; - return returnData; -} - -/** Checks whether a given slot was sent using new slots in - array. Returns true if sent and false otherwise. */ - -bool Table::checkSend(Array * array, Slot *checkSlot) { - uint size = array->length(); - for (uint i = 0; i < size; i++) { - Slot *s = array->get(i); - if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { - return true; - } - } - - //Also need to see if other machines acknowledged our message - for (uint i = 0; i < size; i++) { - Slot *s = array->get(i); - - // Process each entry in the slot - Vector *entries = s->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - - if (entry->getType() == TypeLastMessage) { - LastMessage *lastMessage = (LastMessage *)entry; - - if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) { - return true; - } - } - } - } - //Not found - return false; -} - -/** Method tries to send slot to server. Returns status in tuple. - isInserted returns whether last un-acked send (if any) was - successful. Returns whether send was confirmed.x - */ - -bool Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool *isInserted, Array **array) { - attemptedToSendToServer = true; - - *array = cloud->putSlot(slot, newSize); - if (*array == NULL) { - *array = new Array(1); - (*array)->set(0, slot); - rejectedSlotVector->clear(); - *isInserted = false; - return true; - } else { - if ((*array)->length() == 0) { - throw new Error("Server Error: Did not send any slots"); - } - - if (hadPartialSendToServer) { - *isInserted = checkSend(*array, slot); - - if (!(*isInserted)) { - rejectedSlotVector->add(slot->getSequenceNumber()); - } - - return false; - } else { - rejectedSlotVector->add(slot->getSequenceNumber()); - *isInserted = false; - return false; - } - } -} - -/** - * Returns true if a resize was needed but not done. - */ -bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) { - newSize = 0;//special value to indicate no resize - if (liveSlotCount > bufferResizeThreshold) { - resize = true;//Resize is forced - } - - if (resize) { - newSize = (int) (numberOfSlots * Table_RESIZE_MULTIPLE); - TableStatus *status = new TableStatus(slot, newSize); - slot->addShallowEntry(status); - } - - // Fill with rejected slots first before doing anything else - doRejectedMessages(slot); - - // Do mandatory rescue of entries - ThreeTuple mandatoryRescueReturn = doMandatoryRescue(slot, resize); - - // Extract working variables - bool needsResize = mandatoryRescueReturn.getFirst(); - bool seenLiveSlot = mandatoryRescueReturn.getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); - - if (needsResize && !resize) { - // We need to resize but we are not resizing so return true to force on retry - return true; - } - - insertedKey = false; - if (newKeyEntry != NULL) { - newKeyEntry->setSlot(slot); - if (slot->hasSpace(newKeyEntry)) { - slot->addEntry(newKeyEntry); - insertedKey = true; - } - } - - // Clear the transactions, aborts and commits that were sent previously - clearSentParts(); - uint size = pendingSendArbitrationRounds->size(); - for (uint i = 0; i < size; i++) { - ArbitrationRound *round = pendingSendArbitrationRounds->get(i); - bool isFull = false; - round->generateParts(); - Vector *parts = round->getParts(); - - // Insert pending arbitration data - uint vsize = parts->size(); - for (uint vi = 0; vi < vsize; vi++) { - Entry *arbitrationData = parts->get(vi); - - // If it is an abort then we need to set some information - if (arbitrationData->getType() == TypeAbort) { - ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); - } - - if (!slot->hasSpace(arbitrationData)) { - // No space so cant do anything else with these data entries - isFull = true; - break; - } - - // Add to this current slot and add it to entries to delete - slot->addEntry(arbitrationData); - pendingSendArbitrationEntriesToDelete->add(arbitrationData); - } - - if (isFull) { - break; - } - } - - if (pendingTransactionQueue->size() > 0) { - Transaction *transaction = pendingTransactionQueue->get(0); - // Set the transaction sequence number if it has yet to be inserted into the block chain - if ((!transaction->didSendAPartToServer()) || (transaction->getSequenceNumber() == -1)) { - transaction->setSequenceNumber(slot->getSequenceNumber()); - } - - while (true) { - TransactionPart *part = transaction->getNextPartToSend(); - if (part == NULL) { - // Ran out of parts to send for this transaction so move on - break; - } - - if (slot->hasSpace(part)) { - slot->addEntry(part); - Vector *partsSent = transactionPartsSent->get(transaction); - if (partsSent == NULL) { - partsSent = new Vector(); - transactionPartsSent->put(transaction, partsSent); - } - partsSent->add(part->getPartNumber()); - transactionPartsSent->put(transaction, partsSent); - } else { - break; - } - } - } - - // Fill the remainder of the slot with rescue data - doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize); - - return false; -} - -void Table::doRejectedMessages(Slot *s) { - if (!rejectedSlotVector->isEmpty()) { - /* TODO: We should avoid generating a rejected message entry if - * there is already a sufficient entry in the queue (e->g->, - * equalsto value of true and same sequence number)-> */ - - int64_t old_seqn = rejectedSlotVector->get(0); - if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { - int64_t new_seqn = rejectedSlotVector->lastElement(); - RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); - s->addShallowEntry(rm); - } else { - int64_t prev_seqn = -1; - uint i = 0; - /* Go through list of missing messages */ - for (; i < rejectedSlotVector->size(); i++) { - int64_t curr_seqn = rejectedSlotVector->get(i); - Slot *s_msg = buffer->getSlot(curr_seqn); - if (s_msg != NULL) - break; - prev_seqn = curr_seqn; - } - /* Generate rejected message entry for missing messages */ - if (prev_seqn != -1) { - RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); - s->addShallowEntry(rm); - } - /* Generate rejected message entries for present messages */ - for (; i < rejectedSlotVector->size(); i++) { - int64_t curr_seqn = rejectedSlotVector->get(i); - Slot *s_msg = buffer->getSlot(curr_seqn); - int64_t machineid = s_msg->getMachineID(); - RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); - s->addShallowEntry(rm); - } - } - } -} - -ThreeTuple Table::doMandatoryRescue(Slot *slot, bool resize) { - int64_t newestSequenceNumber = buffer->getNewestSeqNum(); - int64_t oldestSequenceNumber = buffer->getOldestSeqNum(); - if (oldestLiveSlotSequenceNumver < oldestSequenceNumber) { - oldestLiveSlotSequenceNumver = oldestSequenceNumber; - } - - int64_t currentSequenceNumber = oldestLiveSlotSequenceNumver; - bool seenLiveSlot = false; - int64_t firstIfFull = newestSequenceNumber + 1 - numberOfSlots; // smallest seq number in the buffer if it is full - int64_t threshold = firstIfFull + Table_FREE_SLOTS; // we want the buffer to be clear of live entries up to this point - - - // Mandatory Rescue - for (; currentSequenceNumber < threshold; currentSequenceNumber++) { - Slot *previousSlot = buffer->getSlot(currentSequenceNumber); - // Push slot number forward - if (!seenLiveSlot) { - oldestLiveSlotSequenceNumver = currentSequenceNumber; - } - - if (!previousSlot->isLive()) { - continue; - } - - // We have seen a live slot - seenLiveSlot = true; - - // Get all the live entries for a slot - Vector *liveEntries = previousSlot->getLiveEntries(resize); - - // Iterate over all the live entries and try to rescue them - uint lESize = liveEntries->size(); - for (uint i = 0; i < lESize; i++) { - Entry *liveEntry = liveEntries->get(i); - if (slot->hasSpace(liveEntry)) { - // Enough space to rescue the entry - slot->addEntry(liveEntry); - } else if (currentSequenceNumber == firstIfFull) { - //if there's no space but the entry is about to fall off the queue - return ThreeTuple(true, seenLiveSlot, currentSequenceNumber); - } - } - } - - // Did not resize - return ThreeTuple(false, seenLiveSlot, currentSequenceNumber); -} - -void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize) { - /* now go through live entries from least to greatest sequence number until - * either all live slots added, or the slot doesn't have enough room - * for SKIP_THRESHOLD consecutive entries*/ - int skipcount = 0; - int64_t newestseqnum = buffer->getNewestSeqNum(); - for (; seqn <= newestseqnum; seqn++) { - Slot *prevslot = buffer->getSlot(seqn); - //Push slot number forward - if (!seenliveslot) - oldestLiveSlotSequenceNumver = seqn; - - if (!prevslot->isLive()) - continue; - seenliveslot = true; - Vector *liveentries = prevslot->getLiveEntries(resize); - uint lESize = liveentries->size(); - for (uint i = 0; i < lESize; i++) { - Entry *liveentry = liveentries->get(i); - if (s->hasSpace(liveentry)) - s->addEntry(liveentry); - else { - skipcount++; - if (skipcount > Table_SKIP_THRESHOLD) { - delete liveentries; - goto donesearch; - } - } - } - delete liveentries; - } -donesearch: - ; -} - -/** - * Checks for malicious activity and updates the local copy of the block chain-> - */ -void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { - // The cloud communication layer has checked slot HMACs already - // before decoding - if (newSlots->length() == 0) { - return; - } - - // Make sure all slots are newer than the last largest slot this - // client has seen - int64_t firstSeqNum = newSlots->get(0)->getSequenceNumber(); - if (firstSeqNum <= sequenceNumber) { - throw new Error("Server Error: Sent older slots!"); - } - - // Create an object that can access both new slots and slots in our - // local chain without committing slots to our local chain - SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); - - // Check that the HMAC chain is not broken - checkHMACChain(indexer, newSlots); - - // Set to keep track of messages from clients - Hashset *machineSet = new Hashset(); - { - SetIterator *> *lmit = getKeyIterator(lastMessageTable); - while (lmit->hasNext()) - machineSet->add(lmit->next()); - delete lmit; - } - - // Process each slots data - { - uint numSlots = newSlots->length(); - for (uint i = 0; i < numSlots; i++) { - Slot *slot = newSlots->get(i); - processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); - updateExpectedSize(); - } - } - delete indexer; - - // If there is a gap, check to see if the server sent us - // everything-> - if (firstSeqNum != (sequenceNumber + 1)) { - - // Check the size of the slots that were sent down by the server-> - // Can only check the size if there was a gap - checkNumSlots(newSlots->length()); - - // Since there was a gap every machine must have pushed a slot or - // must have a last message message-> If not then the server is - // hiding slots - if (!machineSet->isEmpty()) { - delete machineSet; - throw new Error("Missing record for machines: "); - } - } - delete machineSet; - // Update the size of our local block chain-> - commitNewMaxSize(); - - // Commit new to slots to the local block chain-> - { - uint numSlots = newSlots->length(); - for (uint i = 0; i < numSlots; i++) { - Slot *slot = newSlots->get(i); - - // Insert this slot into our local block chain copy-> - buffer->putSlot(slot); - - // Keep track of how many slots are currently live (have live data - // in them)-> - liveSlotCount++; - } - } - // Get the sequence number of the latest slot in the system - sequenceNumber = newSlots->get(newSlots->length() - 1)->getSequenceNumber(); - updateLiveStateFromServer(); - - // No Need to remember after we pulled from the server - offlineTransactionsCommittedAndAtServer->clear(); - - // This is invalidated now - hadPartialSendToServer = false; -} - -void Table::updateLiveStateFromServer() { - // Process the new transaction parts - processNewTransactionParts(); - - // Do arbitration on new transactions that were received - arbitrateFromServer(); - - // Update all the committed keys - bool didCommitOrSpeculate = updateCommittedTable(); - - // Delete the transactions that are now dead - updateLiveTransactionsAndStatus(); - - // Do speculations - didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); - updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); -} - -void Table::updateLiveStateFromLocal() { - // Update all the committed keys - bool didCommitOrSpeculate = updateCommittedTable(); - - // Delete the transactions that are now dead - updateLiveTransactionsAndStatus(); - - // Do speculations - didCommitOrSpeculate |= updateSpeculativeTable(didCommitOrSpeculate); - updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); -} - -void Table::initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots) { - int64_t prevslots = firstSequenceNumber; - - if (didFindTableStatus) { - } else { - expectedsize = (prevslots < ((int64_t) numberOfSlots)) ? (int) prevslots : numberOfSlots; - } - - didFindTableStatus = true; - currMaxSize = numberOfSlots; -} - -void Table::updateExpectedSize() { - expectedsize++; - - if (expectedsize > currMaxSize) { - expectedsize = currMaxSize; - } -} - - -/** - * Check the size of the block chain to make sure there are enough - * slots sent back by the server-> This is only called when we have a - * gap between the slots that we have locally and the slots sent by - * the server therefore in the slots sent by the server there will be - * at least 1 Table status message - */ -void Table::checkNumSlots(int numberOfSlots) { - if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots-> Expected: "); - } -} - -/** - * Update the size of of the local buffer if it is needed-> - */ -void Table::commitNewMaxSize() { - didFindTableStatus = false; - - // Resize the local slot buffer - if (numberOfSlots != currMaxSize) { - buffer->resize((int32_t)currMaxSize); - } - - // Change the number of local slots to the new size - numberOfSlots = (int32_t)currMaxSize; - - // Recalculate the resize threshold since the size of the local - // buffer has changed - setResizeThreshold(); -} - -/** - * Process the new transaction parts from this latest round of slots - * received from the server - */ -void Table::processNewTransactionParts() { - - if (newTransactionParts->size() == 0) { - // Nothing new to process - return; - } - - // Iterate through all the machine Ids that we received new parts - // for - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *tpit = getKeyIterator(newTransactionParts); - while (tpit->hasNext()) { - int64_t machineId = tpit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = tpit->currVal(); - - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *ptit = getKeyIterator(parts); - // Iterate through all the parts for that machine Id - while (ptit->hasNext()) { - Pair *partId = ptit->next(); - TransactionPart *part = parts->get(partId); - - if (lastArbitratedTransactionNumberByArbitratorTable->contains(part->getArbitratorId())) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); - if (lastTransactionNumber >= part->getSequenceNumber()) { - // Set dead the transaction part - part->setDead(); - part->releaseRef(); - continue; - } - } - - // Get the transaction object for that sequence number - Transaction *transaction = liveTransactionBySequenceNumberTable->get(part->getSequenceNumber()); - - if (transaction == NULL) { - // This is a new transaction that we dont have so make a new one - transaction = new Transaction(); - - // Add that part to the transaction - transaction->addPartDecode(part); - - // Insert this new transaction into the live tables - liveTransactionBySequenceNumberTable->put(part->getSequenceNumber(), transaction); - liveTransactionByTransactionIdTable->put(transaction->getId(), transaction); - } - part->releaseRef(); - } - delete ptit; - } - delete tpit; - // Clear all the new transaction parts in preparation for the next - // time the server sends slots - { - SetIterator *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newTransactionParts); - while (partsit->hasNext()) { - int64_t machineId = partsit->next(); - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newTransactionParts->get(machineId); - delete parts; - } - delete partsit; - newTransactionParts->clear(); - } -} - -void Table::arbitrateFromServer() { - if (liveTransactionBySequenceNumberTable->size() == 0) { - // Nothing to arbitrate on so move on - return; - } - - // Get the transaction sequence numbers and sort from oldest to newest - Vector *transactionSequenceNumbers = new Vector(); - { - SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); - while (trit->hasNext()) - transactionSequenceNumbers->add(trit->next()); - delete trit; - } - qsort(transactionSequenceNumbers->expose(), transactionSequenceNumbers->size(), sizeof(int64_t), compareInt64); - - // Collection of key value pairs that are - Hashtable *speculativeTableTmp = new Hashtable(); - - // The last transaction arbitrated on - int64_t lastTransactionCommitted = -1; - Hashset *generatedAborts = new Hashset(); - uint tsnSize = transactionSequenceNumbers->size(); - for (uint i = 0; i < tsnSize; i++) { - int64_t transactionSequenceNumber = transactionSequenceNumbers->get(i); - Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - // Check if this machine arbitrates for this transaction if not - // then we cant arbitrate this transaction - if (transaction->getArbitrator() != localMachineId) { - continue; - } - - if (transactionSequenceNumber < lastSeqNumArbOn) { - continue; - } - - if (offlineTransactionsCommittedAndAtServer->contains(transaction->getId())) { - // We have seen this already locally so dont commit again - continue; - } - - if (!transaction->isComplete()) { - // Will arbitrate in incorrect order if we continue so just break - // Most likely this - break; - } - - // update the largest transaction seen by arbitrator from server - if (!lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { - lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); - } else { - int64_t lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()); - if (transaction->getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { - lastTransactionSeenFromMachineFromServer->put(transaction->getMachineId(), transaction->getClientLocalSequenceNumber()); - } - } - - if (transaction->evaluateGuard(committedKeyValueTable, speculativeTableTmp, NULL)) { - // Guard evaluated as true - // Update the local changes so we can make the commit - SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - speculativeTableTmp->put(kv->getKey(), kv); - } - delete kvit; - - // Update what the last transaction committed was for use in batch commit - lastTransactionCommitted = transactionSequenceNumber; - } else { - // Guard evaluated was false so create abort - // Create the abort - Abort *newAbort = new Abort(NULL, - transaction->getClientLocalSequenceNumber(), - transaction->getSequenceNumber(), - transaction->getMachineId(), - transaction->getArbitrator(), - localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; - generatedAborts->add(newAbort); - - // Insert the abort so we can process - processEntry(newAbort); - } - - lastSeqNumArbOn = transactionSequenceNumber; - } - - delete transactionSequenceNumbers; - - Commit *newCommit = NULL; - - // If there is something to commit - if (speculativeTableTmp->size() != 0) { - // Create the commit and increment the commit sequence number - newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); - localArbitrationSequenceNumber++; - - // Add all the new keys to the commit - SetIterator *spit = getKeyIterator(speculativeTableTmp); - while (spit->hasNext()) { - IoTString *string = spit->next(); - KeyValue *kv = speculativeTableTmp->get(string); - newCommit->addKV(kv); - } - delete spit; - - // create the commit parts - newCommit->createCommitParts(); - - // Append all the commit parts to the end of the pending queue - // waiting for sending to the server - // Insert the commit so we can process it - Vector *parts = newCommit->getParts(); - uint partsSize = parts->size(); - for (uint i = 0; i < partsSize; i++) { - CommitPart *commitPart = parts->get(i); - processEntry(commitPart); - } - } - delete speculativeTableTmp; - - if ((newCommit != NULL) || (generatedAborts->size() > 0)) { - ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); - pendingSendArbitrationRounds->add(arbitrationRound); - - if (compactArbitrationData()) { - ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - if (newArbitrationRound->getCommit() != NULL) { - Vector *parts = newArbitrationRound->getCommit()->getParts(); - uint partsSize = parts->size(); - for (uint i = 0; i < partsSize; i++) { - CommitPart *commitPart = parts->get(i); - processEntry(commitPart); - } - } - } - } else { - delete generatedAborts; - } -} - -Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { - - // Check if this machine arbitrates for this transaction if not then - // we cant arbitrate this transaction - if (transaction->getArbitrator() != localMachineId) { - return Pair(false, false); - } - - if (!transaction->isComplete()) { - // Will arbitrate in incorrect order if we continue so just break - // Most likely this - return Pair(false, false); - } - - if (transaction->getMachineId() != localMachineId) { - // dont do this check for local transactions - if (lastTransactionSeenFromMachineFromServer->contains(transaction->getMachineId())) { - if (lastTransactionSeenFromMachineFromServer->get(transaction->getMachineId()) > transaction->getClientLocalSequenceNumber()) { - // We've have already seen this from the server - return Pair(false, false); - } - } - } - - if (transaction->evaluateGuard(committedKeyValueTable, NULL, NULL)) { - // Guard evaluated as true Create the commit and increment the - // commit sequence number - Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); - localArbitrationSequenceNumber++; - - // Update the local changes so we can make the commit - SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - newCommit->addKV(kv); - } - delete kvit; - - // create the commit parts - newCommit->createCommitParts(); - - // Append all the commit parts to the end of the pending queue - // waiting for sending to the server - ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); - pendingSendArbitrationRounds->add(arbitrationRound); - - if (compactArbitrationData()) { - ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - Vector *parts = newArbitrationRound->getCommit()->getParts(); - uint partsSize = parts->size(); - for (uint i = 0; i < partsSize; i++) { - CommitPart *commitPart = parts->get(i); - processEntry(commitPart); - } - } else { - // Insert the commit so we can process it - Vector *parts = newCommit->getParts(); - uint partsSize = parts->size(); - for (uint i = 0; i < partsSize; i++) { - CommitPart *commitPart = parts->get(i); - processEntry(commitPart); - } - } - - if (transaction->getMachineId() == localMachineId) { - TransactionStatus *status = transaction->getTransactionStatus(); - if (status != NULL) { - status->setStatus(TransactionStatus_StatusCommitted); - } - } - - updateLiveStateFromLocal(); - return Pair(true, true); - } else { - if (transaction->getMachineId() == localMachineId) { - // For locally created messages update the status - // Guard evaluated was false so create abort - TransactionStatus *status = transaction->getTransactionStatus(); - if (status != NULL) { - status->setStatus(TransactionStatus_StatusAborted); - } - } else { - Hashset *addAbortSet = new Hashset(); - - // Create the abort - Abort *newAbort = new Abort(NULL, - transaction->getClientLocalSequenceNumber(), - -1, - transaction->getMachineId(), - transaction->getArbitrator(), - localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; - addAbortSet->add(newAbort); - - // Append all the commit parts to the end of the pending queue - // waiting for sending to the server - ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); - pendingSendArbitrationRounds->add(arbitrationRound); - - if (compactArbitrationData()) { - ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - - Vector *parts = newArbitrationRound->getCommit()->getParts(); - uint partsSize = parts->size(); - for (uint i = 0; i < partsSize; i++) { - CommitPart *commitPart = parts->get(i); - processEntry(commitPart); - } - } - } - - updateLiveStateFromLocal(); - return Pair(true, false); - } -} - -/** - * Compacts the arbitration data by merging commits and aggregating - * aborts so that a single large push of commits can be done instead - * of many small updates - */ -bool Table::compactArbitrationData() { - if (pendingSendArbitrationRounds->size() < 2) { - // Nothing to compact so do nothing - return false; - } - - ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - if (lastRound->getDidSendPart()) { - return false; - } - - bool hadCommit = (lastRound->getCommit() == NULL); - bool gotNewCommit = false; - - uint numberToDelete = 1; - - while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound *round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); - - if (round->isFull() || round->getDidSendPart()) { - // Stop since there is a part that cannot be compacted and we - // need to compact in order - break; - } - - if (round->getCommit() == NULL) { - // Try compacting aborts only - int newSize = round->getCurrentSize() + lastRound->getAbortsCount(); - if (newSize > ArbitrationRound_MAX_PARTS) { - // Cant compact since it would be too large - break; - } - lastRound->addAborts(round->getAborts()); - } else { - // Create a new larger commit - Commit *newCommit = Commit_merge(lastRound->getCommit(), round->getCommit(), localArbitrationSequenceNumber); - localArbitrationSequenceNumber++; - - // Create the commit parts so that we can count them - newCommit->createCommitParts(); - - // Calculate the new size of the parts - int newSize = newCommit->getNumberOfParts(); - newSize += lastRound->getAbortsCount(); - newSize += round->getAbortsCount(); - - if (newSize > ArbitrationRound_MAX_PARTS) { - // Can't compact since it would be too large - if (lastRound->getCommit() != newCommit && - round->getCommit() != newCommit) - delete newCommit; - break; - } - // Set the new compacted part - if (lastRound->getCommit() == newCommit) - lastRound->setCommit(NULL); - if (round->getCommit() == newCommit) - round->setCommit(NULL); - - if (lastRound->getCommit() != NULL) { - Commit * oldcommit = lastRound->getCommit(); - lastRound->setCommit(NULL); - delete oldcommit; - } - lastRound->setCommit(newCommit); - lastRound->addAborts(round->getAborts()); - gotNewCommit = true; - } - - numberToDelete++; - } - - if (numberToDelete != 1) { - // If there is a compaction - // Delete the previous pieces that are now in the new compacted piece - for (uint i = 2; i <= numberToDelete; i++) { - delete pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size()-i); - } - pendingSendArbitrationRounds->setSize(pendingSendArbitrationRounds->size() - numberToDelete); - - pendingSendArbitrationRounds->add(lastRound); - - // Should reinsert into the commit processor - if (hadCommit && gotNewCommit) { - return true; - } - } - - return false; -} - -/** - * Update all the commits and the committed tables, sets dead the dead - * transactions - */ -bool Table::updateCommittedTable() { - if (newCommitParts->size() == 0) { - // Nothing new to process - return false; - } - - // Iterate through all the machine Ids that we received new parts for - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *partsit = getKeyIterator(newCommitParts); - while (partsit->hasNext()) { - int64_t machineId = partsit->next(); - Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *parts = newCommitParts->get(machineId); - - // Iterate through all the parts for that machine Id - SetIterator *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *pairit = getKeyIterator(parts); - while (pairit->hasNext()) { - Pair *partId = pairit->next(); - CommitPart *part = pairit->currVal(); - - // Get the transaction object for that sequence number - Hashtable *commitForClientTable = liveCommitsTable->get(part->getMachineId()); - - if (commitForClientTable == NULL) { - // This is the first commit from this device - commitForClientTable = new Hashtable(); - liveCommitsTable->put(part->getMachineId(), commitForClientTable); - } - - Commit *commit = commitForClientTable->get(part->getSequenceNumber()); - - if (commit == NULL) { - // This is a new commit that we dont have so make a new one - commit = new Commit(); - - // Insert this new commit into the live tables - commitForClientTable->put(part->getSequenceNumber(), commit); - } - - // Add that part to the commit - commit->addPartDecode(part); - part->releaseRef(); - } - delete pairit; - delete parts; - } - delete partsit; - - // Clear all the new commits parts in preparation for the next time - // the server sends slots - newCommitParts->clear(); - - // If we process a new commit keep track of it for future use - bool didProcessANewCommit = false; - - // Process the commits one by one - SetIterator *> *liveit = getKeyIterator(liveCommitsTable); - while (liveit->hasNext()) { - int64_t arbitratorId = liveit->next(); - // Get all the commits for a specific arbitrator - Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); - - // Sort the commits in order - Vector *commitSequenceNumbers = new Vector(); - { - SetIterator *clientit = getKeyIterator(commitForClientTable); - while (clientit->hasNext()) - commitSequenceNumbers->add(clientit->next()); - delete clientit; - } - - qsort(commitSequenceNumbers->expose(), commitSequenceNumbers->size(), sizeof(int64_t), compareInt64); - - // Get the last commit seen from this arbitrator - int64_t lastCommitSeenSequenceNumber = -1; - if (lastCommitSeenSequenceNumberByArbitratorTable->contains(arbitratorId)) { - lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable->get(arbitratorId); - } - - // Go through each new commit one by one - for (uint i = 0; i < commitSequenceNumbers->size(); i++) { - int64_t commitSequenceNumber = commitSequenceNumbers->get(i); - Commit *commit = commitForClientTable->get(commitSequenceNumber); - // Special processing if a commit is not complete - if (!commit->isComplete()) { - if (i == (commitSequenceNumbers->size() - 1)) { - // If there is an incomplete commit and this commit is the - // latest one seen then this commit cannot be processed and - // there are no other commits - break; - } else { - // This is a commit that was already dead but parts of it - // are still in the block chain (not flushed out yet)-> - // Delete it and move on - commit->setDead(); - commitForClientTable->remove(commit->getSequenceNumber()); - delete commit; - continue; - } - } - - // Update the last transaction that was updated if we can - if (commit->getTransactionSequenceNumber() != -1) { - // Update the last transaction sequence number that the arbitrator arbitrated on1 - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { - lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); - } - } - - // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(commit->getMachineId())) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(commit->getMachineId()); - if (commit->getSequenceNumber() > lastArbitrationSequenceNumber) { - // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); - } - } else { - // Never seen any data from this arbitrator so record the first one - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(commit->getMachineId(), commit->getSequenceNumber()); - } - - // We have already seen this commit before so need to do the - // full processing on this commit - if (commit->getSequenceNumber() <= lastCommitSeenSequenceNumber) { - // Update the last transaction that was updated if we can - if (commit->getTransactionSequenceNumber() != -1) { - int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()); - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(commit->getMachineId()) || - lastArbitratedTransactionNumberByArbitratorTable->get(commit->getMachineId()) < commit->getTransactionSequenceNumber()) { - lastArbitratedTransactionNumberByArbitratorTable->put(commit->getMachineId(), commit->getTransactionSequenceNumber()); - } - } - continue; - } - - // If we got here then this is a brand new commit and needs full - // processing - // Get what commits should be edited, these are the commits that - // have live values for their keys - Hashset *commitsToEdit = new Hashset(); - { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - Commit *commit = liveCommitsByKeyTable->get(kv->getKey()); - if (commit != NULL) - commitsToEdit->add(commit); - } - delete kvit; - } - - // Update each previous commit that needs to be updated - SetIterator *commitit = commitsToEdit->iterator(); - while (commitit->hasNext()) { - Commit *previousCommit = commitit->next(); - - // Only bother with live commits (TODO: Maybe remove this check) - if (previousCommit->isLive()) { - - // Update which keys in the old commits are still live - { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - previousCommit->invalidateKey(kv->getKey()); - } - delete kvit; - } - - // if the commit is now dead then remove it - if (!previousCommit->isLive()) { - commitForClientTable->remove(previousCommit->getSequenceNumber()); - delete previousCommit; - } - } - } - delete commitit; - delete commitsToEdit; - - // Update the last seen sequence number from this arbitrator - if (lastCommitSeenSequenceNumberByArbitratorTable->contains(commit->getMachineId())) { - if (commit->getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable->get(commit->getMachineId())) { - lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); - } - } else { - lastCommitSeenSequenceNumberByArbitratorTable->put(commit->getMachineId(), commit->getSequenceNumber()); - } - - // We processed a new commit that we havent seen before - didProcessANewCommit = true; - - // Update the committed table of keys and which commit is using which key - { - SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - committedKeyValueTable->put(kv->getKey(), kv); - liveCommitsByKeyTable->put(kv->getKey(), commit); - } - delete kvit; - } - } - delete commitSequenceNumbers; - } - delete liveit; - - return didProcessANewCommit; -} - -/** - * Create the speculative table from transactions that are still live - * and have come from the cloud - */ -bool Table::updateSpeculativeTable(bool didProcessNewCommits) { - if (liveTransactionBySequenceNumberTable->size() == 0) { - // There is nothing to speculate on - return false; - } - - // Create a list of the transaction sequence numbers and sort them - // from oldest to newest - Vector *transactionSequenceNumbersSorted = new Vector(); - { - SetIterator *trit = getKeyIterator(liveTransactionBySequenceNumberTable); - while (trit->hasNext()) - transactionSequenceNumbersSorted->add(trit->next()); - delete trit; - } - - qsort(transactionSequenceNumbersSorted->expose(), transactionSequenceNumbersSorted->size(), sizeof(int64_t), compareInt64); - - bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; - - - if (hasGapInTransactionSequenceNumbers || didProcessNewCommits) { - // If there is a gap in the transaction sequence numbers then - // there was a commit or an abort of a transaction OR there was a - // new commit (Could be from offline commit) so a redo the - // speculation from scratch - - // Start from scratch - speculatedKeyValueTable->clear(); - lastTransactionSequenceNumberSpeculatedOn = -1; - oldestTransactionSequenceNumberSpeculatedOn = -1; - } - - // Remember the front of the transaction list - oldestTransactionSequenceNumberSpeculatedOn = transactionSequenceNumbersSorted->get(0); - - // Find where to start arbitration from - uint startIndex = 0; - - for (; startIndex < transactionSequenceNumbersSorted->size(); startIndex++) - if (transactionSequenceNumbersSorted->get(startIndex) == lastTransactionSequenceNumberSpeculatedOn) - break; - startIndex++; - - if (startIndex >= transactionSequenceNumbersSorted->size()) { - // Make sure we are not out of bounds - delete transactionSequenceNumbersSorted; - return false; // did not speculate - } - - Hashset *incompleteTransactionArbitrator = new Hashset(); - bool didSkip = true; - - for (uint i = startIndex; i < transactionSequenceNumbersSorted->size(); i++) { - int64_t transactionSequenceNumber = transactionSequenceNumbersSorted->get(i); - Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); - - if (!transaction->isComplete()) { - // If there is an incomplete transaction then there is nothing - // we can do add this transactions arbitrator to the list of - // arbitrators we should ignore - incompleteTransactionArbitrator->add(transaction->getArbitrator()); - didSkip = true; - continue; - } - - if (incompleteTransactionArbitrator->contains(transaction->getArbitrator())) { - continue; - } - - lastTransactionSequenceNumberSpeculatedOn = transactionSequenceNumber; - - if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { - // Guard evaluated to true so update the speculative table - { - SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - speculatedKeyValueTable->put(kv->getKey(), kv); - } - delete kvit; - } - } - } - - delete transactionSequenceNumbersSorted; - - if (didSkip) { - // Since there was a skip we need to redo the speculation next time around - lastTransactionSequenceNumberSpeculatedOn = -1; - oldestTransactionSequenceNumberSpeculatedOn = -1; - } - - // We did some speculation - return true; -} - -/** - * Create the pending transaction speculative table from transactions - * that are still in the pending transaction buffer - */ -void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate) { - if (pendingTransactionQueue->size() == 0) { - // There is nothing to speculate on - return; - } - - if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) { - // need to reset on the pending speculation - lastPendingTransactionSpeculatedOn = NULL; - firstPendingTransaction = pendingTransactionQueue->get(0); - pendingTransactionSpeculatedKeyValueTable->clear(); - } - - // Find where to start arbitration from - uint startIndex = 0; - - for (; startIndex < pendingTransactionQueue->size(); startIndex++) - if (pendingTransactionQueue->get(startIndex) == firstPendingTransaction) - break; - - if (startIndex >= pendingTransactionQueue->size()) { - // Make sure we are not out of bounds - return; - } - - for (uint i = startIndex; i < pendingTransactionQueue->size(); i++) { - Transaction *transaction = pendingTransactionQueue->get(i); - - lastPendingTransactionSpeculatedOn = transaction; - - if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { - // Guard evaluated to true so update the speculative table - SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); - } - delete kvit; - } - } -} - -/** - * Set dead and remove from the live transaction tables the - * transactions that are dead - */ -void Table::updateLiveTransactionsAndStatus() { - // Go through each of the transactions - { - SetIterator *iter = getKeyIterator(liveTransactionBySequenceNumberTable); - while (iter->hasNext()) { - int64_t key = iter->next(); - Transaction *transaction = liveTransactionBySequenceNumberTable->get(key); - - // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) - && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) { - // Set dead the transaction - transaction->setDead(); - - // Remove the transaction from the live table - iter->remove(); - liveTransactionByTransactionIdTable->remove(transaction->getId()); - delete transaction; - } - } - delete iter; - } - - // Go through each of the transactions - { - SetIterator *iter = getKeyIterator(outstandingTransactionStatus); - while (iter->hasNext()) { - int64_t key = iter->next(); - TransactionStatus *status = outstandingTransactionStatus->get(key); - - // Check if the transaction is dead - if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) - && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) { - // Set committed - status->setStatus(TransactionStatus_StatusCommitted); - - // Remove - iter->remove(); - } - } - delete iter; - } -} - -/** - * Process this slot, entry by entry-> Also update the latest message sent by slot - */ -void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet) { - - // Update the last message seen - updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); - - // Process each entry in the slot - Vector *entries = slot->getEntries(); - uint eSize = entries->size(); - for (uint ei = 0; ei < eSize; ei++) { - Entry *entry = entries->get(ei); - switch (entry->getType()) { - case TypeCommitPart: - processEntry((CommitPart *)entry); - break; - case TypeAbort: - processEntry((Abort *)entry); - break; - case TypeTransactionPart: - processEntry((TransactionPart *)entry); - break; - case TypeNewKey: - processEntry((NewKey *)entry); - break; - case TypeLastMessage: - processEntry((LastMessage *)entry, machineSet); - break; - case TypeRejectedMessage: - processEntry((RejectedMessage *)entry, indexer); - break; - case TypeTableStatus: - processEntry((TableStatus *)entry, slot->getSequenceNumber()); - break; - default: - throw new Error("Unrecognized type: "); - } - } -} - -/** - * Update the last message that was sent for a machine Id - */ -void Table::processEntry(LastMessage *entry, Hashset *machineSet) { - // Update what the last message received by a machine was - updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet); -} - -/** - * Add the new key to the arbitrators table and update the set of live - * new keys (in case of a rescued new key message) - */ -void Table::processEntry(NewKey *entry) { - // Update the arbitrator table with the new key information - arbitratorTable->put(entry->getKey(), entry->getMachineID()); - - // Update what the latest live new key is - NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry); - if (oldNewKey != NULL) { - // Delete the old new key messages - oldNewKey->setDead(); - } -} - -/** - * Process new table status entries and set dead the old ones as new - * ones come in-> keeps track of the largest and smallest table status - * seen in this current round of updating the local copy of the block - * chain - */ -void Table::processEntry(TableStatus *entry, int64_t seq) { - int newNumSlots = entry->getMaxSlots(); - updateCurrMaxSize(newNumSlots); - initExpectedSize(seq, newNumSlots); - - if (liveTableStatus != NULL) { - // We have a larger table status so the old table status is no - // int64_ter alive - liveTableStatus->setDead(); - } - - // Make this new table status the latest alive table status - liveTableStatus = entry; -} - -/** - * Check old messages to see if there is a block chain violation-> - * Also - */ -void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { - int64_t oldSeqNum = entry->getOldSeqNum(); - int64_t newSeqNum = entry->getNewSeqNum(); - bool isequal = entry->getEqual(); - int64_t machineId = entry->getMachineID(); - int64_t seq = entry->getSequenceNumber(); - - // Check if we have messages that were supposed to be rejected in - // our local block chain - for (int64_t seqNum = oldSeqNum; seqNum <= newSeqNum; seqNum++) { - // Get the slot - Slot *slot = indexer->getSlot(seqNum); - - if (slot != NULL) { - // If we have this slot make sure that it was not supposed to be - // a rejected slot - int64_t slotMachineId = slot->getMachineID(); - if (isequal != (slotMachineId == machineId)) { - throw new Error("Server Error: Trying to insert rejected message for slot "); - } - } - } - - // Create a list of clients to watch until they see this rejected - // message entry-> - Hashset *deviceWatchSet = new Hashset(); - SetIterator *> *iter = getKeyIterator(lastMessageTable); - while (iter->hasNext()) { - // Machine ID for the last message entry - int64_t lastMessageEntryMachineId = iter->next(); - - // We've seen it, don't need to continue to watch-> Our next - // message will implicitly acknowledge it-> - if (lastMessageEntryMachineId == localMachineId) { - continue; - } - - Pair *lastMessageValue = lastMessageTable->get(lastMessageEntryMachineId); - int64_t entrySequenceNumber = lastMessageValue->getFirst(); - - if (entrySequenceNumber < seq) { - // Add this rejected message to the set of messages that this - // machine ID did not see yet - addWatchVector(lastMessageEntryMachineId, entry); - // This client did not see this rejected message yet so add it - // to the watch set to monitor - deviceWatchSet->add(lastMessageEntryMachineId); - } - } - delete iter; - - if (deviceWatchSet->isEmpty()) { - // This rejected message has been seen by all the clients so - entry->setDead(); - delete deviceWatchSet; - } else { - // We need to watch this rejected message - entry->setWatchSet(deviceWatchSet); - } -} - -/** - * Check if this abort is live, if not then save it so we can kill it - * later-> update the last transaction number that was arbitrated on-> - */ -void Table::processEntry(Abort *entry) { - if (entry->getTransactionSequenceNumber() != -1) { - // update the transaction status if it was sent to the server - TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); - if (status != NULL) { - status->setStatus(TransactionStatus_StatusAborted); - } - } - - // Abort has not been seen by the client it is for yet so we need to - // keep track of it - - Abort *previouslySeenAbort = liveAbortTable->put(new Pair(entry->getAbortId()), entry); - if (previouslySeenAbort != NULL) { - previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version - } - - if (entry->getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); - } - - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { - // The machine already saw this so it is dead - entry->setDead(); - Pair abortid = entry->getAbortId(); - liveAbortTable->remove(&abortid); - - if (entry->getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); - } - return; - } - - // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { - int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); - if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { - // Is larger - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); - } - } else { - // Never seen any data from this arbitrator so record the first one - lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->put(entry->getTransactionArbitrator(), entry->getSequenceNumber()); - } - - // Set dead a transaction if we can - Pair deadPair = Pair(entry->getTransactionMachineId(), entry->getTransactionClientLocalSequenceNumber()); - - Transaction *transactionToSetDead = liveTransactionByTransactionIdTable->remove(&deadPair); - if (transactionToSetDead != NULL) { - liveTransactionBySequenceNumberTable->remove(transactionToSetDead->getSequenceNumber()); - } - - // Update the last transaction sequence number that the arbitrator - // arbitrated on - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getTransactionArbitrator()) || - (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getTransactionArbitrator()) < entry->getTransactionSequenceNumber())) { - // Is a valid one - if (entry->getTransactionSequenceNumber() != -1) { - lastArbitratedTransactionNumberByArbitratorTable->put(entry->getTransactionArbitrator(), entry->getTransactionSequenceNumber()); - } - } -} - -/** - * Set dead the transaction part if that transaction is dead and keep - * track of all new parts - */ -void Table::processEntry(TransactionPart *entry) { - // Check if we have already seen this transaction and set it dead OR - // if it is not alive - if (lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getArbitratorId()) && (lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()) >= entry->getSequenceNumber())) { - // This transaction is dead, it was already committed or aborted - entry->setDead(); - return; - } - - // This part is still alive - Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); - - if (transactionPart == NULL) { - // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); - newTransactionParts->put(entry->getMachineId(), transactionPart); - } - - // Update the part and set dead ones we have already seen (got a - // rescued version) - entry->acquireRef(); - TransactionPart *previouslySeenPart = transactionPart->put(entry->getPartId(), entry); - if (previouslySeenPart != NULL) { - previouslySeenPart->releaseRef(); - previouslySeenPart->setDead(); - } -} - -/** - * Process new commit entries and save them for future use-> Delete duplicates - */ -void Table::processEntry(CommitPart *entry) { - // Update the last transaction that was updated if we can - if (entry->getTransactionSequenceNumber() != -1) { - if (!lastArbitratedTransactionNumberByArbitratorTable->contains(entry->getMachineId()) || - lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()) < entry->getTransactionSequenceNumber()) { - lastArbitratedTransactionNumberByArbitratorTable->put(entry->getMachineId(), entry->getTransactionSequenceNumber()); - } - } - - Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *commitPart = newCommitParts->get(entry->getMachineId()); - if (commitPart == NULL) { - // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals>(); - newCommitParts->put(entry->getMachineId(), commitPart); - } - // Update the part and set dead ones we have already seen (got a - // rescued version) - entry->acquireRef(); - CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); - if (previouslySeenPart != NULL) { - previouslySeenPart->setDead(); - previouslySeenPart->releaseRef(); - } -} - -/** - * Update the last message seen table-> Update and set dead the - * appropriate RejectedMessages as clients see them-> Updates the live - * aborts, removes those that are dead and sets them dead-> Check that - * the last message seen is correct and that there is no mismatch of - * our own last message or that other clients have not had a rollback - * on the last message-> - */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { - // We have seen this machine ID - machineSet->remove(machineId); - - // Get the set of rejected messages that this machine Id is has not seen yet - Hashset *watchset = rejectedMessageWatchVectorTable->get(machineId); - // If there is a rejected message that this machine Id has not seen yet - if (watchset != NULL) { - // Go through each rejected message that this machine Id has not - // seen yet - - SetIterator *rmit = watchset->iterator(); - while (rmit->hasNext()) { - RejectedMessage *rm = rmit->next(); - // If this machine Id has seen this rejected message->->-> - if (rm->getSequenceNumber() <= seqNum) { - // Remove it from our watchlist - rmit->remove(); - // Decrement machines that need to see this notification - rm->removeWatcher(machineId); - } - } - delete rmit; - } - - // Set dead the abort - SetIterator *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *abortit = getKeyIterator(liveAbortTable); - - while (abortit->hasNext()) { - Pair *key = abortit->next(); - Abort *abort = liveAbortTable->get(key); - if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { - abort->setDead(); - abortit->remove(); - if (abort->getTransactionArbitrator() == localMachineId) { - liveAbortsGeneratedByLocal->remove(abort->getArbitratorLocalSequenceNumber()); - } - } - } - delete abortit; - if (machineId == localMachineId) { - // Our own messages are immediately dead-> - char livenessType = liveness->getType(); - if (livenessType == TypeLastMessage) { - ((LastMessage *)liveness)->setDead(); - } else if (livenessType == TypeSlot) { - ((Slot *)liveness)->setDead(); - } else { - throw new Error("Unrecognized type"); - } - } - // Get the old last message for this device - Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); - if (lastMessageEntry == NULL) { - // If no last message then there is nothing else to process - return; - } - - int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); - Liveness *lastEntry = lastMessageEntry->getSecond(); - delete lastMessageEntry; - - // If it is not our machine Id since we already set ours to dead - if (machineId != localMachineId) { - char lastEntryType = lastEntry->getType(); - - if (lastEntryType == TypeLastMessage) { - ((LastMessage *)lastEntry)->setDead(); - } else if (lastEntryType == TypeSlot) { - ((Slot *)lastEntry)->setDead(); - } else { - throw new Error("Unrecognized type"); - } - } - // Make sure the server is not playing any games - if (machineId == localMachineId) { - if (hadPartialSendToServer) { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: "); - } - } else { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: "); - } - } - } else { - if (lastMessageSeqNum > seqNum) { - throw new Error("Server Error: Rollback on remote machine sequence number"); - } - } -} - -/** - * Add a rejected message entry to the watch set to keep track of - * which clients have seen that rejected message entry and which have - * not. - */ -void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { - Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); - if (entries == NULL) { - // There is no set for this machine ID yet so create one - entries = new Hashset(); - rejectedMessageWatchVectorTable->put(machineId, entries); - } - entries->add(entry); -} - -/** - * Check if the HMAC chain is not violated - */ -void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { - for (uint i = 0; i < newSlots->length(); i++) { - Slot *currSlot = newSlots->get(i); - Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); - if (prevSlot != NULL && - !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) - throw new Error("Server Error: Invalid HMAC Chain"); - } -}