delete parts;
}
+void ArbitrationRound::generateParts() {
+ if (didGenerateParts) {
+ return;
+ }
+ parts = new Vector<Entry *>();
+ SetIterator<Abort *, Abort *> * abit = abortsBefore->iterator();
+ while(abit->hasNext())
+ parts->add((Entry *)abit->next());
+ delete abit;
+ if (commit != NULL) {
+ Vector<CommitPart *> * cParts = commit->getParts();
+ uint cPartsSize = cParts->size();
+ for(uint i=0; i < cPartsSize; i++) {
+ parts->add((Entry *)cParts->get(i));
+ }
+ }
+}
+
Vector<Entry *> *ArbitrationRound::getParts() {
return parts;
}
+void ArbitrationRound::removeParts(Vector<Entry *> *removeParts) {
+ parts->removeAll(removeParts);
+ didSendPart = true;
+}
+
+
bool ArbitrationRound::isDoneSending() {
if ((commit == NULL) && abortsBefore->isEmpty()) {
return true;
table(_table),
listeningPort(_listeningPort),
doEnd(false),
- timer(TimingSingleton_getInstance()) {
+ timer(TimingSingleton_getInstance()),
+ getslot(new Array<char>("getslot", 7)),
+ putslot(new Array<char>("putslot", 7)) {
if (listeningPort > 0) {
pthread_create(&localServerThread, NULL, threadWrapper, this);
}
}
+CloudComm::~CloudComm() {
+ delete random;
+ delete getslot;
+ delete putslot;
+}
+
/**
* Generates Key from password.
*/
* Constructor for actual use. Takes in the url and password.
*/
CloudComm(Table *_table, IoTString *_baseurl, IoTString *_password, int _listeningPort);
-
+ ~CloudComm();
+
/**
* Inits all the security stuff
*/
missingParts(NULL),
fldisComplete(false),
hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
isDead(false),
sequenceNumber(-1),
machineId(-1),
missingParts(NULL),
fldisComplete(true),
hasLastPart(false),
- keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>()),
+ keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
isDead(false),
sequenceNumber(_sequenceNumber),
machineId(_machineId),
}
}
-Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *Commit::getKeyValueUpdateSet() {
+Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
return keyValueUpdateSet;
}
Array<char> *Commit::convertDataToBytes() {
// Calculate the size of the data
int sizeOfData = sizeof(int32_t); // Number of Update KV's
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = keyValueUpdateSet->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
sizeOfData += kv->getSize();
return bbEncode->array();
}
-void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs) {
+void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0> *newKVs) {
keyValueUpdateSet->clear();
keyValueUpdateSet->addAll(newKVs);
liveKeys->clear();
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = newKVs->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = newKVs->iterator();
while (kvit->hasNext()) {
liveKeys->add(kvit->next()->getKey());
}
} else if (newer == NULL) {
return older;
}
- Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue>();
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = older->getKeyValueUpdateSet()->iterator();
+ Hashset<KeyValue *, uintptr_t, 0> *kvSet = new Hashset<KeyValue *, uintptr_t, 0>();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
kvSet->add(kv);
Hashset<int32_t> *missingParts;
bool fldisComplete;
bool hasLastPart;
- Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *keyValueUpdateSet;
+ Hashset<KeyValue *, uintptr_t, 0> *keyValueUpdateSet;
bool isDead;
int64_t sequenceNumber;
int64_t machineId;
int64_t transactionSequenceNumber;
Hashset<IoTString *> *liveKeys;
Array<char> *convertDataToBytes();
- void setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *newKVs);
+ void setKVsMap(Hashset<KeyValue *, uintptr_t, 0> *newKVs);
public:
Commit();
Vector<CommitPart *> *getParts();
void addKV(KeyValue *kv);
void invalidateKey(IoTString *key);
- Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *getKeyValueUpdateSet();
+ Hashset<KeyValue *, uintptr_t, 0> *getKeyValueUpdateSet();
int32_t getNumberOfParts();
int64_t getMachineId() { return machineId; }
bool isComplete() { return fldisComplete; }
#define ServerException_TypeSalt 3
class ServerException {
-public:
- ServerException(const char *msg, char _type) : type(_type) {}
- char getType();
char type;
+ public:
+ ServerException(const char *msg, char _type) : type(_type) {}
+ char getType() {return type;}
};
#endif
};
KeyValue *KeyValue_decode(ByteBuffer *bb);
-unsigned int hashKeyValue(KeyValue *kv);
-bool equalsKeyValue(KeyValue *a, KeyValue *b);
#endif
OBJECTS := $(CPP_SOURCES:%.cc=$(OBJ_DIR)/%.o) $(C_SOURCES:%.c=$(OBJ_DIR)/%.o)
-CFLAGS := -Wall -O3 -g
+CFLAGS := -Wall -O0 -g
CFLAGS += -I.
LDFLAGS := -ldl -lrt -rdynamic -g
SHARED := -shared
delete key;
}
-Entry *decode(Slot *slot, ByteBuffer *bb) {
+Entry *NewKey_decode(Slot *slot, ByteBuffer *bb) {
int keylength = bb->getInt();
Array<char> *key = new Array<char>(keylength);
bb->get(key);
/**
* Get the transaction arbitrator
*/
- int64_t getArbitrator();
+ int64_t getArbitrator() {return arbitrator;}
/**
* Get the key value update set
*/
init();
}
+Table::~Table() {
+ delete cloud;
+ delete random;
+ delete buffer;
+ // init data structs
+ delete committedKeyValueTable;
+ delete speculatedKeyValueTable;
+ delete pendingTransactionSpeculatedKeyValueTable;
+ delete liveNewKeyTable;
+ delete lastMessageTable;
+ delete rejectedMessageWatchVectorTable;
+ delete arbitratorTable;
+ delete liveAbortTable;
+ delete newTransactionParts;
+ delete newCommitParts;
+ delete lastArbitratedTransactionNumberByArbitratorTable;
+ delete liveTransactionBySequenceNumberTable;
+ delete liveTransactionByTransactionIdTable;
+ delete liveCommitsTable;
+ delete liveCommitsByKeyTable;
+ delete lastCommitSeenSequenceNumberByArbitratorTable;
+ delete rejectedSlotVector;
+ delete pendingTransactionQueue;
+ delete pendingSendArbitrationEntriesToDelete;
+ delete transactionPartsSent;
+ delete outstandingTransactionStatus;
+ delete liveAbortsGeneratedByLocal;
+ delete offlineTransactionsCommittedAndAtServer;
+ delete localCommunicationTable;
+ delete lastTransactionSeenFromMachineFromServer;
+ delete pendingSendArbitrationRounds;
+ delete lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+}
+
/**
* Init all the stuff needed for for table usage
*/
// have live values for their keys
Hashset<Commit *> *commitsToEdit = new Hashset<Commit *>();
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
Commit *commit = liveCommitsByKeyTable->get(kv->getKey());
// Update which keys in the old commits are still live
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
previousCommit->invalidateKey(kv->getKey());
// Update the committed table of keys and which commit is using which key
{
- SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, equalsKeyValue> *kvit = commit->getKeyValueUpdateSet()->iterator();
+ SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = commit->getKeyValueUpdateSet()->iterator();
while (kvit->hasNext()) {
KeyValue *kv = kvit->next();
committedKeyValueTable->put(kv->getKey(), kv);
public:
Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
Table(CloudComm *_cloud, int64_t _localMachineId);
-
+ ~Table();
+
/**
* Initialize the table by inserting a table status as the first entry into the table status
* also initialize the crypto stuff.
memcpy(&array[fldsize], v->array, v->fldsize * sizeof(type));
}
+ void removeAll(Vector<type> *v) {
+ uint vsize = v->size();
+ for(uint i = 0; i < vsize; i++)
+ remove(v->get(i));
+ }
+
void add(type item) {
if (fldsize >= capacity) {
uint newcap = capacity << 1;