From b47765b628c0c160dd13622f8e241c377b8ce3cb Mon Sep 17 00:00:00 2001 From: bdemsky Date: Thu, 18 Jan 2018 14:17:42 -0800 Subject: [PATCH] edits --- version2/src/C/ArbitrationRound.cc | 50 ++--- version2/src/C/ArbitrationRound.h | 26 +-- version2/src/C/Commit.cc | 167 +++++++-------- version2/src/C/Commit.h | 317 ++++------------------------- version2/src/C/TableStatus.cc | 49 +---- version2/src/C/TableStatus.h | 58 +++--- version2/src/C/array.h | 5 +- version2/src/C/common.h | 15 +- version2/src/C/hashset.h | 2 +- version2/src/C/hashtable.h | 2 +- version2/src/C/vector.h | 1 - 11 files changed, 207 insertions(+), 485 deletions(-) diff --git a/version2/src/C/ArbitrationRound.cc b/version2/src/C/ArbitrationRound.cc index 8b84785..f788671 100644 --- a/version2/src/C/ArbitrationRound.cc +++ b/version2/src/C/ArbitrationRound.cc @@ -1,25 +1,29 @@ -#include"ArbitrationRound.h" +#include "ArbitrationRound.h" +#include "Commit.h" -ArbitrationRound::ArbitrationRound(Commit * _commit, Set * _abortsBefore) { - parts = new ArrayList(); - commit = _commit; - abortsBefore = _abortsBefore; +ArbitrationRound::ArbitrationRound(Commit * _commit, Hashset * _abortsBefore) : + abortsBefore(_abortsBefore), + parts(new Vector()), + commit(_commit), + currentSize(0), + didSendPart(false), + didGenerateParts(false) { if (commit != NULL) { - commit.createCommitParts(); - currentSize += commit.getNumberOfParts(); - } + commit->createCommitParts(); + currentSize += commit->getNumberOfParts(); + } - currentSize += abortsBefore.size(); + currentSize += abortsBefore->size(); } void ArbitrationRound::generateParts() { if (didGenerateParts) { return; } - parts = new ArrayList(abortsBefore); + parts = new Vector(abortsBefore); if (commit != NULL) { - parts.addAll(commit.getParts().values()); + parts->addAll(commit->getParts()->values()); } } @@ -28,16 +32,16 @@ List * ArbitrationRound::getParts() { } void ArbitrationRound::removeParts(List * removeParts) { - parts.removeAll(removeParts); + parts->removeAll(removeParts); didSendPart = true; } bool ArbitrationRound::isDoneSending() { - if ((commit == NULL) && abortsBefore.isEmpty()) { + if ((commit == NULL) && abortsBefore->isEmpty()) { return true; } - return parts.isEmpty(); + return parts->isEmpty(); } Commit * ArbitrationRound::getCommit() { @@ -46,31 +50,31 @@ Commit * ArbitrationRound::getCommit() { void ArbitrationRound::setCommit(Commit * _commit) { if (commit != NULL) { - currentSize -= commit.getNumberOfParts(); + currentSize -= commit->getNumberOfParts(); } commit = _commit; if (commit != NULL) { - currentSize += commit.getNumberOfParts(); + currentSize += commit->getNumberOfParts(); } } void ArbitrationRound::addAbort(Abort * abort) { - abortsBefore.add(abort); + abortsBefore->add(abort); currentSize++; } -void ArbitrationRound::addAborts(Set * aborts) { - abortsBefore.addAll(aborts); - currentSize += aborts.size(); +void ArbitrationRound::addAborts(Hashset * aborts) { + abortsBefore->addAll(aborts); + currentSize += aborts->size(); } -Set ArbitrationRound::getAborts() { +Hashset * ArbitrationRound::getAborts() { return abortsBefore; } int ArbitrationRound::getAbortsCount() { - return abortsBefore.size(); + return abortsBefore->size(); } int ArbitrationRound::getCurrentSize() { @@ -81,7 +85,7 @@ bool ArbitrationRound::isFull() { return currentSize >= MAX_PARTS; } -bool ArbitrationRound::didSendPart() { +bool ArbitrationRound::getDidSendPart() { return didSendPart; } diff --git a/version2/src/C/ArbitrationRound.h b/version2/src/C/ArbitrationRound.h index 7ef4a2e..da1a2a8 100644 --- a/version2/src/C/ArbitrationRound.h +++ b/version2/src/C/ArbitrationRound.h @@ -2,30 +2,32 @@ #define ARBITRATIONROUND_H #define MAX_PARTS 10 +#include "common.h" class ArbitrationRound { private: - Set * abortsBefore = NULL; - List * parts = NULL; - Commit commit = NULL; - int currentSize = 0; - bool didSendPart = false; - bool didGenerateParts = false; + Hashset * abortsBefore; + Vector * parts; + Commit * commit; + int currentSize; + bool didSendPart; + bool didGenerateParts; public: - ArbitrationRound(Commit * _commit, Set * _abortsBefore); + ArbitrationRound(Commit * _commit, Hashset * _abortsBefore); + ~ArbitrationRound(); void generateParts(); - List * getParts(); - void removeParts(List * removeParts); + Vector * getParts(); + void removeParts(Vector * removeParts); bool isDoneSending(); void setCommit(Commit * _commit); void addAbort(Abort * abort); - void addAborts(Set * aborts); - Set * getAborts(); + void addAborts(Hashset * aborts); + Hashset * getAborts(); int getAbortsCount(); int getCurrentSize(); bool isFull(); - bool didSendPart(); + bool getDidSendPart(); }; #endif diff --git a/version2/src/C/Commit.cc b/version2/src/C/Commit.cc index 4925d6f..32e2175 100644 --- a/version2/src/C/Commit.cc +++ b/version2/src/C/Commit.cc @@ -1,84 +1,77 @@ +#include "commit.h" + +Commit::Commit() : + parts(new HashMap()), + missingParts(NULL), + fldisComplete(false), + hasLastPart(false), + keyValueUpdateSet(new HashSet()), + isDead(false), + sequenceNumber(-1), + machineId(-1), + transactionSequenceNumber(-1), + liveKeys(new Hashset) { +} -class Commit { - - Map parts = NULL; - Set missingParts = NULL; - bool isComplete = false; - bool hasLastPart = false; - Set keyValueUpdateSet = NULL; - bool isDead = false; - int64_t sequenceNumber = -1; - int64_t machineId = -1; - int64_t transactionSequenceNumber = -1; - - Set liveKeys = NULL; - - Commit() { - parts = new HashMap(); - keyValueUpdateSet = new HashSet(); - - liveKeys = new HashSet(); - } - - Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) { - parts = new HashMap(); - keyValueUpdateSet = new HashSet(); - - liveKeys = new HashSet(); - - sequenceNumber = _sequenceNumber; - machineId = _machineId; - transactionSequenceNumber = _transactionSequenceNumber; - isComplete = true; - } - - - void addPartDecode(CommitPart newPart) { - - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } - - CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } - - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { - - // We have all the parts - isComplete = true; - - // Decode all the parts and create the key value guard and update sets - decodeCommitData(); +Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) : + parts(new HashMap()), + missingParts(NULL), + fldisComplete(true), + hasLastPart(false), + keyValueUpdateSet(new HashSet()), + isDead(false), + sequenceNumber(_sequenceNumber), + machineId(_machineId), + transactionSequenceNumber(_transactionSequenceNumber), + liveKeys(new Hashset) { +} - // Get the sequence number and arbitrator of this transaction - sequenceNumber = parts.get(0).getSequenceNumber(); - machineId = parts.get(0).getMachineId(); - transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber(); - } - } - } +void Commit::addPartDecode(CommitPart newPart) { + + if (isDead) { + // If dead then just kill this part and move on + newPart.setDead(); + return; + } + + CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); + + if (previoslySeenPart != NULL) { + // Set dead the old one since the new one is a rescued version of this part + previoslySeenPart.setDead(); + } else if (newPart.isLastPart()) { + missingParts = new HashSet(); + hasLastPart = true; + + for (int i = 0; i < newPart.getPartNumber(); i++) { + if (parts.get(i) == NULL) { + missingParts.add(i); + } + } + } + + if (!fldisComplete && hasLastPart) { + + // We have seen this part so remove it from the set of missing parts + missingParts.remove(newPart.getPartNumber()); + + // Check if all the parts have been seen + if (missingParts.size() == 0) { + + // We have all the parts + fldisComplete = true; + + // Decode all the parts and create the key value guard and update sets + decodeCommitData(); + + // Get the sequence number and arbitrator of this transaction + sequenceNumber = parts.get(0).getSequenceNumber(); + machineId = parts.get(0).getMachineId(); + transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber(); + } + } +} int64_t getSequenceNumber() { return sequenceNumber; @@ -109,21 +102,9 @@ class Commit { return keyValueUpdateSet; } - int getNumberOfParts() { - return parts.size(); - } - - int64_t getMachineId() { - return machineId; - } - - bool isComplete() { - return isComplete; - } - - bool isLive() { - return !isDead; - } +int32_t getNumberOfParts() { + return parts.size(); +} void setDead() { if (isDead) { diff --git a/version2/src/C/Commit.h b/version2/src/C/Commit.h index c235452..4dd5a9d 100644 --- a/version2/src/C/Commit.h +++ b/version2/src/C/Commit.h @@ -1,279 +1,42 @@ - +#ifndef COMMIT_H +#define COMMIT_H +#include "common.h" class Commit { - - private Map parts = NULL; - private Set missingParts = NULL; - private bool isComplete = false; - private bool hasLastPart = false; - private Set keyValueUpdateSet = NULL; - private bool isDead = false; - private int64_t sequenceNumber = -1; - private int64_t machineId = -1; - private int64_t transactionSequenceNumber = -1; - - private Set liveKeys = NULL; - - public Commit() { - parts = new HashMap(); - keyValueUpdateSet = new HashSet(); - - liveKeys = new HashSet(); - } - - public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) { - parts = new HashMap(); - keyValueUpdateSet = new HashSet(); - - liveKeys = new HashSet(); - - sequenceNumber = _sequenceNumber; - machineId = _machineId; - transactionSequenceNumber = _transactionSequenceNumber; - isComplete = true; - } - - - public void addPartDecode(CommitPart newPart) { - - if (isDead) { - // If dead then just kill this part and move on - newPart.setDead(); - return; - } - - CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart); - - if (previoslySeenPart != NULL) { - // Set dead the old one since the new one is a rescued version of this part - previoslySeenPart.setDead(); - } else if (newPart.isLastPart()) { - missingParts = new HashSet(); - hasLastPart = true; - - for (int i = 0; i < newPart.getPartNumber(); i++) { - if (parts.get(i) == NULL) { - missingParts.add(i); - } - } - } - - if (!isComplete && hasLastPart) { - - // We have seen this part so remove it from the set of missing parts - missingParts.remove(newPart.getPartNumber()); - - // Check if all the parts have been seen - if (missingParts.size() == 0) { - - // We have all the parts - isComplete = true; - - // Decode all the parts and create the key value guard and update sets - decodeCommitData(); - - // Get the sequence number and arbitrator of this transaction - sequenceNumber = parts.get(0).getSequenceNumber(); - machineId = parts.get(0).getMachineId(); - transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber(); - } - } - } - - public int64_t getSequenceNumber() { - return sequenceNumber; - } - - public int64_t getTransactionSequenceNumber() { - return transactionSequenceNumber; - } - - public Map getParts() { - return parts; - } - - public void addKV(KeyValue kv) { - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } - - public void invalidateKey(IoTString key) { - liveKeys.remove(key); - - if (liveKeys.size() == 0) { - setDead(); - } - } - - public Set getKeyValueUpdateSet() { - return keyValueUpdateSet; - } - - public int getNumberOfParts() { - return parts.size(); - } - - public int64_t getMachineId() { - return machineId; - } - - public bool isComplete() { - return isComplete; - } - - public bool isLive() { - return !isDead; - } - - public void setDead() { - if (isDead) { - // Already dead - return; - } - - // Set dead - isDead = true; - - // Make all the parts of this transaction dead - for (Integer partNumber : parts.keySet()) { - CommitPart part = parts.get(partNumber); - part.setDead(); - } - } - - public CommitPart getPart(int index) { - return parts.get(index); - } - - public void createCommitParts() { - - parts.clear(); - - // Convert to chars - char[] charData = convertDataToBytes(); - - - int commitPartCount = 0; - int currentPosition = 0; - int remaining = charData.length; - - while (remaining > 0) { - - Boolean isLastPart = false; - // determine how much to copy - int copySize = CommitPart.MAX_NON_HEADER_SIZE; - if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) { - copySize = remaining; - isLastPart = true; // last bit of data so last part - } - - // Copy to a smaller version - char[] partData = new char[copySize]; - System.arraycopy(charData, currentPosition, partData, 0, copySize); - - CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart); - parts.put(part.getPartNumber(), part); - - // Update position, count and remaining - currentPosition += copySize; - commitPartCount++; - remaining -= copySize; - } - } - - private void decodeCommitData() { - - // Calculate the size of the data section - int dataSize = 0; - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - dataSize += tp.getDataSize(); - } - - char[] combinedData = new char[dataSize]; - int currentPosition = 0; - - // Stitch all the data sections together - for (int i = 0; i < parts.keySet().size(); i++) { - CommitPart tp = parts.get(i); - System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize()); - currentPosition += tp.getDataSize(); - } - - // Decoder Object - ByteBuffer bbDecode = ByteBuffer.wrap(combinedData); - - // Decode how many key value pairs need to be decoded - int numberOfKVUpdates = bbDecode.getInt(); - - // Decode all the updates key values - for (int i = 0; i < numberOfKVUpdates; i++) { - KeyValue kv = (KeyValue)KeyValue.decode(bbDecode); - keyValueUpdateSet.add(kv); - liveKeys.add(kv.getKey()); - } - } - - private char[] convertDataToBytes() { - - // Calculate the size of the data - int sizeOfData = sizeof(int32_t); // Number of Update KV's - for (KeyValue kv : keyValueUpdateSet) { - sizeOfData += kv.getSize(); - } - - // Data handlers and storage - char[] dataArray = new char[sizeOfData]; - ByteBuffer bbEncode = ByteBuffer.wrap(dataArray); - - // Encode the size of the updates and guard sets - bbEncode.putInt(keyValueUpdateSet.size()); - - // Encode all the updates - for (KeyValue kv : keyValueUpdateSet) { - kv.encode(bbEncode); - } - - return bbEncode.array(); - } - - private void setKVsMap(Map newKVs) { - keyValueUpdateSet.clear(); - liveKeys.clear(); - - keyValueUpdateSet.addAll(newKVs.values()); - liveKeys.addAll(newKVs.keySet()); - - } - - - public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) { - - if (older == NULL) { - return newer; - } else if (newer == NULL) { - return older; - } - - Map kvSet = new HashMap(); - for (KeyValue kv : older.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } - - for (KeyValue kv : newer.getKeyValueUpdateSet()) { - kvSet.put(kv.getKey(), kv); - } - - int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber(); - - if (transactionSequenceNumber == -1) { - transactionSequenceNumber = older.getTransactionSequenceNumber(); - } - - Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber); - - newCommit.setKVsMap(kvSet); - - return newCommit; - } -} + private: + Hashtable * parts; + Hashset *missingParts; + bool fldisComplete; + bool hasLastPart; + Hashset *keyValueUpdateSet; + bool isDead; + int64_t sequenceNumber; + int64_t machineId; + int64_t transactionSequenceNumber; + Hashset * liveKeys; + Array * convertDataToBytes(); + void setKVsMap(Hashtable * newKVs); + + public: + Commit(); + Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber); + + void addPartDecode(CommitPart * newPart); + int64_t getSequenceNumber(); + int64_t getTransactionSequenceNumber(); + Hashtable *getParts(); + void addKV(KeyValue * kv); + void invalidateKey(IoTString * key); + Hashset * getKeyValueUpdateSet(); + int32_t getNumberOfParts(); + int64_t getMachineId() { return machineId; } + bool isComplete() { return fldisComplete; } + bool isLive() { return !isDead; } + void setDead(); + CommitPart * getPart(int32_t index); + void createCommitParts(); + void decodeCommitData(); +}; + +Commit * Commit_merge(Commit * newer, Commit * older, int64_t newSequenceNumber); +#endif diff --git a/version2/src/C/TableStatus.cc b/version2/src/C/TableStatus.cc index 10c259c..49fcb0f 100644 --- a/version2/src/C/TableStatus.cc +++ b/version2/src/C/TableStatus.cc @@ -1,43 +1,12 @@ +#include "TableStatus.h" +#include "ByteBuffer.h" -/** - * TableStatus entries record the current size of the data structure - * in slots. Used to remember the size and to perform resizes. - * @author Brian Demsky - * @version 1.0 - */ - - -class TableStatus extends Entry { - int maxslots; - - TableStatus(Slot slot, int _maxslots) { - super(slot); - maxslots=_maxslots; - } - - int getMaxSlots() { - return maxslots; - } - - static Entry decode(Slot slot, ByteBuffer bb) { - int maxslots=bb.getInt(); - return new TableStatus(slot, maxslots); - } - - void encode(ByteBuffer bb) { - bb.put(Entry.TypeTableStatus); - bb.putInt(maxslots); - } - - int getSize() { - return sizeof(int32_t)+sizeof(char); - } - - char getType() { - return Entry.TypeTableStatus; - } +Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb) { + int maxslots=bb.getInt(); + return new TableStatus(slot, maxslots); +} - Entry getCopy(Slot s) { - return new TableStatus(s, maxslots); - } +void TableStatus::encode(ByteBuffer * bb) { + bb->put(TypeTableStatus); + bb->putInt(maxslots); } diff --git a/version2/src/C/TableStatus.h b/version2/src/C/TableStatus.h index 9637f7a..ce2576e 100644 --- a/version2/src/C/TableStatus.h +++ b/version2/src/C/TableStatus.h @@ -1,3 +1,7 @@ +#ifndef TABLESTATUS_H +#define TABLESTATUS_H +#include "common.h" +#include "Entry.h" /** * TableStatus entries record the current size of the data structure @@ -6,38 +10,22 @@ * @version 1.0 */ - -class TableStatus extends Entry { - private int maxslots; - - TableStatus(Slot slot, int _maxslots) { - super(slot); - maxslots=_maxslots; - } - - int getMaxSlots() { - return maxslots; - } - - static Entry decode(Slot slot, ByteBuffer bb) { - int maxslots=bb.getInt(); - return new TableStatus(slot, maxslots); - } - - void encode(ByteBuffer bb) { - bb.put(Entry.TypeTableStatus); - bb.putInt(maxslots); - } - - int getSize() { - return sizeof(int32_t)+sizeof(char); - } - - char getType() { - return Entry.TypeTableStatus; - } - - Entry getCopy(Slot s) { - return new TableStatus(s, maxslots); - } -} +class TableStatus : public Entry { + private: + int maxslots; + + public: + TableStatus(Slot * slot, int _maxslots) : Entry(slot), + maxslots(_maxslots) { + } + int getMaxSlots() { return maxslots; } + void encode(ByteBuffer *bb); + int getSize() { return sizeof(int32_t)+sizeof(char); } + + char getType() { return TypeTableStatus; } + + Entry * getCopy(Slot * s) { return new TableStatus(s, maxslots); } +}; + +Entry * TableStatus_decode(Slot * slot, ByteBuffer * bb); +#endif diff --git a/version2/src/C/array.h b/version2/src/C/array.h index 2749822..3126238 100644 --- a/version2/src/C/array.h +++ b/version2/src/C/array.h @@ -1,5 +1,8 @@ #ifndef ARRAY_H #define ARRAY_H +#include + +typedef uint32_t uint; template class Array { @@ -9,7 +12,7 @@ class Array { size(0) { } - Array(uint _size) : + Array(uint32_t _size) : array((type *) ourcalloc(1, sizeof(type) * _size)), size(_size) { diff --git a/version2/src/C/common.h b/version2/src/C/common.h index 63ba35f..0decdda 100644 --- a/version2/src/C/common.h +++ b/version2/src/C/common.h @@ -1,12 +1,25 @@ #ifndef COMMON_H #define COMMON_H #include +typedef uint32_t uint; +#define CMEMALLOC ; +#define model_print printf + +#include "hashset.h" +#include "vector.h" +#include "array.h" + + class Abort; class Entry; class Slot; class ByteBuffer; class Liveness; - +class Commit; +class CommitPart; +class ArbitrationRound; +class KeyValue; +class IoTString; #endif diff --git a/version2/src/C/hashset.h b/version2/src/C/hashset.h index 009c047..2a94d96 100644 --- a/version2/src/C/hashset.h +++ b/version2/src/C/hashset.h @@ -76,7 +76,7 @@ private: Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *set; }; -template, bool (*equals) (_Key, _Key) = defaultEquals<_Key> > +template, bool (*equals) (_Key, _Key) = defaultEquals<_Key> > class Hashset { public: Hashset(unsigned int initialcapacity = 16, double factor = 0.5) : diff --git a/version2/src/C/hashtable.h b/version2/src/C/hashtable.h index 0a50bb9..648377e 100644 --- a/version2/src/C/hashtable.h +++ b/version2/src/C/hashtable.h @@ -17,8 +17,8 @@ #include #include #include -#include "mymemory.h" #include "common.h" +#include "mymemory.h" /** * @brief Hashtable node diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index 2d4cc19..7b49d4e 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -1,7 +1,6 @@ #ifndef CPPVECTOR_H #define CPPVECTOR_H #include - #define VECTOR_DEFCAP 8 template -- 2.34.1