From 3e80b7cd423be3b7961a5f8d22bb6c274f1bc83a Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 15 Feb 2018 14:33:31 -0800 Subject: [PATCH] More edits --- version2/src/C/Error.h | 6 +- version2/src/C/Liveness.h | 1 + version2/src/C/Pair.h | 5 ++ version2/src/C/Table.cc | 146 ++++++++++++++++++++++---------------- version2/src/C/hashset.h | 22 +++--- version2/src/C/vector.h | 12 ++++ 6 files changed, 119 insertions(+), 73 deletions(-) diff --git a/version2/src/C/Error.h b/version2/src/C/Error.h index 7fcd2a9..1bb4dd0 100644 --- a/version2/src/C/Error.h +++ b/version2/src/C/Error.h @@ -10,8 +10,12 @@ public: Exception(const char *msg) {} }; +#define ServerException_TypeInputTimeout 1 + class ServerException { public: - ServerException(const char *msg) {} + ServerException(const char *msg, char _type) : type(_type) {} + char getType(); + char type; }; #endif diff --git a/version2/src/C/Liveness.h b/version2/src/C/Liveness.h index 1f92b99..2a5b204 100644 --- a/version2/src/C/Liveness.h +++ b/version2/src/C/Liveness.h @@ -2,6 +2,7 @@ #define LIVENESS_H class Liveness { + public: /** * Returns a char encoding the type of the entry object. */ diff --git a/version2/src/C/Pair.h b/version2/src/C/Pair.h index 5506eae..81d5ebb 100644 --- a/version2/src/C/Pair.h +++ b/version2/src/C/Pair.h @@ -13,6 +13,11 @@ public: b(_b) { } + Pair(Pair * p) : + a(p->a), + b(p->b) { + } + A getFirst() { return a; } diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index ee3b8bb..424e105 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -325,7 +325,7 @@ bool Table::update() { updateLiveTransactionsAndStatus(); return true; } catch (Exception *e) { - SetIterator *kit = getKeyIterator(localCommunicationTable); + SetIterator *> *kit = getKeyIterator(localCommunicationTable); while (kit->hasNext()) { int64_t m = kit->next(); updateFromLocal(m); @@ -468,7 +468,7 @@ bool Table::sendToServer(NewKey *newKey) { } } - SetIterator *trit = getKeyIterator(lastTransactionPartsSent); + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -527,7 +527,7 @@ bool Table::sendToServer(NewKey *newKey) { } } - SetIterator *trit = getKeyIterator(lastTransactionPartsSent); + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -557,7 +557,7 @@ bool Table::sendToServer(NewKey *newKey) { } } - SetIterator *trit = getKeyIterator(lastTransactionPartsSent); + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -612,7 +612,7 @@ bool Table::sendToServer(NewKey *newKey) { } } - SetIterator *trit = getKeyIterator(lastTransactionPartsSent); + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -640,7 +640,7 @@ bool Table::sendToServer(NewKey *newKey) { } delete trit; } else { - SetIterator *trit = getKeyIterator(lastTransactionPartsSent); + SetIterator *> *trit = getKeyIterator(lastTransactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -691,7 +691,7 @@ bool Table::sendToServer(NewKey *newKey) { if (needsResize) { // Reset which transaction to send - SetIterator *trit = getKeyIterator(transactionPartsSent); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); @@ -747,7 +747,7 @@ bool Table::sendToServer(NewKey *newKey) { } pendingSendArbitrationRounds->setSize(oldcount); - SetIterator *trit = getKeyIterator(transactionPartsSent); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetServerFailure(); @@ -770,7 +770,7 @@ bool Table::sendToServer(NewKey *newKey) { delete trit; } else { // Reset which transaction to send - SetIterator *trit = getKeyIterator(transactionPartsSent); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); @@ -794,9 +794,9 @@ bool Table::sendToServer(NewKey *newKey) { } } catch (ServerException *e) { - if (e->getType() != ServerException->TypeInputTimeout) { + if (e->getType() != ServerException_TypeInputTimeout) { // Nothing was able to be sent to the server so just clear these data structures - SetIterator *trit = getKeyIterator(transactionPartsSent); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); @@ -812,7 +812,7 @@ bool Table::sendToServer(NewKey *newKey) { hadPartialSendToServer = true; // Nothing was able to be sent to the server so just clear these data structures - SetIterator *trit = getKeyIterator(transactionPartsSent); + SetIterator *> *trit = getKeyIterator(transactionPartsSent); while (trit->hasNext()) { Transaction *transaction = trit->next(); transaction->resetNextPartToSend(); @@ -840,7 +840,7 @@ bool Table::updateFromLocal(int64_t machineId) { int sendDataSize = sizeof(int32_t) + sizeof(int64_t); int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(machineId)) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(machineId); } @@ -852,7 +852,7 @@ bool Table::updateFromLocal(int64_t machineId) { bbEncode->putInt(0); // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -883,19 +883,24 @@ bool Table::updateFromLocal(int64_t machineId) { Pair Table::sendTransactionToLocal(Transaction *transaction) { // Get the devices local communications - if (!localCommunicationTable->contains(machineId)) + if (!localCommunicationTable->contains(transaction->getArbitrator())) return Pair(true, false); - Pair localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); + Pair * localCommunicationInformation = localCommunicationTable->get(transaction->getArbitrator()); // Get the size of the send data int sendDataSize = sizeof(int32_t) + sizeof(int64_t); - for (TransactionPart *part : transaction->getParts()->values()) { - sendDataSize += part->getSize(); + { + Vector * tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart * part = tParts->get(i); + sendDataSize += part->getSize(); + } } int64_t lastArbitrationDataLocalSequenceNumber = (int64_t) -1; - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(transaction->getArbitrator())) { lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(transaction->getArbitrator()); } @@ -906,13 +911,17 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { // Encode the data bbEncode->putLong(lastArbitrationDataLocalSequenceNumber); bbEncode->putInt(transaction->getParts()->size()); - for (TransactionPart *part : transaction->getParts()->values()) { - part->encode(bbEncode); + { + Vector * tParts = transaction->getParts(); + uint tPartsSize = tParts->size(); + for (uint i = 0; i < tPartsSize; i++) { + TransactionPart * part = tParts->get(i); + part->encode(bbEncode); + } } - // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -994,7 +1003,7 @@ Array *Table::acceptDataFromLocal(Array *data) { // Transaction was sent to the server so keep track of it to prevent double commit if (transaction->getSequenceNumber() != -1) { - offlineTransactionsCommittedAndAtServer->add(transaction->getId()); + offlineTransactionsCommittedAndAtServer->add(new Pair(transaction->getId())); } } @@ -1262,7 +1271,7 @@ void Table::doRejectedMessages(Slot *s) { * there is already a sufficient entry in the queue (e->g->, * equalsto value of true and same sequence number)-> */ - int64_t old_seqn = rejectedSlotVector->firstElement(); + int64_t old_seqn = rejectedSlotVector->get(0); if (rejectedSlotVector->size() > Table_REJECTED_THRESHOLD) { int64_t new_seqn = rejectedSlotVector->lastElement(); RejectedMessage *rm = new RejectedMessage(s, s->getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); @@ -1645,7 +1654,7 @@ void Table::arbitrateFromServer() { // Guard evaluated as true // Update the local changes so we can make the commit - SetIterator *kvit = getKeyIterator(transaction->getKeyValueUpdateSet()); + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); speculativeTableTmp->put(kv->getKey(), kv); @@ -1743,7 +1752,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction *transaction) { localArbitrationSequenceNumber++; // Update the local changes so we can make the commit - SetIterator *kvit = getKeyIterator(transaction->getKeyValueUpdateSet()); + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); newCommit->addKV(kv); @@ -2037,12 +2046,14 @@ bool Table::updateCommittedTable() { // Get what commits should be edited, these are the commits that // have live values for their keys Hashset *commitsToEdit = new Hashset(); - SetIterator *kvit = getKeyIterator(commit->getKeyValueUpdateSet()); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + commitsToEdit->add(liveCommitsByKeyTable->get(kv->getKey())); + } + delete kvit; } - delete kvit; commitsToEdit->remove(NULL); // remove NULL since it could be in this set // Update each previous commit that needs to be updated @@ -2052,12 +2063,14 @@ bool Table::updateCommittedTable() { if (previousCommit->isLive()) { // Update which keys in the old commits are still live - SetIterator *kvit = getKeyIterator(commit->getKeyValueUpdateSet()); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - previousCommit->invalidateKey(kv->getKey()); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + previousCommit->invalidateKey(kv->getKey()); + } + delete kvit; } - delete kvit; // if the commit is now dead then remove it if (!previousCommit->isLive()) { @@ -2079,13 +2092,15 @@ bool Table::updateCommittedTable() { didProcessANewCommit = true; // Update the committed table of keys and which commit is using which key - SetIterator *kvit = getKeyIterator(commit->getKeyValueUpdateSet()); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - committedKeyValueTable->put(kv->getKey(), kv); - liveCommitsByKeyTable->put(kv->getKey(), commit); + { + SetIterator *kvit = commit->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + committedKeyValueTable->put(kv->getKey(), kv); + liveCommitsByKeyTable->put(kv->getKey(), commit); + } + delete kvit; } - delete kvit; } } @@ -2158,12 +2173,14 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, NULL)) { // Guard evaluated to true so update the speculative table - SetIterator *kvit = getKeyIterator(commit->getKeyValueUpdateSet()); - while (kvit->hasNext()) { - KeyValue *kv = kvit->next(); - speculatedKeyValueTable->put(kv->getKey(), kv); + { + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); + while (kvit->hasNext()) { + KeyValue *kv = kvit->next(); + speculatedKeyValueTable->put(kv->getKey(), kv); + } + delete kvit; } - delete kvit; } } @@ -2209,7 +2226,7 @@ void Table::updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOr if (transaction->evaluateGuard(committedKeyValueTable, speculatedKeyValueTable, pendingTransactionSpeculatedKeyValueTable)) { // Guard evaluated to true so update the speculative table - SetIterator *kvit = getKeyIterator(commit->getKeyValueUpdateSet()); + SetIterator *kvit = transaction->getKeyValueUpdateSet()->iterator(); while (kvit->hasNext()) { KeyValue *kv = kvit->next(); pendingTransactionSpeculatedKeyValueTable->put(kv->getKey(), kv); @@ -2433,7 +2450,7 @@ void Table::processEntry(Abort *entry) { if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); - liveAbortTable->remove(entry->getAbortId()); + liveAbortTable->remove(&entry->getAbortId()); if (entry->getTransactionArbitrator() == localMachineId) { liveAbortsGeneratedByLocal->remove(entry->getArbitratorLocalSequenceNumber()); @@ -2442,7 +2459,7 @@ void Table::processEntry(Abort *entry) { } // Update the last arbitration data that we have seen so far - if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()) != NULL) { + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->contains(entry->getTransactionArbitrator())) { int64_t lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator->get(entry->getTransactionArbitrator()); if (entry->getSequenceNumber() > lastArbitrationSequenceNumber) { // Is larger @@ -2537,7 +2554,7 @@ void Table::processEntry(CommitPart *entry) { * our own last message or that other clients have not had a rollback * on the last message-> */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset *machineSet) { // We have seen this machine ID machineSet->remove(machineId); @@ -2547,7 +2564,9 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven if (watchset != NULL) { // Go through each rejected message that this machine Id has not // seen yet - for (Iterator *rmit = watchset->iterator(); rmit->hasNext(); ) { + + SetIterator *rmit = watchset->iterator(); + while(rmit->hasNext()) { RejectedMessage *rm = rmit->next(); // If this machine Id has seen this rejected message->->-> if (rm->getSequenceNumber() <= seqNum) { @@ -2557,6 +2576,7 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven rm->removeWatcher(machineId); } } + delete rmit; } // Set dead the abort @@ -2572,29 +2592,33 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven } if (machineId == localMachineId) { // Our own messages are immediately dead-> - if (liveness instanceof LastMessage) { + char livenessType = liveness->getType(); + if (livenessType==TypeLastMessage) { ((LastMessage *)liveness)->setDead(); - } else if (liveness instanceof Slot) { + } else if (livenessType == TypeSlot) { ((Slot *)liveness)->setDead(); } else { throw new Error("Unrecognized type"); } } // Get the old last message for this device - Pair lastMessageEntry = lastMessageTable->put(machineId, Pair(seqNum, liveness)); + Pair * lastMessageEntry = lastMessageTable->put(machineId, new Pair(seqNum, liveness)); if (lastMessageEntry == NULL) { // If no last message then there is nothing else to process return; } - int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); - Liveness *lastEntry = lastMessageEntry.getSecond(); - + int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); + Liveness *lastEntry = lastMessageEntry->getSecond(); + delete lastMessageEntry; + // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { - if (lastEntry instanceof LastMessage) { + char lastEntryType = lastEntry->getType(); + + if (lastEntryType == TypeLastMessage) { ((LastMessage *)lastEntry)->setDead(); - } else if (lastEntry instanceof Slot) { + } else if (lastEntryType == TypeSlot) { ((Slot *)lastEntry)->setDead(); } else { throw new Error("Unrecognized type"); diff --git a/version2/src/C/hashset.h b/version2/src/C/hashset.h index 55ae1fb..bfbb6b8 100644 --- a/version2/src/C/hashset.h +++ b/version2/src/C/hashset.h @@ -14,10 +14,10 @@ template class Hashset; -template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > +template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class SetIterator { public: - SetIterator(Hashlistnode<_Key, _Key> *_curr, Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *_table) : + SetIterator(Hashlistnode<_Key, _Val> *_curr, Hashtable <_Key, _Val, _KeyInt, _Shift, hash_function, equals> *_table) : curr(_curr), table(_table) { @@ -64,9 +64,9 @@ public: } private: - Hashlistnode<_Key,_Key> *curr; - Hashlistnode<_Key, _Key> *last; - Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table; + Hashlistnode<_Key,_Val> *curr; + Hashlistnode<_Key, _Val> *last; + Hashtable <_Key, _Val, _KeyInt, _Shift, hash_function, equals> *table; }; template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > @@ -84,7 +84,7 @@ public: Hashset<_Key, _KeyInt, _Shift, hash_function, equals> *copy() { Hashset<_Key, _KeyInt, _Shift, hash_function, equals> *copy = new Hashset<_Key, _KeyInt, _Shift, hash_function, equals>(table->getCapacity(), table->getLoadFactor()); - SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *it = iterator(); + SetIterator<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *it = iterator(); while (it->hasNext()) copy->add(it->next()); delete it; @@ -99,7 +99,7 @@ public: * is already present. */ void addAll(Hashset<_Key, _KeyInt, _Shift, hash_function, equals> *table) { - SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *it = iterator(); + SetIterator<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *it = iterator(); while (it->hasNext()) add(it->next()); delete it; @@ -146,8 +146,8 @@ public: return size() == 0; } - SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *iterator() { - return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table); +SetIterator<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *iterator() { + return new SetIterator<_Key, _Key, _KeyInt, _Shift, hash_function, equals>(table->list, table); } /** Override: new operator */ @@ -174,7 +174,7 @@ private: }; template -SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *getKeyIterator(Hashtable<_Key,_Val,_KeyInt,_Shift,hash_function,equals> *table) { - return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table); + SetIterator<_Key, _Val ,_KeyInt, _Shift, hash_function, equals> *getKeyIterator(Hashtable<_Key,_Val,_KeyInt,_Shift,hash_function,equals> *table) { + return new SetIterator<_Key, _Val, _KeyInt, _Shift, hash_function, equals>(table->list, table); } #endif diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index fcdbe0c..ccd8649 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -30,6 +30,18 @@ public: fldsize--; } + void remove(type t) { + for (uint i=0; i