-#include "commit.h"
+#include "Commit.h"
+#include "CommitPart.h"
+#include "ByteBuffer.h"
+#include "KeyValue.h"
Commit::Commit() :
parts(new Hashtable<int32_t, CommitPart *>()),
missingParts(NULL),
fldisComplete(false),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
isDead(false),
sequenceNumber(-1),
machineId(-1),
missingParts(NULL),
fldisComplete(true),
hasLastPart(false),
- keyValueUpdateSet(new HashSet<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
isDead(false),
sequenceNumber(_sequenceNumber),
machineId(_machineId),
liveKeys(new Hashset<IoTString *>) {
}
-void Commit::addPartDecode(CommitPart newPart) {
-
+void Commit::addPartDecode(CommitPart *newPart) {
if (isDead) {
// If dead then just kill this part and move on
newPart->setDead();
// Set dead the old one since the new one is a rescued version of this part
previoslySeenPart->setDead();
} else if (newPart->isLastPart()) {
- missingParts = new HashSet<int32_t>();
+ missingParts = new Hashset<int32_t>();
hasLastPart = true;
for (int i = 0; i < newPart->getPartNumber(); i++) {
// Make all the parts of this transaction dead
for (int32_t partNumber : parts->keySet()) {
- CommitPart part = parts->get(partNumber);
+ CommitPart *part = parts->get(partNumber);
part->setDead();
}
}
// Copy to a smaller version
Array<char> *partData = new Array<char>(copySize);
- System->arraycopy(charData, currentPosition, partData, 0, copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
parts->put(part->getPartNumber(), part);
// Stitch all the data sections together
for (int i = 0; i < parts->keySet()->size(); i++) {
CommitPart *tp = parts->get(i);
- System->arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
currentPosition += tp->getDataSize();
}
// Decode all the updates key values
for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue *kv = (KeyValue *)KeyValue->decode(bbDecode);
+ KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
keyValueUpdateSet->add(kv);
liveKeys->add(kv->getKey());
}
keyValueUpdateSet->addAll(newKVs->values());
liveKeys->addAll(newKVs->keySet());
-
}
-
Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
if (older == NULL) {