#include "Transaction.h"
Transaction::Transaction() :
- parts(new Hashtable<int32_t, TransactionPart>()),
+ parts(new Hashtable<int32_t, TransactionPart *>()),
missingParts(NULL),
partsPendingSend(new Vector<int32_t>()),
fldisComplete(false),
hasLastPart(false),
- keyValueGuardSet(new HashSet<KeyValue>()),
- keyValueUpdateSet(new HashSet<KeyValue>()),
+ keyValueGuardSet(new Hashset<KeyValue *>()),
+ keyValueUpdateSet(new Hashset<KeyValue *>()),
isDead(false),
sequenceNumber(-1),
clientLocalSequenceNumber(-1),
}
void Transaction::addPartEncode(TransactionPart *newPart) {
- parts.put(newPart.getPartNumber(), newPart);
- partsPendingSend.add(newPart.getPartNumber());
+ parts->put(newPart->getPartNumber(), newPart);
+ partsPendingSend->add(newPart->getPartNumber());
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
+ sequenceNumber = newPart->getSequenceNumber();
+ arbitratorId = newPart->getArbitratorId();
+ transactionId = newPart->getTransactionId();
+ clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+ machineId = newPart->getMachineId();
fldisComplete = true;
}
void Transaction::addPartDecode(TransactionPart *newPart) {
if (isDead) {
// If dead then just kill this part and move on
- newPart.setDead();
+ newPart->setDead();
return;
}
- sequenceNumber = newPart.getSequenceNumber();
- arbitratorId = newPart.getArbitratorId();
- transactionId = newPart.getTransactionId();
- clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
- machineId = newPart.getMachineId();
+ sequenceNumber = newPart->getSequenceNumber();
+ arbitratorId = newPart->getArbitratorId();
+ transactionId = newPart->getTransactionId();
+ clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
+ machineId = newPart->getMachineId();
- TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+ TransactionPart previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
if (previoslySeenPart != NULL) {
// Set dead the old one since the new one is a rescued version of this part
- previoslySeenPart.setDead();
- } else if (newPart.isLastPart()) {
- missingParts = new HashSet<int32_t>();
+ previoslySeenPart->setDead();
+ } else if (newPart->isLastPart()) {
+ missingParts = new Hashset<int32_t>();
hasLastPart = true;
- for (int i = 0; i < newPart.getPartNumber(); i++) {
- if (parts.get(i) == NULL) {
- missingParts.add(i);
+ for (int i = 0; i < newPart->getPartNumber(); i++) {
+ if (parts->get(i) == NULL) {
+ missingParts->add(i);
}
}
}
if (!fldisComplete && hasLastPart) {
// We have seen this part so remove it from the set of missing parts
- missingParts.remove(newPart.getPartNumber());
+ missingParts->remove(newPart->getPartNumber());
// Check if all the parts have been seen
- if (missingParts.size() == 0) {
+ if (missingParts->size() == 0) {
// We have all the parts
fldisComplete = true;
}
void Transaction::addUpdateKV(KeyValue *kv) {
- keyValueUpdateSet.add(kv);
+ keyValueUpdateSet->add(kv);
}
void Transaction::addGuardKV(KeyValue *kv) {
- keyValueGuardSet.add(kv);
+ keyValueGuardSet->add(kv);
}
void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
sequenceNumber = _sequenceNumber;
- for (int32_t i : parts.keySet()) {
- parts.get(i).setSequenceNumber(sequenceNumber);
+ for (int32_t i : parts->keySet()) {
+ parts->get(i)->setSequenceNumber(sequenceNumber);
}
}
}
TransactionPart *Transaction::getNextPartToSend() {
- if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+ if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
return NULL;
}
- TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+ TransactionPart part = parts->get(partsPendingSend->get(nextPartToSend));
nextPartToSend++;
return part;
}
void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
nextPartToSend = 0;
- if (partsPendingSend.removeAll(sentParts))
+ if (partsPendingSend->removeAll(sentParts))
{
flddidSendAPartToServer = true;
- transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+ transactionStatus->setTransactionSequenceNumber(sequenceNumber);
}
}
bool Transaction::didSendAllParts() {
- return partsPendingSend.isEmpty();
+ return partsPendingSend->isEmpty();
}
Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
}
int Transaction::getNumberOfParts() {
- return parts.size();
+ return parts->size();
}
int64_t Transaction::getMachineId() {
isDead = true;
// Make all the parts of this transaction dead
- for (int32_t partNumber : parts.keySet()) {
- TransactionPart part = parts.get(partNumber);
- part.setDead();
+ for (int32_t partNumber : parts->keySet()) {
+ TransactionPart part = parts->get(partNumber);
+ part->setDead();
}
}
TransactionPart *Transaction::getPart(int index) {
- return parts.get(index);
+ return parts->get(index);
}
void Transaction::decodeTransactionData() {
// Calculate the size of the data section
int dataSize = 0;
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- dataSize += tp.getDataSize();
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ TransactionPart tp = parts->get(i);
+ dataSize += tp->getDataSize();
}
Array<char> *combinedData = new char[dataSize];
int currentPosition = 0;
// Stitch all the data sections together
- for (int i = 0; i < parts.keySet().size(); i++) {
- TransactionPart tp = parts.get(i);
- System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
- currentPosition += tp.getDataSize();
+ for (int i = 0; i < parts->keySet()->size(); i++) {
+ TransactionPart tp = parts->get(i);
+ System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
+ currentPosition += tp->getDataSize();
}
// Decoder Object
- ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+ ByteBuffer bbDecode = ByteBuffer_wrap(combinedData);
// Decode how many key value pairs need to be decoded
- int numberOfKVGuards = bbDecode.getInt();
- int numberOfKVUpdates = bbDecode.getInt();
+ int numberOfKVGuards = bbDecode->getInt();
+ int numberOfKVUpdates = bbDecode->getInt();
// Decode all the guard key values
for (int i = 0; i < numberOfKVGuards; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueGuardSet.add(kv);
+ KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueGuardSet->add(kv);
}
// Decode all the updates key values
for (int i = 0; i < numberOfKVUpdates; i++) {
- KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
- keyValueUpdateSet.add(kv);
+ KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
+ keyValueUpdateSet->add(kv);
}
}
for (KeyValue *kvGuard : keyValueGuardSet) {
// First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue kv = NULL;
+ KeyValue * kv = NULL;
// If we have a speculation table then use it first
if (pendingTransactionSpeculatedKeyValueTable != NULL) {
- kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+ kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
}
// If we have a speculation table then use it first
if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
- kv = speculatedKeyValueTable.get(kvGuard.getKey());
+ kv = speculatedKeyValueTable->get(kvGuard->getKey());
}
if (kv == NULL) {
// if it is not in the speculative table then check the committed table and use that
// value as our latest assumption
- kv = committedKeyValueTable.get(kvGuard.getKey());
+ kv = committedKeyValueTable->get(kvGuard->getKey());
}
- if (kvGuard.getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+ if (kvGuard->getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
if (kv != NULL) {
- System.out.println(kvGuard.getValue() + " " + kv.getValue());
+ System.out.println(kvGuard->getValue() + " " + kv->getValue());
} else {
- System.out.println(kvGuard.getValue() + " " + kv);
+ System.out.println(kvGuard->getValue() + " " + kv);
}
return false;
#include "TransactionPart.h"
+#include "ByteBuffer.h"
int TransactionPart::getSize() {
if (data == NULL) {
return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char));
}
- return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data.length;
+ return (4 * sizeof(int64_t)) + (2 * sizeof(int32_t)) + (2 * sizeof(char)) + data->length();
}
void TransactionPart::setSlot(Slot *s) {
return arbitratorId;
}
-Pair<int64_t int32_t> *TransactionPart::getPartId() {
+Pair<int64_t, int32_t> *TransactionPart::getPartId() {
return partId;
}
}
int TransactionPart::getDataSize() {
- return data.length;
+ return data->length();
}
Array<char> *TransactionPart::getData() {
}
bool TransactionPart::isLastPart() {
- return isLastPart;
+ return fldisLastPart;
}
int64_t TransactionPart::getMachineId() {
Array<char> *data = new Array<char>(dataSize);
bb->get(data);
- TransactionPart returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- returnTransactionPart.setSequenceNumber(sequenceNumber);
+ TransactionPart * returnTransactionPart = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
+ returnTransactionPart->setSequenceNumber(sequenceNumber);
return returnTransactionPart;
}
void TransactionPart::encode(ByteBuffer *bb) {
- bb->put(Entry.TypeTransactionPart);
+ bb->put(TypeTransactionPart);
bb->putLong(sequenceNumber);
bb->putLong(machineId);
bb->putLong(arbitratorId);
bb->putLong(clientLocalSequenceNumber);
bb->putInt(partNumber);
- bb->putInt(data.length);
+ bb->putInt(data->length());
- if (isLastPart) {
+ if (fldisLastPart) {
bb->put((char)1);
} else {
bb->put((char)0);
}
Entry *TransactionPart::getCopy(Slot *s) {
- TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, isLastPart);
- copyTransaction.setSequenceNumber(sequenceNumber);
+ TransactionPart *copyTransaction = new TransactionPart(s, machineId, arbitratorId, clientLocalSequenceNumber, partNumber, data, fldisLastPart);
+ copyTransaction->setSequenceNumber(sequenceNumber);
return copyTransaction;
}