From: bdemsky Date: Sun, 21 Jan 2018 08:04:30 +0000 (-0800) Subject: edits X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=f0e5bd79679ee0241e757953260e481dca736068;p=iotcloud.git edits --- diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index 6019732..fb09a94 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -1,14 +1,15 @@ #include "Commit.h" #include "CommitPart.h" #include "ByteBuffer.h" -#include "KeyValue.h" +#include "IoTString.h" Commit::Commit() : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), fldisComplete(false), hasLastPart(false), - keyValueUpdateSet(new Hashset()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(-1), machineId(-1), @@ -17,11 +18,12 @@ Commit::Commit() : } Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : - parts(new Hashtable()), + parts(new Vector()), + partCount(0), missingParts(NULL), fldisComplete(true), hasLastPart(false), - keyValueUpdateSet(new Hashset()), + keyValueUpdateSet(new Hashset()), isDead(false), sequenceNumber(_sequenceNumber), machineId(_machineId), @@ -36,11 +38,13 @@ void Commit::addPartDecode(CommitPart *newPart) { return; } - CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { + CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart); + if(previouslySeenPart == NULL) + partCount++; + + if (previouslySeenPart != NULL) { // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart->setDead(); + previouslySeenPart->setDead(); } else if (newPart->isLastPart()) { missingParts = new Hashset(); hasLastPart = true; @@ -82,7 +86,7 @@ int64_t Commit::getTransactionSequenceNumber() { return transactionSequenceNumber; } -Hashtable *Commit::getParts() { +Vector *Commit::getParts() { return parts; } @@ -99,21 +103,22 @@ void Commit::invalidateKey(IoTString *key) { } } -Hashset *Commit::getKeyValueUpdateSet() { +Hashset *Commit::getKeyValueUpdateSet() { return keyValueUpdateSet; } int32_t Commit::getNumberOfParts() { - return parts->size(); + return partCount; } void Commit::setDead() { if (!isDead) { isDead = true; // Make all the parts of this transaction dead - for (int32_t partNumber : parts->keySet()) { + for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) { CommitPart *part = parts->get(partNumber); - part->setDead(); + if (parts!=NULL) + part->setDead(); } } } @@ -124,6 +129,7 @@ CommitPart *Commit::getPart(int index) { void Commit::createCommitParts() { parts->clear(); + partCount = 0; // Convert to chars Array *charData = convertDataToBytes(); @@ -144,8 +150,8 @@ void Commit::createCommitParts() { Array *partData = new Array(copySize); System_arraycopy(charData, currentPosition, partData, 0, copySize); - CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); - parts->put(part->getPartNumber(), part); + CommitPart* part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); + parts->setExpand(part->getPartNumber(), part); // Update position, count and remaining currentPosition += copySize; @@ -157,19 +163,22 @@ void Commit::createCommitParts() { void Commit::decodeCommitData() { // Calculate the size of the data section int dataSize = 0; - for (int i = 0; i < parts->keySet()->size(); i++) { + for (int i = 0; i < parts->size(); i++) { CommitPart *tp = parts->get(i); - dataSize += tp->getDataSize(); + if (tp!=NULL) + dataSize += tp->getDataSize(); } Array *combinedData = new Array(dataSize); int currentPosition = 0; // Stitch all the data sections together - for (int i = 0; i < parts->keySet()->size(); i++) { + for (int i = 0; i < parts->size(); i++) { CommitPart *tp = parts->get(i); - System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); - currentPosition += tp->getDataSize(); + if (tp!=NULL) { + System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize()); + currentPosition += tp->getDataSize(); + } } // Decoder Object @@ -186,14 +195,16 @@ void Commit::decodeCommitData() { } } -Array *convertDataToBytes() { - +Array *Commit::convertDataToBytes() { // Calculate the size of the data int sizeOfData = sizeof(int32_t); // Number of Update KV's - for (KeyValue *kv : keyValueUpdateSet) { + SetIterator * kvit = keyValueUpdateSet->iterator(); + while(kvit->hasNext()) { + KeyValue * kv = kvit->next(); sizeOfData += kv->getSize(); } - + delete kvit; + // Data handlers and storage Array *dataArray = new Array(sizeOfData); ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray); @@ -202,18 +213,25 @@ Array *convertDataToBytes() { bbEncode->putInt(keyValueUpdateSet->size()); // Encode all the updates - for (KeyValue *kv : keyValueUpdateSet) { + kvit = keyValueUpdateSet->iterator(); + while(kvit->hasNext()) { + KeyValue * kv = kvit->next(); kv->encode(bbEncode); } - + delete kvit; + return bbEncode->array(); } -void Commit::setKVsMap(Hashtable *newKVs) { +void Commit::setKVsMap(Hashset *newKVs) { keyValueUpdateSet->clear(); - keyValueUpdateSet->addAll(newKVs->values()); + keyValueUpdateSet->addAll(newKVs); liveKeys->clear(); - liveKeys->addAll(newKVs->keySet()); + SetIterator *kvit = newKVs->iterator(); + while(kvit->hasNext()) { + liveKeys->add(kvit->next()->getKey()); + } + delete kvit; } Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { @@ -222,14 +240,20 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { } else if (newer == NULL) { return older; } - Hashtable *kvSet = new Hashtable(); - for (KeyValue *kv : older->getKeyValueUpdateSet()) { - kvSet->put(kv->getKey(), kv); + Hashset *kvSet = new Hashset(); + SetIterator *kvit = older->getKeyValueUpdateSet()->iterator(); + while(kvit->hasNext()) { + KeyValue* kv=kvit->next(); + kvSet->add(kv); } - for (KeyValue *kv : newer->getKeyValueUpdateSet()) { - kvSet->put(kv->getKey(), kv); + delete kvit; + kvit = newer->getKeyValueUpdateSet()->iterator(); + while(kvit->hasNext()) { + KeyValue* kv=kvit->next(); + kvSet->add(kv); } - + delete kvit; + int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber(); if (transactionSequenceNumber == -1) { transactionSequenceNumber = older->getTransactionSequenceNumber(); @@ -238,5 +262,6 @@ Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) { Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber); newCommit->setKVsMap(kvSet); + delete kvSet; return newCommit; } diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index 4de3d22..8084eb7 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -1,21 +1,23 @@ #ifndef COMMIT_H #define COMMIT_H #include "common.h" +#include "KeyValue.h" class Commit { private: - Hashtable *parts; + Vector *parts; + uint32_t partCount; Hashset *missingParts; bool fldisComplete; bool hasLastPart; - Hashset *keyValueUpdateSet; + Hashset *keyValueUpdateSet; bool isDead; int64_t sequenceNumber; int64_t machineId; int64_t transactionSequenceNumber; Hashset *liveKeys; Array *convertDataToBytes(); - void setKVsMap(Hashtable *newKVs); + void setKVsMap(Hashset *newKVs); public: Commit(); @@ -23,10 +25,10 @@ public: void addPartDecode(CommitPart *newPart); int64_t getSequenceNumber(); int64_t getTransactionSequenceNumber(); - Hashtable *getParts(); + Vector *getParts(); void addKV(KeyValue *kv); void invalidateKey(IoTString *key); - Hashset *getKeyValueUpdateSet(); + Hashset *getKeyValueUpdateSet(); int32_t getNumberOfParts(); int64_t getMachineId() { return machineId; } bool isComplete() { return fldisComplete; } diff --git a/version2/src/C/IoTString.h b/version2/src/C/IoTString.h index 4500a66..f72dfa8 100644 --- a/version2/src/C/IoTString.h +++ b/version2/src/C/IoTString.h @@ -9,26 +9,41 @@ * @version 1.0 */ +inline int hashCharArray(Array * array) { + uint len = array->length(); + int hash=0; + for(uint i=0; i get(i); + } + return hash; +} + class IoTString { private: Array *array; IoTString() {} - + int hashvalue; /** * Builds an IoTString object around the char array. This * constructor makes a copy, so the caller is free to modify the char array. */ public: - IoTString(Array *_array) : array(new Array(_array)) {} + IoTString(Array *_array) : + array(new Array(_array)), + hashvalue(hashCharArray(array)) { + } IoTString(const char *_array) { int32_t len = strlen(_array); array = new Array(len); strcpy(array->internalArray(), _array); + hashvalue=hashCharArray(array); } - IoTString(IoTString *string) : array(new Array(string->array)) { + IoTString(IoTString *string) : + array(new Array(string->array)), + hashvalue(hashCharArray(array)) { } ~IoTString() { @@ -62,6 +77,7 @@ public: return result == 0; } + int hashValue() { return hashvalue;} int length() { return array->length(); } friend IoTString *IoTString_shallow(Array *_array); }; @@ -71,4 +87,12 @@ IoTString *IoTString_shallow(Array *_array) { str->array = _array; return str; } + +inline int hashString(IoTString *a) { + return a->hashValue(); +} + +inline bool StringEquals(IoTString *a, IoTString *b) { + return a->equals(b); +} #endif diff --git a/version2/src/C/KeyValue.h b/version2/src/C/KeyValue.h index 06f514b..2493bb2 100644 --- a/version2/src/C/KeyValue.h +++ b/version2/src/C/KeyValue.h @@ -8,7 +8,7 @@ * @version 1.0 */ -class KeyValue {/*extends Entry */ +class KeyValue { /*extends Entry */ private: IoTString *key; IoTString *value; @@ -28,4 +28,6 @@ public: }; KeyValue *KeyValue_decode(ByteBuffer *bb); -#endif +unsigned int hashKeyValue(KeyValue *kv); +bool equalsKeyValue(KeyValue *a, KeyValue *b); +#endif diff --git a/version2/src/C/PendingTransaction.h b/version2/src/C/PendingTransaction.h index c036c86..3e72fa3 100644 --- a/version2/src/C/PendingTransaction.h +++ b/version2/src/C/PendingTransaction.h @@ -14,6 +14,7 @@ private: public: PendingTransaction(int64_t _machineId); + ~PendingTransaction(); /** * Add a new key value to the updates * diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 4c0331e..dcdc99b 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -11,6 +11,9 @@ #include "Transaction.h" #include "LastMessage.h" #include "Random.h" +#include "ByteBuffer.h" +#include "Abort.h" +#include "CommitPart.h" Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) : @@ -153,12 +156,12 @@ void Table::init() { lastMessageTable = new Hashtable >(); rejectedMessageWatchVectorTable = new Hashtable * >(); arbitratorTable = new Hashtable(); - liveAbortTable = new Hashtable, Abort *>(); - newTransactionParts = new Hashtable, TransactionPart *> *>(); - newCommitParts = new Hashtable, CommitPart *> *>(); + liveAbortTable = new Hashtable, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>(); + newTransactionParts = new Hashtable, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); + newCommitParts = new Hashtable, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>(); lastArbitratedTransactionNumberByArbitratorTable = new Hashtable(); liveTransactionBySequenceNumberTable = new Hashtable(); - liveTransactionByTransactionIdTable = new Hashtable, Transaction *>(); + liveTransactionByTransactionIdTable = new Hashtable, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>(); liveCommitsTable = new Hashtable >(); liveCommitsByKeyTable = new Hashtable(); lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable(); @@ -168,7 +171,7 @@ void Table::init() { transactionPartsSent = new Hashtable *>(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new Hashset >(); + offlineTransactionsCommittedAndAtServer = new Hashset, uintptr_t, 0, pairHashFunction, pairEquals>(); localCommunicationTable = new Hashtable >(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); @@ -396,9 +399,12 @@ TransactionStatus *Table::commitTransaction() { } catch (ServerException *e) { Hashset *arbitratorTriedAndFailed = new Hashset(); - for (Iterator *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) { - Transaction *transaction = iter->next(); - + uint size = pendingTransactionQueue->size(); + uint oldindex = 0; + for(int iter = 0; iter < size; iter++) { + Transaction *transaction = pendingTransactionQueue->get(iter); + pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter)); + if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) { // Already contacted this client so ignore all attempts to contact this client // to preserve ordering for arbitrator @@ -407,20 +413,21 @@ TransactionStatus *Table::commitTransaction() { Pair sendReturn = sendTransactionToLocal(transaction); - if (sendReturn->getFirst()) { + if (sendReturn.getFirst()) { // Failed to contact over local arbitratorTriedAndFailed->add(transaction->getArbitrator()); } else { // Successful contact or should not contact - if (sendReturn->getSecond()) { + if (sendReturn.getSecond()) { // did arbitrate - iter->remove(); + oldindex--; } } } } - + pendingTransactionQueue->setSize(oldindex); + updateLiveStateFromLocal(); return transactionStatus; @@ -447,7 +454,7 @@ bool Table::sendToServer(NewKey *newKey) { fromRetry = true; ThreeTuple *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); - if (sendSlotsReturn->getFirst()) { + if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) { newKey = NULL; @@ -472,7 +479,7 @@ bool Table::sendToServer(NewKey *newKey) { } } } else { - newSlots = sendSlotsReturn->getThird(); + newSlots = sendSlotsReturn.getThird(); bool isInserted = false; for (uint si = 0; si < newSlots->length(); si++) { Slot *s = newSlots->get(si); @@ -542,9 +549,9 @@ bool Table::sendToServer(NewKey *newKey) { } } - if (sendSlotsReturn->getThird()->length() != 0) { + if (sendSlotsReturn.getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } // continue; } else { @@ -650,9 +657,9 @@ bool Table::sendToServer(NewKey *newKey) { // Try to fill the slot with data ThreeTuple fillSlotsReturn = fillSlot(slot, false, newKey); - bool needsResize = fillSlotsReturn->getFirst(); - int newSize = fillSlotsReturn->getSecond(); - bool insertedNewKey = fillSlotsReturn->getThird(); + bool needsResize = fillSlotsReturn.getFirst(); + int newSize = fillSlotsReturn.getSecond(); + bool insertedNewKey = fillSlotsReturn.getThird(); if (needsResize) { // Reset which transaction to send @@ -684,7 +691,7 @@ bool Table::sendToServer(NewKey *newKey) { ThreeTuple *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); - if (sendSlotsReturn->getFirst()) { + if (sendSlotsReturn.getFirst()) { // Did insert into the block chain @@ -740,9 +747,9 @@ bool Table::sendToServer(NewKey *newKey) { pendingSendArbitrationEntriesToDelete->clear(); transactionPartsSent->clear(); - if (sendSlotsReturn->getThird()->length() != 0) { + if (sendSlotsReturn.getThird()->length() != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn->getThird(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } } @@ -801,7 +808,7 @@ bool Table::updateFromLocal(int64_t machineId) { bbEncode->putInt(0); // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -816,10 +823,10 @@ bool Table::updateFromLocal(int64_t machineId) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort*)Abort_decode(NULL, bbDecode); processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -863,7 +870,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { // Send by local - Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond()); + Array *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); localSequenceNumber++; if (returnData == NULL) { @@ -881,7 +888,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { for (int i = 0; i < numberOfEntries; i++) { char type = bbDecode->get(); if (type == TypeAbort) { - Abort *abort = (Abort)Abort_decode(NULL, bbDecode); + Abort *abort = (Abort*)Abort_decode(NULL, bbDecode); if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) { foundAbort = true; @@ -889,7 +896,7 @@ Pair Table::sendTransactionToLocal(Transaction *transaction) { processEntry(abort); } else if (type == TypeCommitPart) { - CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode); + CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode); processEntry(commitPart); } } @@ -938,8 +945,8 @@ Array *Table::acceptDataFromLocal(Array *data) { // Arbitrate on transaction and pull relevant return data Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); - couldArbitrate = localArbitrateReturn->getFirst(); - didCommit = localArbitrateReturn->getSecond(); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); updateLiveStateFromLocal(); @@ -981,7 +988,7 @@ Array *Table::acceptDataFromLocal(Array *data) { unseenArbitrations->addAll(commit->getParts()->values()); - for (CommitPart commitPart : commit->getParts()->values()) { + for (CommitPart *commitPart : commit->getParts()->values()) { returnDataSize += commitPart->getSize(); } } @@ -1013,11 +1020,12 @@ Array *Table::acceptDataFromLocal(Array *data) { } bbEncode->putInt(unseenArbitrations->size()); - for (Entry *entry : unseenArbitrations) { + uint size = unseenArbitrations->size(); + for(uint i = 0; i< size; i++) { + Entry * entry = unseenArbitrations->get(i); entry->encode(bbEncode); } - localSequenceNumber++; return returnData; } @@ -1044,14 +1052,17 @@ ThreeTuple *> Table::sendSlotsToServer(Slot *slot, int if (hadPartialSendToServer) { bool isInserted = false; - for (Slot *s : array) { + uint size = s->size(); + for(uint i=0; i < size; i++) { + Slot *s = array->get(i); if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) { isInserted = true; break; } } - for (Slot *s : array) { + for(uint i=0; i < size; i++) { + Slot *s = array->get(i); if (isInserted) { break; } @@ -1107,9 +1118,9 @@ ThreeTuple Table::fillSlot(Slot *slot, bool resize, NewKey ThreeTuple mandatoryRescueReturn = doMandatoryResuce(slot, resize); // Extract working variables - bool needsResize = mandatoryRescueReturn->getFirst(); - bool seenLiveSlot = mandatoryRescueReturn->getSecond(); - int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird(); + bool needsResize = mandatoryRescueReturn.getFirst(); + bool seenLiveSlot = mandatoryRescueReturn.getSecond(); + int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird(); if (needsResize && !resize) { // We need to resize but we are not resizing so return false @@ -1266,14 +1277,11 @@ ThreeTuple Table::doMandatoryResuce(Slot *slot, bool resize // Iterate over all the live entries and try to rescue them for (Entry *liveEntry : liveEntries) { if (slot->hasSpace(liveEntry)) { - // Enough space to rescue the entry slot->addEntry(liveEntry); } else if (currentSequenceNumber == firstIfFull) { //if there's no space but the entry is about to fall off the queue - System->out->println("B"); //? return ThreeTuple(true, seenLiveSlot, currentSequenceNumber); - } } } @@ -1357,7 +1365,7 @@ void Table::validateAndUpdate(Array *newSlots, bool acceptUpdatesToLocal // must have a last message message-> If not then the server is // hiding slots if (!machineSet->isEmpty()) { - throw new Error("Missing record for machines: " + machineSet); + throw new Error("Missing record for machines: "); } } @@ -1446,7 +1454,7 @@ void Table::updateExpectedSize() { */ void Table::checkNumSlots(int numberOfSlots) { if (numberOfSlots != expectedsize) { - throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots); + throw new Error("Server Error: Server did not send all slots-> Expected: "); } } @@ -2210,7 +2218,7 @@ void Table::processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLo processEntry((TableStatus *)entry, slot->getSequenceNumber()); break; default: - throw new Error("Unrecognized type: " + entry->getType()); + throw new Error("Unrecognized type: "); } } } @@ -2282,7 +2290,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { // a rejected slot int64_t slotMachineId = slot->getMachineID(); if (isequal != (slotMachineId == machineId)) { - throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum); + throw new Error("Server Error: Trying to insert rejected message for slot "); } } } @@ -2301,7 +2309,7 @@ void Table::processEntry(RejectedMessage *entry, SlotIndexer *indexer) { } Pair lastMessageValue = lastMessageEntry->getValue(); - int64_t entrySequenceNumber = lastMessageValue->getFirst(); + int64_t entrySequenceNumber = lastMessageValue.getFirst(); if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this @@ -2345,7 +2353,7 @@ void Table::processEntry(Abort *entry) { liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry); } - if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) { + if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) { // The machine already saw this so it is dead entry->setDead(); liveAbortTable->remove(entry->getAbortId()); @@ -2400,7 +2408,7 @@ void Table::processEntry(TransactionPart *entry) { } // This part is still alive - Hashtable, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId()); + Hashtable, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId()); if (transactionPart == NULL) { // Dont have a table for this machine Id yet so make one @@ -2502,8 +2510,8 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven return; } - int64_t lastMessageSeqNum = lastMessageEntry->getFirst(); - Liveness *lastEntry = lastMessageEntry->getSecond(); + int64_t lastMessageSeqNum = lastMessageEntry.getFirst(); + Liveness *lastEntry = lastMessageEntry.getSecond(); // If it is not our machine Id since we already set ours to dead if (machineId != localMachineId) { @@ -2520,12 +2528,12 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liven if (hadPartialSendToServer) { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: "); } } else { // We were not making any updates and we had a machine mismatch if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); + throw new Error("Server Error: Mismatch on local machine sequence number, needed: "); } } } else { diff --git a/version2/src/C/Transaction.cc b/version2/src/C/Transaction.cc index 2398544..51fcbf6 100644 --- a/version2/src/C/Transaction.cc +++ b/version2/src/C/Transaction.cc @@ -210,19 +210,15 @@ Pair Transaction::getId() { } void Transaction::setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) { - TransactionPart *part = parts->get(partNumber); - if (part != NULL) - part->setDead(); + if (!isDead) { + // Set dead + isDead = true; + // Make all the parts of this transaction dead + for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) { + TransactionPart *part = parts->get(partNumber); + if (part != NULL) + part->setDead(); + } } } diff --git a/version2/src/C/hashset.h b/version2/src/C/hashset.h index d37ac3b..a9af155 100644 --- a/version2/src/C/hashset.h +++ b/version2/src/C/hashset.h @@ -11,22 +11,15 @@ #define HASHSET_H #include "hashtable.h" -template -struct Linknode { - _Key key; - Linknode<_Key> *prev; - Linknode<_Key> *next; -}; - template class Hashset; template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class SetIterator { public: - SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) : - curr(_curr), - set(_set) + SetIterator(Hashlistnode<_Key, _Key> *_curr, Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *_table) : +curr(_curr), +table(_table) { } @@ -67,33 +60,25 @@ public: void remove() { _Key k = last->key; - set->remove(k); + table->remove(k); } private: - Linknode<_Key> *curr; - Linknode<_Key> *last; - Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set; +Hashlistnode<_Key,_Key> *curr; +Hashlistnode<_Key, _Key> *last; +Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table; }; template, bool (*equals)(_Key, _Key) = defaultEquals<_Key> > class Hashset { public: Hashset(unsigned int initialcapacity = 16, double factor = 0.5) : - table(new Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor)), - list(NULL), - tail(NULL) +table(new Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor)) { } /** @brief Hashset destructor */ ~Hashset() { - Linknode<_Key> *tmp = list; - while (tmp != NULL) { - Linknode<_Key> *tmpnext = tmp->next; - ourfree(tmp); - tmp = tmpnext; - } delete table; } @@ -107,27 +92,9 @@ public: } void clear() { - Linknode<_Key> *tmp = list; - while (tmp != NULL) { - Linknode<_Key> *tmpnext = tmp->next; - ourfree(tmp); - tmp = tmpnext; - } - list = tail = NULL; table->clear(); } - void resetAndDelete() { - Linknode<_Key> *tmp = list; - while (tmp != NULL) { - Linknode<_Key> *tmpnext = tmp->next; - ourfree(tmp); - tmp = tmpnext; - } - list = tail = NULL; - table->resetAndDeleteKeys(); - } - /** @brief Adds a new key to the hashset. Returns false if the key * is already present. */ @@ -142,77 +109,32 @@ public: * is already present. */ bool add(_Key key) { - Linknode<_Key> *val = table->get(key); - if (val == NULL) { - Linknode<_Key> *newnode = (Linknode<_Key> *)ourmalloc(sizeof(struct Linknode<_Key>)); - newnode->prev = tail; - newnode->next = NULL; - newnode->key = key; - if (tail != NULL) - tail->next = newnode; - else - list = newnode; - tail = newnode; - table->put(key, newnode); + if (!table->contains(key)) { + table->put(key, key); return true; } else return false; } - /** @brief Return random key from set. */ - - _Key getRandomElement() { - if (size() == 0) - return NULL; - else if (size() < 6) { - uint count = random() % size(); - Linknode<_Key> *ptr = list; - while (count > 0) { - ptr = ptr->next; - count--; - } - return ptr->key; - } else - return table->getRandomValue()->key; - } - /** @brief Gets the original key corresponding to this one from the * hashset. Returns NULL if not present. */ _Key get(_Key key) { - Linknode<_Key> *val = table->get(key); - if (val != NULL) - return val->key; - else - return NULL; + return table->get(key); } _Key getFirstKey() { - return list->key; + return table->list->key; } bool contains(_Key key) { - return table->get(key) != NULL; + return table->contains(key); } bool remove(_Key key) { - Linknode<_Key> *oldlinknode; - oldlinknode = table->get(key); - if (oldlinknode == NULL) { + if (!table->contains(key)) return false; - } table->remove(key); - - //remove link node from the list - if (oldlinknode->prev == NULL) - list = oldlinknode->next; - else - oldlinknode->prev->next = oldlinknode->next; - if (oldlinknode->next != NULL) - oldlinknode->next->prev = oldlinknode->prev; - else - tail = oldlinknode->prev; - ourfree(oldlinknode); return true; } @@ -225,7 +147,7 @@ public: } SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *iterator() { - return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(list, this); + return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table); } /** Override: new operator */ @@ -248,8 +170,11 @@ public: ourfree(p); } private: - Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals> *table; - Linknode<_Key> *list; - Linknode<_Key> *tail; + Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table; }; + +template + SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> * getKeyIterator(Hashtable<_Key,_Key,_KeyInt,_Shift,hash_function,equals> *table) { + return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table); +} #endif diff --git a/version2/src/C/hashtable.h b/version2/src/C/hashtable.h index b610154..3ded5b5 100644 --- a/version2/src/C/hashtable.h +++ b/version2/src/C/hashtable.h @@ -31,6 +31,8 @@ struct Hashlistnode { _Key key; _Val val; uint hashcode; + struct Hashlistnode<_Key, _Val> * next; + struct Hashlistnode<_Key, _Val> * prev; }; template @@ -77,6 +79,7 @@ public: threshold = (unsigned int)(initialcapacity * loadfactor); Size = 0; // Initial number of elements in the hash + tail = list = NULL; } /** @brief Hash table destructor */ @@ -114,6 +117,7 @@ public: zero = NULL; } Size = 0; + tail = list = NULL; } /** Doesn't work with zero value */ @@ -143,6 +147,7 @@ public: zero = NULL; } Size = 0; + tail = list = NULL; } void resetAndDeleteVals() { @@ -163,6 +168,7 @@ public: zero = NULL; } Size = 0; + tail = list = NULL; } void resetAndFreeVals() { @@ -183,6 +189,7 @@ public: zero = NULL; } Size = 0; + tail = list = NULL; } /** @@ -196,6 +203,12 @@ public: _Val oldval; if (!zero) { zero = (struct Hashlistnode<_Key, _Val> *)ourmalloc(sizeof(struct Hashlistnode<_Key, _Val>)); + zero->next = list; + if (list != NULL) + list->prev = zero; + else + tail = zero; + list = zero; Size++; oldval = (_Val) 0; } else @@ -231,6 +244,11 @@ public: search->key = key; search->val = val; search->hashcode = hashcode; + search->next = list; + if (list == NULL) + tail = search; + else + list->prev = search; Size++; return (_Val) 0; } @@ -279,12 +297,21 @@ public: _Val remove(_Key key) { struct Hashlistnode<_Key, _Val> *search; - /* Hashtable cannot handle 0 as a key */ if (!key) { if (!zero) { return (_Val)0; } else { _Val v = zero->val; + if (zero -> next != NULL) + zero -> next -> prev = zero ->prev; + else + tail = zero -> prev; + + if (zero -> prev != NULL) + zero -> prev -> next = zero -> next; + else + list = zero->next; + ourfree(zero); zero = NULL; Size--; @@ -308,6 +335,17 @@ public: //empty out this bin search->val = (_Val) 1; search->key = 0; + + if (search -> next != NULL) + search -> next -> prev = search ->prev; + else + tail = search -> prev; + + if (search -> prev != NULL) + search -> prev -> next = search -> next; + else + list = search->next; + Size--; return v; } @@ -368,7 +406,7 @@ public: table = newtable; // Update the global hashtable upon resize() capacity = newsize; capacitymask = newsize - 1; - + list = tail = NULL; threshold = (unsigned int)(newsize * loadfactor); struct Hashlistnode<_Key, _Val> *bin = &oldtable[0]; @@ -388,6 +426,12 @@ public: index++; } while (search->key); + if (tail == NULL) + tail = search; + search -> next = list; + if (list != NULL) + list -> prev = search; + list = search; search->hashcode = hashcode; search->key = key; search->val = bin->val; @@ -399,7 +443,9 @@ public: unsigned int getCapacity() {return capacity;} struct Hashlistnode<_Key, _Val> *table; struct Hashlistnode<_Key, _Val> *zero; - unsigned int capacity; + struct Hashlistnode<_Key, _Val> * list; + struct Hashlistnode<_Key, _Val> * tail; + unsigned int capacity; unsigned int Size; private: unsigned int capacitymask;