#include "Commit.h"
#include "CommitPart.h"
#include "ByteBuffer.h"
-#include "KeyValue.h"
+#include "IoTString.h"
Commit::Commit() :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(false),
hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(-1),
machineId(-1),
}
Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
- parts(new Hashtable<int32_t, CommitPart *>()),
+ parts(new Vector<CommitPart *>()),
+ partCount(0),
missingParts(NULL),
fldisComplete(true),
hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
isDead(false),
sequenceNumber(_sequenceNumber),
machineId(_machineId),
return;
}
- CommitPart *previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
-
- if (previoslySeenPart != NULL) {
+ CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
+ if(previouslySeenPart == NULL)
+ partCount++;
+
+ if (previouslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart->setDead();
+ previouslySeenPart->setDead();
} else if (newPart->isLastPart()) {
missingParts = new Hashset<int32_t>();
hasLastPart = true;
return transactionSequenceNumber;
}
-Hashtable<int32_t, CommitPart *> *Commit::getParts() {
+Vector<CommitPart *> *Commit::getParts() {
return parts;
}
}
}
-Hashset<KeyValue *> *Commit::getKeyValueUpdateSet() {
+Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
return keyValueUpdateSet;
}
int32_t Commit::getNumberOfParts() {
- return parts->size();
+ return partCount;
}
void Commit::setDead() {
if (!isDead) {
isDead = true;
// Make all the parts of this transaction dead
- for (int32_t partNumber : parts->keySet()) {
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
CommitPart *part = parts->get(partNumber);
- part->setDead();
+ if (parts!=NULL)
+ part->setDead();
}
}
}
void Commit::createCommitParts() {
parts->clear();
+ partCount = 0;
// Convert to chars
Array<char> *charData = convertDataToBytes();
Array<char> *partData = new Array<char>(copySize);
System_arraycopy(charData, currentPosition, partData, 0, copySize);
- CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
- parts->put(part->getPartNumber(), part);
+ CommitPart* part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
+ parts->setExpand(part->getPartNumber(), part);
// Update position, count and remaining
currentPosition += copySize;
void Commit::decodeCommitData() {
// Calculate the size of the data section
int dataSize = 0;
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
CommitPart *tp = parts->get(i);
- dataSize += tp->getDataSize();
+ if (tp!=NULL)
+ dataSize += tp->getDataSize();
}
Array<char> *combinedData = new Array<char>(dataSize);
int currentPosition = 0;
// Stitch all the data sections together
- for (int i = 0; i < parts->keySet()->size(); i++) {
+ for (int i = 0; i < parts->size(); i++) {
CommitPart *tp = parts->get(i);
- System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
- currentPosition += tp->getDataSize();
+ if (tp!=NULL) {
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
+ }
}
// Decoder Object
}
}
-Array<char> *convertDataToBytes() {
-
+Array<char> *Commit::convertDataToBytes() {
// Calculate the size of the data
int sizeOfData = sizeof(int32_t); // Number of Update KV's
- for (KeyValue *kv : keyValueUpdateSet) {
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> * kvit = keyValueUpdateSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue * kv = kvit->next();
sizeOfData += kv->getSize();
}
-
+ delete kvit;
+
// Data handlers and storage
Array<char> *dataArray = new Array<char>(sizeOfData);
ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
bbEncode->putInt(keyValueUpdateSet->size());
// Encode all the updates
- for (KeyValue *kv : keyValueUpdateSet) {
+ kvit = keyValueUpdateSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue * kv = kvit->next();
kv->encode(bbEncode);
}
-
+ delete kvit;
+
return bbEncode->array();
}
-void Commit::setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs) {
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
keyValueUpdateSet->clear();
- keyValueUpdateSet->addAll(newKVs->values());
+ keyValueUpdateSet->addAll(newKVs);
liveKeys->clear();
- liveKeys->addAll(newKVs->keySet());
+ SetIterator<KeyValue*, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+ while(kvit->hasNext()) {
+ liveKeys->add(kvit->next()->getKey());
+ }
+ delete kvit;
}
Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
} else if (newer == NULL) {
return older;
}
- Hashtable<IoTString *, KeyValue *> *kvSet = new Hashtable<IoTString *, KeyValue *>();
- for (KeyValue *kv : older->getKeyValueUpdateSet()) {
- kvSet->put(kv->getKey(), kv);
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
+ SetIterator<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+ while(kvit->hasNext()) {
+ KeyValue* kv=kvit->next();
+ kvSet->add(kv);
}
- for (KeyValue *kv : newer->getKeyValueUpdateSet()) {
- kvSet->put(kv->getKey(), kv);
+ delete kvit;
+ kvit = newer->getKeyValueUpdateSet()->iterator();
+ while(kvit->hasNext()) {
+ KeyValue* kv=kvit->next();
+ kvSet->add(kv);
}
-
+ delete kvit;
+
int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
if (transactionSequenceNumber == -1) {
transactionSequenceNumber = older->getTransactionSequenceNumber();
Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
newCommit->setKVsMap(kvSet);
+ delete kvSet;
return newCommit;
}
#ifndef COMMIT_H
#define COMMIT_H
#include "common.h"
+#include "KeyValue.h"
class Commit {
private:
- Hashtable<int32_t, CommitPart *> *parts;
+ Vector<CommitPart *> *parts;
+ uint32_t partCount;
Hashset<int32_t> *missingParts;
bool fldisComplete;
bool hasLastPart;
- Hashset<KeyValue *> *keyValueUpdateSet;
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *keyValueUpdateSet;
bool isDead;
int64_t sequenceNumber;
int64_t machineId;
int64_t transactionSequenceNumber;
Hashset<IoTString *> *liveKeys;
Array<char> *convertDataToBytes();
- void setKVsMap(Hashtable<IoTString *, KeyValue *> *newKVs);
+ void setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs);
public:
Commit();
void addPartDecode(CommitPart *newPart);
int64_t getSequenceNumber();
int64_t getTransactionSequenceNumber();
- Hashtable<int32_t, CommitPart *> *getParts();
+ Vector<CommitPart *> *getParts();
void addKV(KeyValue *kv);
void invalidateKey(IoTString *key);
- Hashset<KeyValue *> *getKeyValueUpdateSet();
+ Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *getKeyValueUpdateSet();
int32_t getNumberOfParts();
int64_t getMachineId() { return machineId; }
bool isComplete() { return fldisComplete; }
* @version 1.0
*/
+inline int hashCharArray(Array<char> * array) {
+ uint len = array->length();
+ int hash=0;
+ for(uint i=0; i <len; i++) {
+ hash = 31 * hash + array->get(i);
+ }
+ return hash;
+}
+
class IoTString {
private:
Array<char> *array;
IoTString() {}
-
+ int hashvalue;
/**
* Builds an IoTString object around the char array. This
* constructor makes a copy, so the caller is free to modify the char array.
*/
public:
- IoTString(Array<char> *_array) : array(new Array<char>(_array)) {}
+ IoTString(Array<char> *_array) :
+ array(new Array<char>(_array)),
+ hashvalue(hashCharArray(array)) {
+ }
IoTString(const char *_array) {
int32_t len = strlen(_array);
array = new Array<char>(len);
strcpy(array->internalArray(), _array);
+ hashvalue=hashCharArray(array);
}
- IoTString(IoTString *string) : array(new Array<char>(string->array)) {
+ IoTString(IoTString *string) :
+ array(new Array<char>(string->array)),
+ hashvalue(hashCharArray(array)) {
}
~IoTString() {
return result == 0;
}
+ int hashValue() { return hashvalue;}
int length() { return array->length(); }
friend IoTString *IoTString_shallow(Array<char> *_array);
};
str->array = _array;
return str;
}
+
+inline int hashString(IoTString *a) {
+ return a->hashValue();
+}
+
+inline bool StringEquals(IoTString *a, IoTString *b) {
+ return a->equals(b);
+}
#endif
* @version 1.0
*/
-class KeyValue {/*extends Entry */
+class KeyValue { /*extends Entry */
private:
IoTString *key;
IoTString *value;
};
KeyValue *KeyValue_decode(ByteBuffer *bb);
-#endif
+unsigned int hashKeyValue(KeyValue *kv);
+bool equalsKeyValue(KeyValue *a, KeyValue *b);
+#endif
public:
PendingTransaction(int64_t _machineId);
+ ~PendingTransaction();
/**
* Add a new key value to the updates
*
#include "Transaction.h"
#include "LastMessage.h"
#include "Random.h"
+#include "ByteBuffer.h"
+#include "Abort.h"
+#include "CommitPart.h"
Table::Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort) :
lastMessageTable = new Hashtable<int64_t, Pair<int64_t, Liveness *> >();
rejectedMessageWatchVectorTable = new Hashtable<int64_t, Hashset<RejectedMessage *> * >();
arbitratorTable = new Hashtable<IoTString *, int64_t>();
- liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *>();
- newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *>();
- newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *> *>();
+ liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort *, uintptr_t, 0, pairHashFunction, pairEquals>();
+ newTransactionParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
+ newCommitParts = new Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t>, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *>();
lastArbitratedTransactionNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
liveTransactionBySequenceNumberTable = new Hashtable<int64_t, Transaction *>();
- liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *>();
+ liveTransactionByTransactionIdTable = new Hashtable<Pair<int64_t, int64_t>, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals>();
liveCommitsTable = new Hashtable<int64_t, Hashtable<int64_t, Commit *> >();
liveCommitsByKeyTable = new Hashtable<IoTString *, Commit *>();
lastCommitSeenSequenceNumberByArbitratorTable = new Hashtable<int64_t, int64_t>();
transactionPartsSent = new Hashtable<Transaction *, Vector<int32_t> *>();
outstandingTransactionStatus = new Hashtable<int64_t, TransactionStatus *>();
liveAbortsGeneratedByLocal = new Hashtable<int64_t, Abort *>();
- offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
+ offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t>, uintptr_t, 0, pairHashFunction, pairEquals>();
localCommunicationTable = new Hashtable<int64_t, Pair<IoTString *, int32_t> >();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
pendingSendArbitrationRounds = new Vector<ArbitrationRound *>();
} catch (ServerException *e) {
Hashset<int64_t> *arbitratorTriedAndFailed = new Hashset<int64_t>();
- for (Iterator<Transaction *> *iter = pendingTransactionQueue->iterator(); iter->hasNext(); ) {
- Transaction *transaction = iter->next();
-
+ uint size = pendingTransactionQueue->size();
+ uint oldindex = 0;
+ for(int iter = 0; iter < size; iter++) {
+ Transaction *transaction = pendingTransactionQueue->get(iter);
+ pendingTransactionQueue->set(oldindex++, pendingTransactionQueue->get(iter));
+
if (arbitratorTriedAndFailed->contains(transaction->getArbitrator())) {
// Already contacted this client so ignore all attempts to contact this client
// to preserve ordering for arbitrator
Pair<bool, bool> sendReturn = sendTransactionToLocal(transaction);
- if (sendReturn->getFirst()) {
+ if (sendReturn.getFirst()) {
// Failed to contact over local
arbitratorTriedAndFailed->add(transaction->getArbitrator());
} else {
// Successful contact or should not contact
- if (sendReturn->getSecond()) {
+ if (sendReturn.getSecond()) {
// did arbitrate
- iter->remove();
+ oldindex--;
}
}
}
}
-
+ pendingTransactionQueue->setSize(oldindex);
+
updateLiveStateFromLocal();
return transactionStatus;
fromRetry = true;
ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
- if (sendSlotsReturn->getFirst()) {
+ if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
newKey = NULL;
}
}
} else {
- newSlots = sendSlotsReturn->getThird();
+ newSlots = sendSlotsReturn.getThird();
bool isInserted = false;
for (uint si = 0; si < newSlots->length(); si++) {
Slot *s = newSlots->get(si);
}
}
- if (sendSlotsReturn->getThird()->length() != 0) {
+ if (sendSlotsReturn.getThird()->length() != 0) {
// insert into the local block chain
- validateAndUpdate(sendSlotsReturn->getThird(), true);
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
}
// continue;
} else {
// Try to fill the slot with data
ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
- bool needsResize = fillSlotsReturn->getFirst();
- int newSize = fillSlotsReturn->getSecond();
- bool insertedNewKey = fillSlotsReturn->getThird();
+ bool needsResize = fillSlotsReturn.getFirst();
+ int newSize = fillSlotsReturn.getSecond();
+ bool insertedNewKey = fillSlotsReturn.getThird();
if (needsResize) {
// Reset which transaction to send
ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
- if (sendSlotsReturn->getFirst()) {
+ if (sendSlotsReturn.getFirst()) {
// Did insert into the block chain
pendingSendArbitrationEntriesToDelete->clear();
transactionPartsSent->clear();
- if (sendSlotsReturn->getThird()->length() != 0) {
+ if (sendSlotsReturn.getThird()->length() != 0) {
// insert into the local block chain
- validateAndUpdate(sendSlotsReturn->getThird(), true);
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
}
}
bbEncode->putInt(0);
// Send by local
- Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+ Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
localSequenceNumber++;
if (returnData == NULL) {
for (int i = 0; i < numberOfEntries; i++) {
char type = bbDecode->get();
if (type == TypeAbort) {
- Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
+ Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
processEntry(abort);
} else if (type == TypeCommitPart) {
- CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
+ CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
processEntry(commitPart);
}
}
// Send by local
- Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation->getFirst(), localCommunicationInformation->getSecond());
+ Array<char> *returnData = cloud->sendLocalData(sendData, localSequenceNumber, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
localSequenceNumber++;
if (returnData == NULL) {
for (int i = 0; i < numberOfEntries; i++) {
char type = bbDecode->get();
if (type == TypeAbort) {
- Abort *abort = (Abort)Abort_decode(NULL, bbDecode);
+ Abort *abort = (Abort*)Abort_decode(NULL, bbDecode);
if ((abort->getTransactionMachineId() == localMachineId) && (abort->getTransactionClientLocalSequenceNumber() == transaction->getClientLocalSequenceNumber())) {
foundAbort = true;
processEntry(abort);
} else if (type == TypeCommitPart) {
- CommitPart *commitPart = (CommitPart)CommitPart_decode(NULL, bbDecode);
+ CommitPart *commitPart = (CommitPart*)CommitPart_decode(NULL, bbDecode);
processEntry(commitPart);
}
}
// Arbitrate on transaction and pull relevant return data
Pair<bool, bool> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
- couldArbitrate = localArbitrateReturn->getFirst();
- didCommit = localArbitrateReturn->getSecond();
+ couldArbitrate = localArbitrateReturn.getFirst();
+ didCommit = localArbitrateReturn.getSecond();
updateLiveStateFromLocal();
unseenArbitrations->addAll(commit->getParts()->values());
- for (CommitPart commitPart : commit->getParts()->values()) {
+ for (CommitPart *commitPart : commit->getParts()->values()) {
returnDataSize += commitPart->getSize();
}
}
}
bbEncode->putInt(unseenArbitrations->size());
- for (Entry *entry : unseenArbitrations) {
+ uint size = unseenArbitrations->size();
+ for(uint i = 0; i< size; i++) {
+ Entry * entry = unseenArbitrations->get(i);
entry->encode(bbEncode);
}
-
localSequenceNumber++;
return returnData;
}
if (hadPartialSendToServer) {
bool isInserted = false;
- for (Slot *s : array) {
+ uint size = s->size();
+ for(uint i=0; i < size; i++) {
+ Slot *s = array->get(i);
if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
isInserted = true;
break;
}
}
- for (Slot *s : array) {
+ for(uint i=0; i < size; i++) {
+ Slot *s = array->get(i);
if (isInserted) {
break;
}
ThreeTuple<bool, bool, int64_t> mandatoryRescueReturn = doMandatoryResuce(slot, resize);
// Extract working variables
- bool needsResize = mandatoryRescueReturn->getFirst();
- bool seenLiveSlot = mandatoryRescueReturn->getSecond();
- int64_t currentRescueSequenceNumber = mandatoryRescueReturn->getThird();
+ bool needsResize = mandatoryRescueReturn.getFirst();
+ bool seenLiveSlot = mandatoryRescueReturn.getSecond();
+ int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
if (needsResize && !resize) {
// We need to resize but we are not resizing so return false
// Iterate over all the live entries and try to rescue them
for (Entry *liveEntry : liveEntries) {
if (slot->hasSpace(liveEntry)) {
-
// Enough space to rescue the entry
slot->addEntry(liveEntry);
} else if (currentSequenceNumber == firstIfFull) {
//if there's no space but the entry is about to fall off the queue
- System->out->println("B"); //?
return ThreeTuple<bool, bool, int64_t>(true, seenLiveSlot, currentSequenceNumber);
-
}
}
}
// must have a last message message-> If not then the server is
// hiding slots
if (!machineSet->isEmpty()) {
- throw new Error("Missing record for machines: " + machineSet);
+ throw new Error("Missing record for machines: ");
}
}
*/
void Table::checkNumSlots(int numberOfSlots) {
if (numberOfSlots != expectedsize) {
- throw new Error("Server Error: Server did not send all slots-> Expected: " + expectedsize + " Received:" + numberOfSlots);
+ throw new Error("Server Error: Server did not send all slots-> Expected: ");
}
}
processEntry((TableStatus *)entry, slot->getSequenceNumber());
break;
default:
- throw new Error("Unrecognized type: " + entry->getType());
+ throw new Error("Unrecognized type: ");
}
}
}
// a rejected slot
int64_t slotMachineId = slot->getMachineID();
if (isequal != (slotMachineId == machineId)) {
- throw new Error("Server Error: Trying to insert rejected message for slot " + seqNum);
+ throw new Error("Server Error: Trying to insert rejected message for slot ");
}
}
}
}
Pair<int64_t, Liveness *> lastMessageValue = lastMessageEntry->getValue();
- int64_t entrySequenceNumber = lastMessageValue->getFirst();
+ int64_t entrySequenceNumber = lastMessageValue.getFirst();
if (entrySequenceNumber < seq) {
// Add this rejected message to the set of messages that this
liveAbortsGeneratedByLocal->put(entry->getArbitratorLocalSequenceNumber(), entry);
}
- if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId())->getFirst() >= entry->getSequenceNumber())) {
+ if ((entry->getSequenceNumber() != -1) && (lastMessageTable->get(entry->getTransactionMachineId()).getFirst() >= entry->getSequenceNumber())) {
// The machine already saw this so it is dead
entry->setDead();
liveAbortTable->remove(entry->getAbortId());
}
// This part is still alive
- Hashtable<Pair<int64_t, int32_t>, TransactionPart *> *transactionPart = newTransactionParts->get(entry->getMachineId());
+ Hashtable<Pair<int64_t, int32_t>, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *transactionPart = newTransactionParts->get(entry->getMachineId());
if (transactionPart == NULL) {
// Dont have a table for this machine Id yet so make one
return;
}
- int64_t lastMessageSeqNum = lastMessageEntry->getFirst();
- Liveness *lastEntry = lastMessageEntry->getSecond();
+ int64_t lastMessageSeqNum = lastMessageEntry.getFirst();
+ Liveness *lastEntry = lastMessageEntry.getSecond();
// If it is not our machine Id since we already set ours to dead
if (machineId != localMachineId) {
if (hadPartialSendToServer) {
// We were not making any updates and we had a machine mismatch
if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: ");
}
} else {
// We were not making any updates and we had a machine mismatch
if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: ");
}
}
} else {
}
void Transaction::setDead() {
- if (isDead) {
- // Already dead
- return;
- }
-
- // Set dead
- isDead = true;
-
- // Make all the parts of this transaction dead
- for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
- TransactionPart *part = parts->get(partNumber);
- if (part != NULL)
- part->setDead();
+ if (!isDead) {
+ // Set dead
+ isDead = true;
+ // Make all the parts of this transaction dead
+ for (int32_t partNumber = 0; partNumber < parts->size(); partNumber ++) {
+ TransactionPart *part = parts->get(partNumber);
+ if (part != NULL)
+ part->setDead();
+ }
}
}
#define HASHSET_H
#include "hashtable.h"
-template<typename _Key>
-struct Linknode {
- _Key key;
- Linknode<_Key> *prev;
- Linknode<_Key> *next;
-};
-
template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
class Hashset;
template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class SetIterator {
public:
- SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) :
- curr(_curr),
- set(_set)
+ SetIterator(Hashlistnode<_Key, _Key> *_curr, Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *_table) :
+curr(_curr),
+table(_table)
{
}
void remove() {
_Key k = last->key;
- set->remove(k);
+ table->remove(k);
}
private:
- Linknode<_Key> *curr;
- Linknode<_Key> *last;
- Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set;
+Hashlistnode<_Key,_Key> *curr;
+Hashlistnode<_Key, _Key> *last;
+Hashtable <_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table;
};
template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class Hashset {
public:
Hashset(unsigned int initialcapacity = 16, double factor = 0.5) :
- table(new Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor)),
- list(NULL),
- tail(NULL)
+table(new Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals>(initialcapacity, factor))
{
}
/** @brief Hashset destructor */
~Hashset() {
- Linknode<_Key> *tmp = list;
- while (tmp != NULL) {
- Linknode<_Key> *tmpnext = tmp->next;
- ourfree(tmp);
- tmp = tmpnext;
- }
delete table;
}
}
void clear() {
- Linknode<_Key> *tmp = list;
- while (tmp != NULL) {
- Linknode<_Key> *tmpnext = tmp->next;
- ourfree(tmp);
- tmp = tmpnext;
- }
- list = tail = NULL;
table->clear();
}
- void resetAndDelete() {
- Linknode<_Key> *tmp = list;
- while (tmp != NULL) {
- Linknode<_Key> *tmpnext = tmp->next;
- ourfree(tmp);
- tmp = tmpnext;
- }
- list = tail = NULL;
- table->resetAndDeleteKeys();
- }
-
/** @brief Adds a new key to the hashset. Returns false if the key
* is already present. */
* is already present. */
bool add(_Key key) {
- Linknode<_Key> *val = table->get(key);
- if (val == NULL) {
- Linknode<_Key> *newnode = (Linknode<_Key> *)ourmalloc(sizeof(struct Linknode<_Key>));
- newnode->prev = tail;
- newnode->next = NULL;
- newnode->key = key;
- if (tail != NULL)
- tail->next = newnode;
- else
- list = newnode;
- tail = newnode;
- table->put(key, newnode);
+ if (!table->contains(key)) {
+ table->put(key, key);
return true;
} else
return false;
}
- /** @brief Return random key from set. */
-
- _Key getRandomElement() {
- if (size() == 0)
- return NULL;
- else if (size() < 6) {
- uint count = random() % size();
- Linknode<_Key> *ptr = list;
- while (count > 0) {
- ptr = ptr->next;
- count--;
- }
- return ptr->key;
- } else
- return table->getRandomValue()->key;
- }
-
/** @brief Gets the original key corresponding to this one from the
* hashset. Returns NULL if not present. */
_Key get(_Key key) {
- Linknode<_Key> *val = table->get(key);
- if (val != NULL)
- return val->key;
- else
- return NULL;
+ return table->get(key);
}
_Key getFirstKey() {
- return list->key;
+ return table->list->key;
}
bool contains(_Key key) {
- return table->get(key) != NULL;
+ return table->contains(key);
}
bool remove(_Key key) {
- Linknode<_Key> *oldlinknode;
- oldlinknode = table->get(key);
- if (oldlinknode == NULL) {
+ if (!table->contains(key))
return false;
- }
table->remove(key);
-
- //remove link node from the list
- if (oldlinknode->prev == NULL)
- list = oldlinknode->next;
- else
- oldlinknode->prev->next = oldlinknode->next;
- if (oldlinknode->next != NULL)
- oldlinknode->next->prev = oldlinknode->prev;
- else
- tail = oldlinknode->prev;
- ourfree(oldlinknode);
return true;
}
}
SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> *iterator() {
- return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(list, this);
+ return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table);
}
/** Override: new operator */
ourfree(p);
}
private:
- Hashtable<_Key, Linknode<_Key> *, _KeyInt, _Shift, hash_function, equals> *table;
- Linknode<_Key> *list;
- Linknode<_Key> *tail;
+ Hashtable<_Key, _Key, _KeyInt, _Shift, hash_function, equals> *table;
};
+
+template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
+ SetIterator<_Key, _KeyInt, _Shift, hash_function, equals> * getKeyIterator(Hashtable<_Key,_Key,_KeyInt,_Shift,hash_function,equals> *table) {
+ return new SetIterator<_Key, _KeyInt, _Shift, hash_function, equals>(table->list, table);
+}
#endif
_Key key;
_Val val;
uint hashcode;
+ struct Hashlistnode<_Key, _Val> * next;
+ struct Hashlistnode<_Key, _Val> * prev;
};
template<typename _Key, int _Shift, typename _KeyInt>
threshold = (unsigned int)(initialcapacity * loadfactor);
Size = 0; // Initial number of elements in the hash
+ tail = list = NULL;
}
/** @brief Hash table destructor */
zero = NULL;
}
Size = 0;
+ tail = list = NULL;
}
/** Doesn't work with zero value */
zero = NULL;
}
Size = 0;
+ tail = list = NULL;
}
void resetAndDeleteVals() {
zero = NULL;
}
Size = 0;
+ tail = list = NULL;
}
void resetAndFreeVals() {
zero = NULL;
}
Size = 0;
+ tail = list = NULL;
}
/**
_Val oldval;
if (!zero) {
zero = (struct Hashlistnode<_Key, _Val> *)ourmalloc(sizeof(struct Hashlistnode<_Key, _Val>));
+ zero->next = list;
+ if (list != NULL)
+ list->prev = zero;
+ else
+ tail = zero;
+ list = zero;
Size++;
oldval = (_Val) 0;
} else
search->key = key;
search->val = val;
search->hashcode = hashcode;
+ search->next = list;
+ if (list == NULL)
+ tail = search;
+ else
+ list->prev = search;
Size++;
return (_Val) 0;
}
_Val remove(_Key key) {
struct Hashlistnode<_Key, _Val> *search;
- /* Hashtable cannot handle 0 as a key */
if (!key) {
if (!zero) {
return (_Val)0;
} else {
_Val v = zero->val;
+ if (zero -> next != NULL)
+ zero -> next -> prev = zero ->prev;
+ else
+ tail = zero -> prev;
+
+ if (zero -> prev != NULL)
+ zero -> prev -> next = zero -> next;
+ else
+ list = zero->next;
+
ourfree(zero);
zero = NULL;
Size--;
//empty out this bin
search->val = (_Val) 1;
search->key = 0;
+
+ if (search -> next != NULL)
+ search -> next -> prev = search ->prev;
+ else
+ tail = search -> prev;
+
+ if (search -> prev != NULL)
+ search -> prev -> next = search -> next;
+ else
+ list = search->next;
+
Size--;
return v;
}
table = newtable; // Update the global hashtable upon resize()
capacity = newsize;
capacitymask = newsize - 1;
-
+ list = tail = NULL;
threshold = (unsigned int)(newsize * loadfactor);
struct Hashlistnode<_Key, _Val> *bin = &oldtable[0];
index++;
} while (search->key);
+ if (tail == NULL)
+ tail = search;
+ search -> next = list;
+ if (list != NULL)
+ list -> prev = search;
+ list = search;
search->hashcode = hashcode;
search->key = key;
search->val = bin->val;
unsigned int getCapacity() {return capacity;}
struct Hashlistnode<_Key, _Val> *table;
struct Hashlistnode<_Key, _Val> *zero;
- unsigned int capacity;
+ struct Hashlistnode<_Key, _Val> * list;
+ struct Hashlistnode<_Key, _Val> * tail;
+ unsigned int capacity;
unsigned int Size;
private:
unsigned int capacitymask;