int32_t getInt();
char get();
void get(Array<char> * array);
+ Array<char> * array();
private:
};
+ByteBuffer * ByteBuffer_wrap(Array<char> * array);
#endif
* Returns the length in chars of the IoTString.
*/
+ bool equals(IoTString * str) {
+ uint strlength = str->array->length();
+ uint thislength = array->length();
+ if (strlength != thislength)
+ return false;
+
+ int result = memcmp(str->array->internalArray(), array->internalArray(), strlength);
+ return result == 0;
+ }
+
int length() { return array->length(); }
friend IoTString *IoTString_shallow(Array<char> *_array);
};
#include "PendingTransaction.h"
+#include "KeyValue.h"
+#include "IoTString.h"
+#include "Transaction.h"
+#include "TransactionPart.h"
+#include "ByteBuffer.h"
PendingTransaction::PendingTransaction(int64_t _machineId) :
keyValueUpdateSet(new Hashset<KeyValue *>()),
- keyValueGuardSet(new HashSet<KeyValue *>()),
+ keyValueGuardSet(new Hashset<KeyValue *>()),
arbitrator(-1),
clientLocalSequenceNumber(-1),
machineId(_machineId),
*/
void PendingTransaction::addKV(KeyValue *newKV) {
- KeyValue rmKV = NULL;
+ KeyValue * rmKV = NULL;
// Make sure there are no duplicates
- for (KeyValue kv : keyValueUpdateSet) {
- if (kv.getKey().equals(newKV.getKey())) {
+ SetIterator<KeyValue *> * kvit = keyValueUpdateSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ if (kv->getKey()->equals(newKV->getKey())) {
// Remove key if we are adding a newer version of the same key
rmKV = kv;
break;
}
}
-
+ delete kvit;
+
// Remove key if we are adding a newer version of the same key
if (rmKV != NULL) {
- keyValueUpdateSet.remove(rmKV);
- currentDataSize -= rmKV.getSize();
+ keyValueUpdateSet->remove(rmKV);
+ currentDataSize -= rmKV->getSize();
}
// Add the key to the hash set
- keyValueUpdateSet.add(newKV);
- currentDataSize += newKV.getSize();
+ keyValueUpdateSet->add(newKV);
+ currentDataSize += newKV->getSize();
}
/**
*/
void PendingTransaction::addKVGuard(KeyValue *newKV) {
// Add the key to the hash set
- keyValueGuardSet.add(newKV);
- currentDataSize += newKV.getSize();
+ keyValueGuardSet->add(newKV);
+ currentDataSize += newKV->getSize();
}
/**
return arb == arbitrator;
}
-bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative) {
- for (KeyValue kvGuard : keyValueGuardSet) {
+bool PendingTransaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> * keyValTableCommitted, Hashtable<IoTString *, KeyValue *> * keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> * keyValTablePendingTransSpeculative) {
+ SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kvGuard = kvit->next();
// First check if the key is in the speculative table, this is the value of the latest assumption
- KeyValue kv = keyValTablePendingTransSpeculative.get(kvGuard.getKey());
+ KeyValue * kv = keyValTablePendingTransSpeculative->get(kvGuard->getKey());
if (kv == NULL) {
// if it is not in the pending trans table then check the speculative table and use that
// value as our latest assumption
- kv = keyValTableSpeculative.get(kvGuard.getKey());
+ kv = keyValTableSpeculative->get(kvGuard->getKey());
}
if (kv == NULL) {
// if it is not in the speculative table then check the committed table and use that
// value as our latest assumption
- kv = keyValTableCommitted.get(kvGuard.getKey());
+ kv = keyValTableCommitted->get(kvGuard->getKey());
}
- if (kvGuard.getValue() != NULL) {
- if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
+ if (kvGuard->getValue() != NULL) {
+ if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
+ delete kvit;
return false;
}
} else {
if (kv != NULL) {
+ delete kvit;
return false;
}
}
}
+ delete kvit;
return true;
}
Array<char> *charData = convertDataToBytes();
int currentPosition = 0;
- int remaining = charData.length;
+ int remaining = charData->length();
while (remaining > 0) {
bool isLastPart = false;
// determine how much to copy
- int copySize = TransactionPart.MAX_NON_HEADER_SIZE;
- if (remaining <= TransactionPart.MAX_NON_HEADER_SIZE) {
+ int copySize = TransactionPart_MAX_NON_HEADER_SIZE;
+ if (remaining <= TransactionPart_MAX_NON_HEADER_SIZE) {
copySize = remaining;
isLastPart = true;// last bit of data so last part
}
// Copy to a smaller version
- Array<char> *partData = new char[copySize];
- System.arraycopy(charData, currentPosition, partData, 0, copySize);
+ Array<char> *partData = new Array<char>(copySize);
+ System_arraycopy(charData, currentPosition, partData, 0, copySize);
- TransactionPart part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
- newTransaction.addPartEncode(part);
+ TransactionPart * part = new TransactionPart(NULL, machineId, arbitrator, clientLocalSequenceNumber, transactionPartCount, partData, isLastPart);
+ newTransaction->addPartEncode(part);
// Update position, count and remaining
currentPosition += copySize;
}
// Add the Guard Conditions
- for (KeyValue kv : keyValueGuardSet) {
- newTransaction.addGuardKV(kv);
+ SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ newTransaction->addGuardKV(kv);
}
-
+ delete kvit;
+
// Add the updates
- for (KeyValue kv : keyValueUpdateSet) {
- newTransaction.addUpdateKV(kv);
+ kvit = keyValueUpdateSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
+ newTransaction->addUpdateKV(kv);
}
-
+ delete kvit;
return newTransaction;
}
-Arrar<char> *PendingTransaction::convertDataToBytes() {
+Array<char> *PendingTransaction::convertDataToBytes() {
// Calculate the size of the data
int sizeOfData = 2 * sizeof(int32_t); // Number of Update KV's and Guard KV's
sizeOfData += currentDataSize;
ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
// Encode the size of the updates and guard sets
- bbEncode->putInt(keyValueGuardSet.size());
- bbEncode->putInt(keyValueUpdateSet.size());
+ bbEncode->putInt(keyValueGuardSet->size());
+ bbEncode->putInt(keyValueUpdateSet->size());
// Encode all the guard conditions
- for (KeyValue kv : keyValueGuardSet) {
+ SetIterator<KeyValue *> * kvit = keyValueGuardSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
kv->encode(bbEncode);
}
-
+ delete kvit;
+
// Encode all the updates
- for (KeyValue kv : keyValueUpdateSet) {
+ kvit = keyValueUpdateSet->iterator();
+ while(kvit->hasNext()) {
+ KeyValue *kv = kvit->next();
kv->encode(bbEncode);
}
-
+ delete kvit;
+
return bbEncode->array();
}
class PendingTransaction {
private:
- Hashset<KeyValue *> *keyValueUpdateSet = NULL;
- Hashset<KeyValue *> *keyValueGuardSet = NULL;
- int64_t arbitrator = -1;
- int64_t clientLocalSequenceNumber = -1;
- int64_t machineId = -1;
- int32_T currentDataSize = 0;
+ Hashset<KeyValue *> *keyValueUpdateSet;
+ Hashset<KeyValue *> *keyValueGuardSet;
+ int64_t arbitrator;
+ int64_t clientLocalSequenceNumber;
+ int64_t machineId;
+ int32_t currentDataSize;
public:
PendingTransaction(int64_t _machineId);
* Add a new key value to the updates
*
*/
- void addKV(KeyValue newKV);
+ void addKV(KeyValue * newKV);
/**
* Add a new key value to the guard set
*
*/
- void addKVGuard(KeyValue newKV);
+ void addKVGuard(KeyValue * newKV);
/**
* Checks if the arbitrator is the same
*/
/**
* Get the key value update set
*/
- public Hashset<KeyValue *> *getKVGuard() { return keyValueGuardSet; }
+ Hashset<KeyValue *> *getKVGuard() { return keyValueGuardSet; }
void setClientLocalSequenceNumber(int64_t _clientLocalSequenceNumber) { clientLocalSequenceNumber = _clientLocalSequenceNumber; }
int64_t getMachineId() { return machineId; }
- bool evaluateGuard(Hashtable<IoTString *, KeyValue *> keyValTableCommitted, Hashtable<IoTString *, KeyValue *> keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> keyValTablePendingTransSpeculative);
+ bool evaluateGuard(Hashtable<IoTString *, KeyValue *> * keyValTableCommitted, Hashtable<IoTString *, KeyValue *> * keyValTableSpeculative, Hashtable<IoTString *, KeyValue *> * keyValTablePendingTransSpeculative);
Transaction *createTransaction();
#include "Transaction.h"
-Transaction::Transaction() {
- parts = new Hashtable<int32_t, TransactionPart>();
- keyValueGuardSet = new HashSet<KeyValue>();
- keyValueUpdateSet = new HashSet<KeyValue>();
- partsPendingSend = new Vector<int32_t>();
+Transaction::Transaction() :
+ parts(new Hashtable<int32_t, TransactionPart>()),
+ missingParts(NULL),
+ partsPendingSend(new Vector<int32_t>()),
+ fldisComplete(false),
+ hasLastPart(false),
+ keyValueGuardSet(new HashSet<KeyValue>()),
+ keyValueUpdateSet(new HashSet<KeyValue>()),
+ isDead(false),
+ sequenceNumber(-1),
+ clientLocalSequenceNumber(-1),
+ arbitratorId(-1),
+ machineId(-1),
+ transactionId(NULL),
+ hadServerFailure(false) {
}
void Transaction::addPartEncode(TransactionPart *newPart) {
clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
machineId = newPart.getMachineId();
- isComplete = true;
+ fldisComplete = true;
}
void Transaction::addPartDecode(TransactionPart *newPart) {
}
}
- if (!isComplete && hasLastPart) {
+ if (!fldisComplete && hasLastPart) {
// We have seen this part so remove it from the set of missing parts
missingParts.remove(newPart.getPartNumber());
if (missingParts.size() == 0) {
// We have all the parts
- isComplete = true;
+ fldisComplete = true;
// Decode all the parts and create the key value guard and update sets
decodeTransactionData();
}
bool Transaction::didSendAPartToServer() {
- return didSendAPartToServer;
+ return flddidSendAPartToServer;
}
void Transaction::resetNextPartToSend() {
nextPartToSend = 0;
if (partsPendingSend.removeAll(sentParts))
{
- didSendAPartToServer = true;
+ flddidSendAPartToServer = true;
transactionStatus.setTransactionSequenceNumber(sequenceNumber);
}
}
}
bool Transaction::isComplete() {
- return isComplete;
+ return fldisComplete;
}
Pair<int64_t, int64_t> *Transaction::getId() {
#ifndef TRANSACTION_H
#define TRANSACTION_H
#include "common.h"
+#include "Pair.h"
class Transaction {
private:
- Hashtable<int32_t, TransactionPart *> *parts = NULL;
- Hashset<int32_t> *missingParts = NULL;
- Vector<int32_t> *partsPendingSend = NULL;
- bool isComplete = false;
- bool hasLastPart = false;
- Hashset<KeyValue *> *keyValueGuardSet = NULL;
- Hashset<KeyValue *> *keyValueUpdateSet = NULL;
- bool isDead = false;
- int64_t sequenceNumber = -1;
- int64_t clientLocalSequenceNumber = -1;
- int64_t arbitratorId = -1;
- int64_t machineId = -1;
- Pair<uint64_t, uint64_t> *transactionId = NULL;
- int nextPartToSend = 0;
- bool didSendAPartToServer = false;
- TransactionStatus *transactionStatus = NULL;
- bool hadServerFailure = false;
+ Hashtable<int32_t, TransactionPart *> *parts;
+ Hashset<int32_t> *missingParts;
+ Vector<int32_t> *partsPendingSend;
+ bool fldisComplete;
+ bool hasLastPart;
+ Hashset<KeyValue *> *keyValueGuardSet;
+ Hashset<KeyValue *> *keyValueUpdateSet;
+ bool isDead;
+ int64_t sequenceNumber;
+ int64_t clientLocalSequenceNumber;
+ int64_t arbitratorId;
+ int64_t machineId;
+ Pair<uint64_t, uint64_t> *transactionId;
+ int nextPartToSend;
+ bool flddidSendAPartToServer;
+ TransactionStatus *transactionStatus;
+ bool hadServerFailure;
void decodeTransactionData();
public:
#ifndef ARRAY_H
#define ARRAY_H
#include <inttypes.h>
+#include "common.h"
typedef uint32_t uint;
type *array;
uint size;
};
+
+template<typename type>
+void System_arraycopy(Array<type> * src, int32_t srcPos, Array<type> *dst, int32_t dstPos, int32_t len) {
+ if (srcPos + len > src->length() ||
+ dstPos + len > dst->length())
+ ASSERT(0);
+ uint bytesToCopy = len * sizeof(type);
+ memcpy(&dst->internalArray()[dstPos], &src->internalArray()[srcPos], bytesToCopy);
+}
#endif
typedef uint32_t uint;
#define CMEMALLOC ;
#define model_print printf
+#define ASSERT(expr) \
+ do { \
+ if (!(expr)) { \
+ fprintf(stderr, "Error: assertion failed in %s at line %d\n", __FILE__, __LINE__); \
+ /* print_trace(); // Trace printing may cause dynamic memory allocation */ \
+ exit(EXIT_FAILURE); \
+ } \
+ } while (0)
#include "hashset.h"
#include "vector.h"
class Mac;
class Error;
-#define ASSERT(expr) \
- do { \
- if (!(expr)) { \
- fprintf(stderr, "Error: assertion failed in %s at line %d\n", __FILE__, __LINE__); \
- /* print_trace(); // Trace printing may cause dynamic memory allocation */ \
- exit(EXIT_FAILURE); \
- } \
- } while (0)
+
#endif
template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key), bool (*equals)(_Key, _Key)>
class Hashset;
-template<typename _Key, typename _KeyInt, int _Shift, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
+template<typename _Key, typename _KeyInt = uintptr_t, int _Shift = 0, unsigned int (*hash_function)(_Key) = defaultHashFunction<_Key, _Shift, _KeyInt>, bool (*equals)(_Key, _Key) = defaultEquals<_Key> >
class SetIterator {
public:
SetIterator(Linknode<_Key> *_curr, Hashset <_Key, _KeyInt, _Shift, hash_function, equals> *_set) :