From 18c08ca8d6f4aed6fa24003826304cd5700f7b7d Mon Sep 17 00:00:00 2001 From: bdemsky Date: Fri, 19 Jan 2018 21:23:08 -0800 Subject: [PATCH] edits --- version2/src/C/Error.h | 9 + version2/src/C/Table.cc | 388 +++++++++++++++++----------------- version2/src/C/Table.h | 14 +- version2/src/C/Transaction.cc | 6 +- version2/src/C/common.h | 2 + 5 files changed, 215 insertions(+), 204 deletions(-) diff --git a/version2/src/C/Error.h b/version2/src/C/Error.h index 4e1126e..7fcd2a9 100644 --- a/version2/src/C/Error.h +++ b/version2/src/C/Error.h @@ -5,4 +5,13 @@ public: Error(const char *msg) {} }; +class Exception { +public: + Exception(const char *msg) {} +}; + +class ServerException { +public: + ServerException(const char *msg) {} +}; #endif diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 2687db1..22d62a2 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -9,8 +9,10 @@ #include "TableStatus.h" #include "TransactionStatus.h" #include "Transaction.h" +#include "LastMessage.h" #include "Random.h" + Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : buffer(NULL), cloud(new CloudComm(this, baseurl, password, listeningPort)), @@ -73,7 +75,7 @@ Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, i init(); } -Table::Table(CloudComm * _cloud, int64_t _localMachineId) : +Table::Table(CloudComm *_cloud, int64_t _localMachineId) : buffer(NULL), cloud(_cloud), random(NULL), @@ -147,29 +149,29 @@ void Table::init() { committedKeyValueTable = new Hashtable(); speculatedKeyValueTable = new Hashtable(); pendingTransactionSpeculatedKeyValueTable = new Hashtable(); - liveNewKeyTable = new Hashtable(); - lastMessageTable = new Hashtable *>(); - rejectedMessageWatchVectorTable = new Hashtable* >(); + liveNewKeyTable = new Hashtable(); + lastMessageTable = new Hashtable *>(); + rejectedMessageWatchVectorTable = new Hashtable * >(); arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable*, Abort*>(); - newTransactionParts = new Hashtable*, TransactionPart*> *>(); - newCommitParts = new Hashtable*, CommitPart*> *>(); + liveAbortTable = new Hashtable *, Abort *>(); + newTransactionParts = new Hashtable *, TransactionPart *> *>(); + newCommitParts = new Hashtable *, CommitPart *> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); - liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable*, Transaction*>(); - liveCommitsTable = new Hashtable >(); - liveCommitsByKeyTable = new Hashtable(); + liveTransactionBySequenceNumberTable = new Hashtable(); + liveTransactionByTransactionIdTable = new Hashtable *, Transaction *>(); + liveCommitsTable = new Hashtable >(); + liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); rejectedSlotVector = new Vector(); - pendingTransactionQueue = new Vector(); - pendingSendArbitrationEntriesToDelete = new Vector(); - transactionPartsSent = new Hashtable *>(); - outstandingTransactionStatus = new Hashtable(); - liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset*>(); - localCommunicationTable = new Hashtable*>(); + pendingTransactionQueue = new Vector(); + pendingSendArbitrationEntriesToDelete = new Vector(); + transactionPartsSent = new Hashtable *>(); + outstandingTransactionStatus = new Hashtable(); + liveAbortsGeneratedByLocal = new Hashtable(); + offlineTransactionsCommittedAndAtServer = new Hashset *>(); + localCommunicationTable = new Hashtable >(); lastTransactionSeenFromMachineFromServer = new Hashtable(); - pendingSendArbitrationRounds = new Vector(); + pendingSendArbitrationRounds = new Vector(); lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new Hashtable(); @@ -179,10 +181,10 @@ void Table::init() { } /** - * Initialize the table by inserting a table status as the first entry into the table status - * also initialize the crypto stuff. + * Initialize the table by inserting a table status as the first entry + * into the table status also initialize the crypto stuff. */ - void Table::initTable() { +void Table::initTable() { cloud->initSecurity(); // Create the first insertion into the block chain which is the table status @@ -190,10 +192,10 @@ void Table::init() { localSequenceNumber++; TableStatus *status = new TableStatus(s, numberOfSlots); s->addEntry(status); - Array *array = cloud->putSlot(s, numberOfSlots); + Array *array = cloud->putSlot(s, numberOfSlots); if (array == NULL) { - array = new Array(1); + array = new Array(1); array->set(0, s); // update local block chain validateAndUpdate(array, true); @@ -206,29 +208,30 @@ void Table::init() { } /** - * Rebuild the table from scratch by pulling the latest block chain from the server. + * Rebuild the table from scratch by pulling the latest block chain + * from the server. */ - void Table::rebuild() { +void Table::rebuild() { // Just pull the latest slots from the server - Array *newslots = cloud->getSlots(sequenceNumber + 1); + Array *newslots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); sendToServer(NULL); updateLiveTransactionsAndStatus(); } - void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { - localCommunicationTable->put(arbitrator, new Pair(hostName, portNumber)); +void Table::addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber) { + localCommunicationTable->put(arbitrator, Pair(hostName, portNumber)); } - int64_t Table::getArbitrator(IoTString *key) { +int64_t Table::getArbitrator(IoTString *key) { return arbitratorTable->get(key); } - void Table::close() { +void Table::close() { cloud->close(); } - IoTString * Table::getCommitted(IoTString *key) { +IoTString *Table::getCommitted(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); if (kv != NULL) { @@ -238,7 +241,7 @@ void Table::init() { } } - IoTString * Table::getSpeculative(IoTString *key) { +IoTString *Table::getSpeculative(IoTString *key) { KeyValue *kv = pendingTransactionSpeculatedKeyValueTable->get(key); if (kv == NULL) { @@ -256,10 +259,10 @@ void Table::init() { } } - IoTString * Table::getCommittedAtomic(IoTString *key) { +IoTString *Table::getCommittedAtomic(IoTString *key) { KeyValue *kv = committedKeyValueTable->get(key); - if (arbitratorTable->get(key) == NULL) { + if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); } @@ -278,8 +281,8 @@ void Table::init() { } } - IoTString * Table::getSpeculativeAtomic(IoTString *key) { - if (arbitratorTable->get(key) == NULL) { +IoTString *Table::getSpeculativeAtomic(IoTString *key) { + if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); } @@ -308,9 +311,9 @@ void Table::init() { } } - bool Table::update() { +bool Table::update() { try { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); + Array *newSlots = cloud->getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); @@ -319,8 +322,6 @@ void Table::init() { return true; } catch (Exception *e) { - // e->printStackTrace(); - for (int64_t m : localCommunicationTable->keySet()) { updateFromLocal(m); } @@ -329,14 +330,14 @@ void Table::init() { return false; } - bool Table::createNewKey(IoTString *keyName, int64_t machineId) { +bool Table::createNewKey(IoTString *keyName, int64_t machineId) { while (true) { - if (arbitratorTable->get(keyName) != NULL) { + if (!arbitratorTable->contains(keyName)) { // There is already an arbitrator return false; } - NewKey * newKey = new NewKey(NULL, keyName, machineId); + NewKey *newKey = new NewKey(NULL, keyName, machineId); if (sendToServer(newKey)) { // If successfully inserted @@ -345,15 +346,15 @@ void Table::init() { } } - void Table::startTransaction() { +void Table::startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransactionBuilder = new PendingTransaction(localMachineId); } - void Table::addKV(IoTString *key, IoTString *value) { +void Table::addKV(IoTString *key, IoTString *value) { // Make sure it is a valid key - if (arbitratorTable->get(key) == NULL) { + if (!arbitratorTable->contains(key)) { throw new Error("Key not Found."); } @@ -368,7 +369,7 @@ void Table::init() { pendingTransactionBuilder->addKV(kv); } - TransactionStatus Table::commitTransaction() { +TransactionStatus *Table::commitTransaction() { if (pendingTransactionBuilder->getKVUpdates()->size() == 0) { // transaction with no updates will have no effect on the system @@ -380,7 +381,7 @@ void Table::init() { localTransactionSequenceNumber++; // Create the transaction status - TransactionStatus transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator()); + TransactionStatus *transactionStatus = new TransactionStatus(TransactionStatus_StatusPending, pendingTransactionBuilder->getArbitrator()); // Create the new transaction Transaction *newTransaction = pendingTransactionBuilder->createTransaction(); @@ -400,9 +401,9 @@ void Table::init() { sendToServer(NULL); } catch (ServerException *e) { - Hashset* arbitratorTriedAndFailed = new Hashset(); - for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { - Transaction * transaction = iter->next(); + Hashset *arbitratorTriedAndFailed = new Hashset(); + for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { + Transaction *transaction = iter->next(); if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client @@ -410,7 +411,7 @@ void Table::init() { continue; } - Pair * sendReturn = sendTransactionToLocal(transaction); + Pair *sendReturn = sendTransactionToLocal(transaction); if (sendReturn->getFirst()) { // Failed to contact over local @@ -446,16 +447,16 @@ int64_t Table::getLocalSequenceNumber() { bool lastInsertedNewKey = false; -bool Table::sendToServer(NewKey* newKey) { +bool Table::sendToServer(NewKey *newKey) { bool fromRetry = false; try { if (hadPartialSendToServer) { - Array *newSlots = cloud->getSlots(sequenceNumber + 1); + Array *newSlots = cloud->getSlots(sequenceNumber + 1); if (newSlots->length() == 0) { fromRetry = true; - ThreeTuple *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + ThreeTuple *> *sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); if (sendSlotsReturn->getFirst()) { if (newKey != NULL) { @@ -487,23 +488,24 @@ bool Table::sendToServer(NewKey* newKey) { newSlots = sendSlotsReturn->getThird(); bool isInserted = false; - for (Slot *s : newSlots) { + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot *s : newSlots) { + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); if (isInserted) { break; } // Process each entry in the slot for (Entry *entry : s->getEntries()) { - if (entry->getType() == TypeLastMessage) { - LastMessage lastMessage = (LastMessage)entry; + LastMessage *lastMessage = (LastMessage *)entry; if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; @@ -561,14 +563,16 @@ bool Table::sendToServer(NewKey* newKey) { // continue; } else { bool isInserted = false; - for (Slot *s : newSlots) { + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot *s : newSlots) { + for (uint si = 0; si < newSlots->length(); si++) { + Slot *s = newSlots->get(si); if (isInserted) { break; } @@ -577,7 +581,7 @@ bool Table::sendToServer(NewKey* newKey) { for (Entry *entry : s->getEntries()) { if (entry->getType() == TypeLastMessage) { - LastMessage lastMessage = (LastMessage)entry; + LastMessage *lastMessage = (LastMessage *)entry; if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) { isInserted = true; break; @@ -650,7 +654,7 @@ bool Table::sendToServer(NewKey* newKey) { // If there is a new key with same name then end - if ((newKey != NULL) && (arbitratorTable->get(newKey->getKey()) != NULL)) { + if ((newKey != NULL) && arbitratorTable->contains(newKey->getKey())) { return false; } @@ -688,11 +692,11 @@ bool Table::sendToServer(NewKey* newKey) { lastInsertedNewKey = insertedNewKey; lastNewSize = newSize; lastNewKey = newKey; - lastTransactionPartsSent = new Hashtable* >(transactionPartsSent); - lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); + lastTransactionPartsSent = new Hashtable * >(transactionPartsSent); + lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); if (sendSlotsReturn->getFirst()) { @@ -706,8 +710,8 @@ bool Table::sendToServer(NewKey* newKey) { } // Remove the aborts and commit parts that were sent from the pending to send queue - for (Iterator iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) { - ArbitrationRound round = iter->next(); + for (Iterator *iter = pendingSendArbitrationRounds->iterator(); iter->hasNext(); ) { + ArbitrationRound *round = iter->next(); round->removeParts(pendingSendArbitrationEntriesToDelete); if (round->isDoneSending()) { @@ -788,8 +792,8 @@ bool Table::sendToServer(NewKey* newKey) { return newKey == NULL; } - bool Table::updateFromLocal(int64_t machineId) { - Pair localCommunicationInformation = localCommunicationTable->get(machineId); +bool Table::updateFromLocal(int64_t machineId) { + Pair localCommunicationInformation = localCommunicationTable->get(machineId); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing return false; @@ -804,7 +808,7 @@ bool Table::sendToServer(NewKey* newKey) { } Array *sendData = new Array(sendDataSize); - ByteBuffer * bbEncode = ByteBuffer_wrap(sendData); + ByteBuffer *bbEncode = ByteBuffer_wrap(sendData); // Encode the data bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); @@ -842,7 +846,7 @@ bool Table::sendToServer(NewKey* newKey) { Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); if (localCommunicationInformation == NULL) { // Cant talk to that device locally so do nothing @@ -891,7 +895,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort)Abort_decode(NULL, bbDecode); if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; @@ -925,7 +929,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { return new Pair(false, true); } - Array *Table::acceptDataFromLocal(Array *data) { +Array *Table::acceptDataFromLocal(Array *data) { // Decode the data ByteBuffer *bbDecode = ByteBuffer_wrap(data); @@ -942,7 +946,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { Transaction *transaction = new Transaction(); for (int i = 0; i < numberOfParts; i++) { bbDecode->get(); - TransactionPart newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); + TransactionPart *newPart = (TransactionPart)TransactionPart.decode(NULL, bbDecode); transaction->addPartDecode(newPart); } @@ -961,7 +965,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { // The data to send back int returnDataSize = 0; - Vector *unseenArbitrations = new Vector(); + Vector *unseenArbitrations = new Vector(); // Get the aborts to send back Vector *abortLocalSequenceNumbers = new Vector(liveAbortsGeneratedByLocal->keySet()); @@ -971,19 +975,19 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { continue; } - Abort abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); + Abort *abort = liveAbortsGeneratedByLocal->get(localSequenceNumber); unseenArbitrations->add(abort); returnDataSize += abort->getSize(); } // Get the commits to send back - Hashtable* commitForClientTable = liveCommitsTable->get(localMachineId); + Hashtable *commitForClientTable = liveCommitsTable->get(localMachineId); if (commitForClientTable != NULL) { Vector *commitLocalSequenceNumbers = new Vector(commitForClientTable->keySet()); Collections->sort(commitLocalSequenceNumbers); for (int64_t localSequenceNumber : commitLocalSequenceNumbers) { - Commit commit = commitForClientTable->get(localSequenceNumber); + Commit *commit = commitForClientTable->get(localSequenceNumber); if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { continue; @@ -1032,16 +1036,16 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { return returnData; } -ThreeTuple *> * Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { +ThreeTuple *> *Table::sendSlotsToServer(Slot *slot, int newSize, bool isNewKey) { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; bool inserted = false; bool lastTryInserted = false; - Array *array = cloud->putSlot(slot, newSize); + Array *array = cloud->putSlot(slot, newSize); if (array == NULL) { - array = new Array(); + array = new Array(); array->set(0, slot); rejectedSlotVector->clear(); inserted = true; @@ -1070,7 +1074,7 @@ ThreeTuple *> * Table::sendSlotsToServer(Slot *slot, in for (Entry *entry : s->getEntries()) { if (entry->getType() == TypeLastMessage) { - LastMessage lastMessage = (LastMessage)entry; + LastMessage *lastMessage = (LastMessage *)entry; if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) { isInserted = true; @@ -1092,13 +1096,13 @@ ThreeTuple *> * Table::sendSlotsToServer(Slot *slot, in } } - return new ThreeTuple *>(inserted, lastTryInserted, array); + return new ThreeTuple *>(inserted, lastTryInserted, array); } /** * Returns false if a resize was needed */ -ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey * newKeyEntry) { +ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) { int newSize = 0; @@ -1143,17 +1147,17 @@ ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey transactionPartsSent->clear(); pendingSendArbitrationEntriesToDelete->clear(); - for (ArbitrationRound round : pendingSendArbitrationRounds) { + for (ArbitrationRound *round : pendingSendArbitrationRounds) { bool isFull = false; round->generateParts(); - Vector* parts = round->getParts(); + Vector *parts = round->getParts(); // Insert pending arbitration data - for (Entry arbitrationData : parts) { + for (Entry *arbitrationData : parts) { // If it is an abort then we need to set some information if (arbitrationData instanceof Abort) { - ((Abort)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); + ((Abort *)arbitrationData)->setSequenceNumber(slot->getSequenceNumber()); } if (!slot->hasSpace(arbitrationData)) { @@ -1196,7 +1200,7 @@ ThreeTuple *Table::fillSlot(Slot *slot, bool resize, NewKey if (slot->hasSpace(part)) { slot->addEntry(part); - Vector* partsSent = transactionPartsSent->get(transaction); + Vector *partsSent = transactionPartsSent->get(transaction); if (partsSent == NULL) { partsSent = new Vector(); transactionPartsSent->put(transaction, partsSent); @@ -1224,7 +1228,7 @@ void Table::doRejectedMessages(Slot *s) { int64_t old_seqn = rejectedSlotVector->firstElement(); if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); - RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s->addEntry(rm); } else { int64_t prev_seqn = -1; @@ -1239,7 +1243,7 @@ void Table::doRejectedMessages(Slot *s) { } /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); s->addEntry(rm); } /* Generate rejected message entries for present messages */ @@ -1247,7 +1251,7 @@ void Table::doRejectedMessages(Slot *s) { int64_t curr_seqn = rejectedSlotVector->get(i); Slot *s_msg = buffer->getSlot(curr_seqn); int64_t machineid = s_msg->getMachineID(); - RejectedMessage rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); + RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); s->addEntry(rm); } } @@ -1283,10 +1287,10 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize seenLiveSlot = true; // Get all the live entries for a slot - Vector* liveEntries = previousSlot->getLiveEntries(resize); + Vector *liveEntries = previousSlot->getLiveEntries(resize); // Iterate over all the live entries and try to rescue them - for (Entry liveEntry : liveEntries) { + for (Entry *liveEntry : liveEntries) { if (slot->hasSpace(liveEntry)) { // Enough space to rescue the entry @@ -1312,7 +1316,7 @@ void Table::doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resi int64_t newestseqnum = buffer->getNewestSeqNum(); search: for (; seqn <= newestseqnum; seqn++) { - Slot prevslot = buffer->getSlot(seqn); + Slot *prevslot = buffer->getSlot(seqn); //Push slot number forward if (!seenliveslot) oldestLiveSlotSequenceNumver = seqn; @@ -1320,7 +1324,7 @@ search: if (!prevslot->isLive()) continue; seenliveslot = true; - Vector* liveentries = prevslot->getLiveEntries(resize); + Vector *liveentries = prevslot->getLiveEntries(resize); for (Entry *liveentry : liveentries) { if (s->hasSpace(liveentry)) s->addEntry(liveentry); @@ -1336,7 +1340,7 @@ search: /** * Checks for malicious activity and updates the local copy of the block chain-> */ -void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { +void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) { // The cloud communication layer has checked slot HMACs already before decoding if (newSlots->length() == 0) { @@ -1351,7 +1355,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal) // Create an object that can access both new slots and slots in our local chain // without committing slots to our local chain - SlotIndexer indexer = new SlotIndexer(newSlots, buffer); + SlotIndexer *indexer = new SlotIndexer(newSlots, buffer); // Check that the HMAC chain is not broken checkHMACChain(indexer, newSlots); @@ -1491,11 +1495,11 @@ void Table::commitNewMaxSize() { // Resize the local slot buffer if (numberOfSlots != currMaxSize) { - buffer->resize((int)currMaxSize); + buffer->resize((int32_t)currMaxSize); } // Change the number of local slots to the new size - numberOfSlots = (int)currMaxSize; + numberOfSlots = (int32_t)currMaxSize; // Recalculate the resize threshold since the size of the local buffer has changed @@ -1514,10 +1518,10 @@ void Table::processNewTransactionParts() { // Iterate through all the machine Ids that we received new parts for for (int64_t machineId : newTransactionParts->keySet()) { - Hashtable*, TransactionPart*> * parts = newTransactionParts->get(machineId); + Hashtable *, TransactionPart *> *parts = newTransactionParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair* partId : parts->keySet()) { + for (Pair *partId : parts->keySet()) { TransactionPart *part = parts->get(partId); int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(part->getArbitratorId()); @@ -1560,11 +1564,11 @@ void Table::arbitrateFromServer() { Collections->sort(transactionSequenceNumbers); // Collection of key value pairs that are - Hashtable speculativeTableTmp = new Hashtable(); + Hashtable speculativeTableTmp = new Hashtable(); // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; - Hashset* generatedAborts = new Hashset(); + Hashset *generatedAborts = new Hashset(); for (int64_t transactionSequenceNumber : transactionSequenceNumbers) { Transaction *transaction = liveTransactionBySequenceNumberTable->get(transactionSequenceNumber); @@ -1617,12 +1621,12 @@ void Table::arbitrateFromServer() { // Guard evaluated was false so create abort // Create the abort - Abort newAbort = new Abort(NULL, - transaction->getClientLocalSequenceNumber(), - transaction->getSequenceNumber(), - transaction->getMachineId(), - transaction->getArbitrator(), - localArbitrationSequenceNumber); + Abort *newAbort = new Abort(NULL, + transaction->getClientLocalSequenceNumber(), + transaction->getSequenceNumber(), + transaction->getMachineId(), + transaction->getArbitrator(), + localArbitrationSequenceNumber); localArbitrationSequenceNumber++; generatedAborts->add(newAbort); @@ -1636,7 +1640,7 @@ void Table::arbitrateFromServer() { // liveTransactionBySequenceNumberTable->remove(transactionSequenceNumber); } - Commit newCommit = NULL; + Commit *newCommit = NULL; // If there is something to commit if (speculativeTableTmp->size() != 0) { @@ -1656,19 +1660,19 @@ void Table::arbitrateFromServer() { // Append all the commit parts to the end of the pending queue waiting for sending to the server // Insert the commit so we can process it - for (CommitPart commitPart : newCommit->getParts()->values()) { + for (CommitPart *commitPart : newCommit->getParts()->values()) { processEntry(commitPart); } } if ((newCommit != NULL) || (generatedAborts->size() > 0)) { - ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); + ArbitrationRound *arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (newArbitrationRound->getCommit() != NULL) { - for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } @@ -1703,7 +1707,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { // Guard evaluated as true // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); + Commit *newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); localArbitrationSequenceNumber++; // Update the local changes so we can make the commit @@ -1719,19 +1723,19 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } else { // Insert the commit so we can process it - for (CommitPart commitPart : newCommit->getParts()->values()) { + for (CommitPart *commitPart : newCommit->getParts()->values()) { processEntry(commitPart); } } if (transaction->getMachineId() == localMachineId) { - TransactionStatus status = transaction->getTransactionStatus(); + TransactionStatus *status = transaction->getTransactionStatus(); if (status != NULL) { status->setStatus(TransactionStatus_StatusCommitted); } @@ -1750,28 +1754,28 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { status->setStatus(TransactionStatus_StatusAborted); } } else { - Hashset addAbortSet = new Hashset(); + Hashset *addAbortSet = new Hashset(); // Create the abort - Abort newAbort = new Abort(NULL, - transaction->getClientLocalSequenceNumber(), - -1, - transaction->getMachineId(), - transaction->getArbitrator(), - localArbitrationSequenceNumber); + Abort *newAbort = new Abort(NULL, + transaction->getClientLocalSequenceNumber(), + -1, + transaction->getMachineId(), + transaction->getArbitrator(), + localArbitrationSequenceNumber); localArbitrationSequenceNumber++; addAbortSet->add(newAbort); // Append all the commit parts to the end of the pending queue waiting for sending to the server - ArbitrationRound arbitrationRound = new ArbitrationRound(NULL, addAbortSet); + ArbitrationRound *arbitrationRound = new ArbitrationRound(NULL, addAbortSet); pendingSendArbitrationRounds->add(arbitrationRound); if (compactArbitrationData()) { - ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); - for (CommitPart commitPart : newArbitrationRound->getCommit()->getParts()->values()) { + ArbitrationRound *newArbitrationRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + for (CommitPart *commitPart : newArbitrationRound->getCommit()->getParts()->values()) { processEntry(commitPart); } } @@ -1792,7 +1796,7 @@ bool Table::compactArbitrationData() { return false; } - ArbitrationRound lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); + ArbitrationRound *lastRound = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - 1); if (lastRound->didSendPart()) { return false; } @@ -1802,7 +1806,7 @@ bool Table::compactArbitrationData() { int numberToDelete = 1; while (numberToDelete < pendingSendArbitrationRounds->size()) { - ArbitrationRound round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); + ArbitrationRound *xs round = pendingSendArbitrationRounds->get(pendingSendArbitrationRounds->size() - numberToDelete - 1); if (round->isFull() || round->didSendPart()) { // Stop since there is a part that cannot be compacted and we need to compact in order @@ -1885,22 +1889,22 @@ bool Table::updateCommittedTable() { // Iterate through all the machine Ids that we received new parts for for (int64_t machineId : newCommitParts->keySet()) { - Hashtable*, CommitPart*>* parts = newCommitParts->get(machineId); + Hashtable *, CommitPart *> *parts = newCommitParts->get(machineId); // Iterate through all the parts for that machine Id - for (Pair* partId : parts->keySet()) { - CommitPart part = parts->get(partId); + for (Pair *partId : parts->keySet()) { + CommitPart *part = parts->get(partId); // Get the transaction object for that sequence number - Hashtable* commitForClientTable = liveCommitsTable->get(part->getMachineId()); + Hashtable *commitForClientTable = liveCommitsTable->get(part->getMachineId()); if (commitForClientTable == NULL) { // This is the first commit from this device - commitForClientTable = new Hashtable(); + commitForClientTable = new Hashtable(); liveCommitsTable->put(part->getMachineId(), commitForClientTable); } - Commit commit = commitForClientTable->get(part->getSequenceNumber()); + Commit *commit = commitForClientTable->get(part->getSequenceNumber()); if (commit == NULL) { // This is a new commit that we dont have so make a new one @@ -1925,10 +1929,10 @@ bool Table::updateCommittedTable() { for (int64_T arbitratorId : liveCommitsTable->keySet()) { // Get all the commits for a specific arbitrator - Hashtable commitForClientTable = liveCommitsTable->get(arbitratorId); + Hashtable *commitForClientTable = liveCommitsTable->get(arbitratorId); // Sort the commits in order - Vector* commitSequenceNumbers = new Vector(commitForClientTable->keySet()); + Vector *commitSequenceNumbers = new Vector(commitForClientTable->keySet()); Collections->sort(commitSequenceNumbers); // Get the last commit seen from this arbitrator @@ -2005,13 +2009,13 @@ bool Table::updateCommittedTable() { commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated - for (Commit * previousCommit : commitsToEdit) { + for (Commit *previousCommit : commitsToEdit) { // Only bother with live commits (TODO: Maybe remove this check) if (previousCommit->isLive()) { // Update which keys in the old commits are still live - for (KeyValue * kv : commit->getKeyValueUpdateSet()) { + for (KeyValue *kv : commit->getKeyValueUpdateSet()) { previousCommit->invalidateKey(kv->getKey()); } @@ -2055,7 +2059,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { } // Create a list of the transaction sequence numbers and sort them from oldest to newest - Vector* transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); + Vector *transactionSequenceNumbersSorted = new Vector(liveTransactionBySequenceNumberTable->keySet()); Collections->sort(transactionSequenceNumbersSorted); bool hasGapInTransactionSequenceNumbers = transactionSequenceNumbersSorted->get(0) != oldestTransactionSequenceNumberSpeculatedOn; @@ -2131,7 +2135,6 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr return; } - if (didProcessNewCommitsOrSpeculate || (firstPendingTransaction != pendingTransactionQueue->get(0))) { // need to reset on the pending speculation lastPendingTransactionSpeculatedOn = NULL; @@ -2167,7 +2170,7 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr void Table::updateLiveTransactionsAndStatus() { // Go through each of the transactions - for (IteratorEntry >* iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { + for (IteratorEntry > *iter = liveTransactionBySequenceNumberTable->entrySet()->iterator(); iter->hasNext();) { Transaction *transaction = iter->next()->getValue(); // Check if the transaction is dead @@ -2184,8 +2187,8 @@ void Table::updateLiveTransactionsAndStatus() { } // Go through each of the transactions - for (IteratorEntry >* iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { - TransactionStatus status = iter->next()->getValue(); + for (IteratorEntry > *iter = outstandingTransactionStatus->entrySet()->iterator(); iter->hasNext();) { + TransactionStatus *status = iter->next()->getValue(); // Check if the transaction is dead int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()); @@ -2203,7 +2206,7 @@ void Table::updateLiveTransactionsAndStatus() { /** * Process this slot, entry by entry-> Also update the latest message sent by slot */ -void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet) { +void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset *machineSet) { // Update the last message seen updateLastMessage(slot->getMachineID(), slot->getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); @@ -2213,31 +2216,31 @@ void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLoc switch (entry->getType()) { case TypeCommitPart: - processEntry((CommitPart)entry); + processEntry((CommitPart *)entry); break; case TypeAbort: - processEntry((Abort)entry); + processEntry((Abort *)entry); break; case TypeTransactionPart: - processEntry((TransactionPart)entry); + processEntry((TransactionPart *)entry); break; case TypeNewKey: - processEntry((NewKey)entry); + processEntry((NewKey *)entry); break; case TypeLastMessage: - processEntry((LastMessage)entry, machineSet); + processEntry((LastMessage *)entry, machineSet); break; case TypeRejectedMessage: - processEntry((RejectedMessage)entry, indexer); + processEntry((RejectedMessage *)entry, indexer); break; case TypeTableStatus: - processEntry((TableStatus)entry, slot->getSequenceNumber()); + processEntry((TableStatus *)entry, slot->getSequenceNumber()); break; default: @@ -2249,7 +2252,7 @@ void Table::processSlot(SlotIndexer indexer, Slot *slot, bool acceptUpdatesToLoc /** * Update the last message that was sent for a machine Id */ -void Table::processEntry(LastMessage entry, Hashset *machineSet) { +void Table::processEntry(LastMessage *entry, Hashset *machineSet) { // Update what the last message received by a machine was updateLastMessage(entry->getMachineID(), entry->getSequenceNumber(), entry, false, machineSet); } @@ -2257,13 +2260,13 @@ void Table::processEntry(LastMessage entry, Hashset *machineSet) { /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) */ -void Table::processEntry(NewKey* entry) { +void Table::processEntry(NewKey *entry) { // Update the arbitrator table with the new key information arbitratorTable->put(entry->getKey(), entry->getMachineID()); // Update what the latest live new key is - NewKey oldNewKey = liveNewKeyTable->put(entry->getKey(), entry); + NewKey *oldNewKey = liveNewKeyTable->put(entry->getKey(), entry); if (oldNewKey != NULL) { // Delete the old new key messages oldNewKey->setDead(); @@ -2293,7 +2296,7 @@ void Table::processEntry(TableStatus entry, int64_t seq) { /** * Check old messages to see if there is a block chain violation-> Also */ -void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { +void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { int64_t oldSeqNum = entry->getOldSeqNum(); int64_t newSeqNum = entry->getNewSeqNum(); bool isequal = entry->getEqual(); @@ -2320,7 +2323,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { // Create a list of clients to watch until they see this rejected message entry-> Hashset *deviceWatchSet = new Hashset(); - for (Map->Entry*>* lastMessageEntry : lastMessageTable->entrySet()) { + for (Map->Entry *> *lastMessageEntry : lastMessageTable->entrySet()) { // Machine ID for the last message entry int64_t lastMessageEntryMachineId = lastMessageEntry->getKey(); @@ -2331,7 +2334,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { continue; } - Pair *lastMessageValue = lastMessageEntry->getValue(); + Pair *lastMessageValue = lastMessageEntry->getValue(); int64_t entrySequenceNumber = lastMessageValue->getFirst(); if (entrySequenceNumber < seq) { @@ -2357,19 +2360,19 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { * Check if this abort is live, if not then save it so we can kill it later-> * update the last transaction number that was arbitrated on-> */ -void Table::processEntry(Abort entry) { +void Table::processEntry(Abort *entry) { if (entry->getTransactionSequenceNumber() != -1) { // update the transaction status if it was sent to the server - TransactionStatus status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); + TransactionStatus *status = outstandingTransactionStatus->remove(entry->getTransactionSequenceNumber()); if (status != NULL) { status->setStatus(TransactionStatus_StatusAborted); } } // Abort has not been seen by the client it is for yet so we need to keep track of it - Abort previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); + Abort *previouslySeenAbort = liveAbortTable->put(entry->getAbortId(), entry); if (previouslySeenAbort != NULL) { previouslySeenAbort->setDead(); // Delete old version of the abort since we got a rescued newer version } @@ -2391,9 +2394,6 @@ void Table::processEntry(Abort entry) { return; } - - - // Update the last arbitration data that we have seen so far if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { @@ -2428,7 +2428,7 @@ void Table::processEntry(Abort entry) { /** * Set dead the transaction part if that transaction is dead and keep track of all new parts */ -void Table::processEntry(TransactionPart entry) { +void Table::processEntry(TransactionPart *entry) { // Check if we have already seen this transaction and set it dead OR if it is not alive int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getArbitratorId()); if ((lastTransactionNumber != NULL) && (lastTransactionNumber >= entry->getSequenceNumber())) { @@ -2438,11 +2438,11 @@ void Table::processEntry(TransactionPart entry) { } // This part is still alive - Hashtable*, TransactionPart*>* transactionPart = newTransactionParts->get(entry->getMachineId()); + Hashtable *, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one - transactionPart = new Hashtable*, TransactionPart*>(); + transactionPart = new Hashtable *, TransactionPart *>(); newTransactionParts->put(entry->getMachineId(), transactionPart); } @@ -2456,9 +2456,7 @@ void Table::processEntry(TransactionPart entry) { /** * Process new commit entries and save them for future use-> Delete duplicates */ -void Table::processEntry(CommitPart entry) { - - +void Table::processEntry(CommitPart *entry) { // Update the last transaction that was updated if we can if (entry->getTransactionSequenceNumber() != -1) { int64_t lastTransactionNumber = lastArbitratedTransactionNumberByArbitratorTable->get(entry->getMachineId()); @@ -2472,16 +2470,16 @@ void Table::processEntry(CommitPart entry) { - Hashtable*, CommitPart*>* commitPart = newCommitParts->get(entry->getMachineId()); + Hashtable *, CommitPart *> *commitPart = newCommitParts->get(entry->getMachineId()); if (commitPart == NULL) { // Don't have a table for this machine Id yet so make one - commitPart = new Hashtable*, CommitPart*>(); + commitPart = new Hashtable *, CommitPart *>(); newCommitParts->put(entry->getMachineId(), commitPart); } // Update the part and set dead ones we have already seen (got a rescued version) - CommitPart previouslySeenPart = commitPart->put(entry->getPartId(), entry); + CommitPart *previouslySeenPart = commitPart->put(entry->getPartId(), entry); if (previouslySeenPart != NULL) { previouslySeenPart->setDead(); } @@ -2493,7 +2491,7 @@ void Table::processEntry(CommitPart entry) { * Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message-> */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet->remove(machineId); @@ -2505,9 +2503,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene if (watchset != NULL) { // Go through each rejected message that this machine Id has not seen yet - for (Iterator rmit = watchset->iterator(); rmit->hasNext(); ) { + for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { - RejectedMessage rm = rmit->next(); + RejectedMessage *rm = rmit->next(); // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { @@ -2522,8 +2520,8 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene } // Set dead the abort - for (IteratorEntry*, Abort*> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { - Abort abort = i->next()->getValue(); + for (IteratorEntry *, Abort *> > i = liveAbortTable->entrySet()->iterator(); i->hasNext();) { + Abort *abort = i->next()->getValue(); if ((abort->getTransactionMachineId() == machineId) && (abort->getSequenceNumber() <= seqNum)) { abort->setDead(); @@ -2540,30 +2538,30 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene if (machineId == localMachineId) { // Our own messages are immediately dead-> if (liveness instanceof LastMessage) { - ((LastMessage)liveness)->setDead(); + ((LastMessage *)liveness)->setDead(); } else if (liveness instanceof Slot) { - ((Slot)liveness)->setDead(); + ((Slot *)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); + Pair *lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; } int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); - Liveness lastEntry = lastMessageEntry->getSecond(); + Liveness *lastEntry = lastMessageEntry->getSecond(); // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { if (lastEntry instanceof LastMessage) { - ((LastMessage)lastEntry)->setDead(); + ((LastMessage *)lastEntry)->setDead(); } else if (lastEntry instanceof Slot) { - ((Slot)lastEntry)->setDead(); + ((Slot *)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); } @@ -2595,7 +2593,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene * Add a rejected message entry to the watch set to keep track of which clients have seen that * rejected message entry and which have not-> */ -void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { +void Table::addWatchVector(int64_t machineId, RejectedMessage *entry) { Hashset *entries = rejectedMessageWatchVectorTable->get(machineId); if (entries == NULL) { // There is no set for this machine ID yet so create one @@ -2608,13 +2606,13 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { /** * Check if the HMAC chain is not violated */ -void Table::checkHMACChain(SlotIndexer indexer, Array *newSlots) { +void Table::checkHMACChain(SlotIndexer *indexer, Array *newSlots) { for (int i = 0; i < newSlots->length; i++) { - Slot currSlot = newSlots[i]; - Slot prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); + Slot *currSlot = newSlots[i]; + Slot *prevSlot = indexer->getSlot(currSlot->getSequenceNumber() - 1); if (prevSlot != NULL && - !Arrays->equals(prevSlot->getHMAC(), currSlot->getPrevHMAC())) - throw new Error("Server Error: Invalid HMAC Chain" + currSlot + " " + prevSlot); + !prevSlot->getHMAC()->equals(currSlot->getPrevHMAC())) + throw new Error("Server Error: Invalid HMAC Chain"); } } diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index 1d58776..3f52214 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -110,7 +110,7 @@ private: /** * Checks for malicious activity and updates the local copy of the block chain. */ - void validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal); + void validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal); void updateLiveStateFromServer(); @@ -146,7 +146,7 @@ private: void arbitrateFromServer(); - Pair arbitrateOnLocalTransaction(Transaction * transaction); + Pair arbitrateOnLocalTransaction(Transaction *transaction); /** * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates @@ -186,7 +186,7 @@ private: /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) */ - void processEntry(NewKey * entry); + void processEntry(NewKey *entry); /** * Process new table status entries and set dead the old ones as new ones come in. @@ -228,12 +228,12 @@ private: * Add a rejected message entry to the watch set to keep track of which clients have seen that * rejected message entry and which have not. */ - void addWatchVector(int64_t machineId, RejectedMessage * entry); + void addWatchVector(int64_t machineId, RejectedMessage *entry); /** * Check if the HMAC chain is not violated */ - void checkHMACChain(SlotIndexer * indexer, Array *newSlots); + void checkHMACChain(SlotIndexer *indexer, Array *newSlots); public: @@ -249,7 +249,7 @@ public: /** * Rebuild the table from scratch by pulling the latest block chain from the server. */ - bool update(); + void rebuild(); void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber); int64_t getArbitrator(IoTString *key); @@ -257,6 +257,8 @@ public: IoTString *getCommitted(IoTString *key); IoTString *getSpeculative(IoTString *key); IoTString *getCommittedAtomic(IoTString *key); + IoTString *getSpeculativeAtomic(IoTString *key); + bool update(); bool createNewKey(IoTString *keyName, int64_t machineId); void startTransaction(); void addKV(IoTString *key, IoTString *value); diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 8dba5a3..b5b8e50 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -48,7 +48,7 @@ void Transaction::addPartDecode(TransactionPart *newPart) { clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber(); machineId = newPart->getMachineId(); - TransactionPart * previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); + TransactionPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); if (previoslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part @@ -198,7 +198,7 @@ void Transaction::setDead() { // Make all the parts of this transaction dead for (int32_t partNumber : parts->keySet()) { - TransactionPart* part = parts->get(partNumber); + TransactionPart *part = parts->get(partNumber); part->setDead(); } } @@ -227,7 +227,7 @@ void Transaction::decodeTransactionData() { } // Decoder Object - ByteBuffer* bbDecode = ByteBuffer_wrap(combinedData); + ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData); // Decode how many key value pairs need to be decoded int numberOfKVGuards = bbDecode->getInt(); diff --git a/version2/src/C/common.h b/version2/src/C/common.h index a3888cd..0f96b7f 100644 --- a/version2/src/C/common.h +++ b/version2/src/C/common.h @@ -44,6 +44,8 @@ class Transaction; class TransactionPart; class TransactionStatus; class Error; +class Exception; +class ServerException; //Code to write class SecretKeySpec; -- 2.34.1