#include "Table.h"
-Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
- localMachineId = _localMachineId;
- cloud = new CloudComm(this, baseurl, password, listeningPort);
-
+Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) :
+ buffer(NULL),
+ cloud(new CloudComm(this, baseurl, password, listeningPort)),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumber(0),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedSize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastPendingSendArbitrationEntriesToDelete(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
init();
}
-Table::Table(CloudComm _cloud, int64_t _localMachineId) {
- localMachineId = _localMachineId;
- cloud = _cloud;
-
+Table::Table(CloudComm _cloud, int64_t _localMachineId) :
+ buffer(NULL),
+ cloud(_cloud),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumber(0),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedSize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastPendingSendArbitrationEntriesToDelete(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
init();
}
pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
liveNewKeyTable = new Hashtable<IoTString, NewKey>();
lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
- rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t Hashset<RejectedMessage> >();
arbitratorTable = new Hashtable<IoTString, Long>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
- offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+ offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
int64_t o = buffer.getOldestSeqNum();
int64_t n = buffer.getNewestSeqNum();
- int[] types = new int[10];
+ Array<int> * types = new Array<int>(10);
int num = 0;
* Initialize the table by inserting a table status as the first entry into the table status
* also initialize the crypto stuff.
*/
-synchronized void Table::initTable() throws ServerException {
+synchronized void Table::initTable() {
cloud.initSecurity();
// Create the first insertion into the block chain which is the table status
localSequenceNumber++;
TableStatus status = new TableStatus(s, numberOfSlots);
s.addEntry(status);
- Slot[] array = cloud.putSlot(s, numberOfSlots);
+ Array<Slot> * array = cloud.putSlot(s, numberOfSlots);
if (array == NULL) {
- array = new Slot[] {s};
+ array = new Array<Slot>(1);
+ array->set(0, s);
// update local block chain
validateAndUpdate(array, true);
} else if (array.length == 1) {
/**
* Rebuild the table from scratch by pulling the latest block chain from the server.
*/
-synchronized void Table::rebuild() throws ServerException {
+synchronized void Table::rebuild() {
// Just pull the latest slots from the server
- Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newslots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newslots, true);
sendToServer(NULL);
updateLiveTransactionsAndStatus();
synchronized bool Table::update() {
try {
- Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newSlots, false);
sendToServer(NULL);
return false;
}
-synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) throws ServerException {
+synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) {
while (true) {
if (arbitratorTable.get(keyName) != NULL) {
// There is already an arbitrator
sendToServer(NULL);
} catch (ServerException e) {
- Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+ Set<Long> arbitratorTriedAndFailed = new Hashset<Long>();
for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
Transaction transaction = iter.next();
bool lastInsertedNewKey = false;
-bool Table::sendToServer(NewKey newKey) throws ServerException {
+bool Table::sendToServer(NewKey newKey) {
bool fromRetry = false;
try {
if (hadPartialSendToServer) {
- Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
if (newSlots.length == 0) {
fromRetry = true;
- ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+ ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+ ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
if (sendSlotsReturn.getFirst()) {
return returnData;
}
-ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) throws ServerException {
-
+ThreeTuple<bool, bool, Array<Slot>*> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
bool attemptedToSendToServerTmp = attemptedToSendToServer;
attemptedToSendToServer = true;
bool inserted = false;
bool lastTryInserted = false;
- Slot[] array = cloud.putSlot(slot, newSize);
+ Array<Slot>* array = cloud.putSlot(slot, newSize);
if (array == NULL) {
- array = new Slot[] {slot};
+ array = new Array<Slot>();
+ array->set(0, slot);
rejectedSlotVector.clear();
inserted = true;
} else {
}
}
- return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
+ return new ThreeTuple<bool, bool, Array<Slot>*>(inserted, lastTryInserted, array);
}
/**
/**
* Checks for malicious activity and updates the local copy of the block chain.
*/
-void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
+void Table::validateAndUpdate(Array<Slot>* newSlots, bool acceptUpdatesToLocal) {
// The cloud communication layer has checked slot HMACs already before decoding
if (newSlots.length == 0) {
checkHMACChain(indexer, newSlots);
// Set to keep track of messages from clients
- HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
+ Hashset<Long> machineSet = new Hashset<int64_t>(lastMessageTable.keySet());
// Process each slots data
for (Slot slot : newSlots) {
// The last transaction arbitrated on
int64_t lastTransactionCommitted = -1;
- Set<Abort> generatedAborts = new HashSet<Abort>();
+ Set<Abort> generatedAborts = new Hashset<Abort>();
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+ ArbitrationRound * arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
pendingSendArbitrationRounds.add(arbitrationRound);
if (compactArbitrationData()) {
status.setStatus(TransactionStatus.StatusAborted);
}
} else {
- Set addAbortSet = new HashSet<Abort>();
+ Hashset<Abort*> addAbortSet = new Hashset<Abort* >();
// Create the abort
// If we got here then this is a brand new commit and needs full processing
// Get what commits should be edited, these are the commits that have live values for their keys
- Set<Commit> commitsToEdit = new HashSet<Commit>();
+ Hashset<Commit*> * commitsToEdit = new Hashset<Commit*>();
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
}
return false; // did not speculate
}
- Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
+ Hashset<int64_t> * incompleteTransactionArbitrator = new Hashset<int64_t>();
bool didSkip = true;
for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
-void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> * machineSet) {
// Update the last message seen
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
/**
* Update the last message that was sent for a machine Id
*/
-void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
+void Table::processEntry(LastMessage entry, Hashset<int64_t>* machineSet) {
// Update what the last message received by a machine was
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
// Create a list of clients to watch until they see this rejected message entry.
- HashSet<Long> deviceWatchSet = new HashSet<Long>();
+ Hashset<int64_t>* deviceWatchSet = new Hashset<int64_t>();
for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
* Check that the last message seen is correct and that there is no mismatch of our own last message or that
* other clients have not had a rollback on the last message.
*/
-void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> * machineSet) {
// We have seen this machine ID
machineSet.remove(machineId);
// Get the set of rejected messages that this machine Id is has not seen yet
- HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
+ Hashset<RejectedMessage*>* watchset = rejectedMessageWatchVectorTable.get(machineId);
// If there is a rejected message that this machine Id has not seen yet
if (watchset != NULL) {
* rejected message entry and which have not.
*/
void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
- HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
+ Hashset<RejectedMessage*>* entries = rejectedMessageWatchVectorTable.get(machineId);
if (entries == NULL) {
// There is no set for this machine ID yet so create one
- entries = new HashSet<RejectedMessage>();
+ entries = new Hashset<RejectedMessage*>();
rejectedMessageWatchVectorTable.put(machineId, entries);
}
entries.add(entry);
/**
* Check if the HMAC chain is not violated
*/
-void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
+void Table::checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots) {
for (int i = 0; i < newSlots.length; i++) {
Slot currSlot = newSlots[i];
Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);