edits
[iotcloud.git] / version2 / src / C / Table.cc
index de0a1dafa95a15df4f784b17bd21a292a29ea1ca..562e164be72e6b371afc62ee7ce9050e0cecd264 100644 (file)
 #include "Table.h"
 
-Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
-       localMachineId = _localMachineId;
-       cloud = new CloudComm(this, baseurl, password, listeningPort);
-
+Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) :
+       buffer(NULL),
+       cloud(new CloudComm(this, baseurl, password, listeningPort)),
+       random(NULL),
+       liveTableStatus(NULL),
+       pendingTransactionBuilder(NULL),
+       lastPendingTransactionSpeculatedOn(NULL),
+       firstPendingTransaction(NULL),
+       numberOfSlots(0),
+       bufferResizeThreshold(0),
+       liveSlotCount(0),
+       oldestLiveSlotSequenceNumber(0),
+       localMachineId(_localMachineId),
+       sequenceNumber(0),
+       localTransactionSequenceNumber(0),
+       lastTransactionSequenceNumberSpeculatedOn(0),
+       oldestTransactionSequenceNumberSpeculatedOn(0),
+       localArbitrationSequenceNumber(0),
+       hadPartialSendToServer(false),
+       attemptedToSendToServer(false),
+       expectedSize(0),
+       didFindTableStatus(false),
+       currMaxSize(0),
+       lastSlotAttemptedToSend(NULL),
+       lastIsNewKey(false),
+       lastNewSize(0),
+       lastTransactionPartsSent(NULL),
+       lastPendingSendArbitrationEntriesToDelete(NULL),
+       lastNewKey(NULL),
+       committedKeyValueTable(NULL),
+       speculatedKeyValueTable(NULL),
+       pendingTransactionSpeculatedKeyValueTable(NULL),
+  liveNewKeyTable(NULL),
+       lastMessageTable(NULL),
+       rejectedMessageWatchVectorTable(NULL),
+       arbitratorTable(NULL),
+       liveAbortTable(NULL),
+       newTransactionParts(NULL),
+       newCommitParts(NULL),
+       lastArbitratedTransactionNumberByArbitratorTable(NULL),
+       liveTransactionBySequenceNumberTable(NULL),
+       liveTransactionByTransactionIdTable(NULL),
+       liveCommitsTable(NULL),
+       liveCommitsByKeyTable(NULL),
+       lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+       rejectedSlotVector(NULL),
+       pendingTransactionQueue(NULL),
+       pendingSendArbitrationRounds(NULL),
+       pendingSendArbitrationEntriesToDelete(NULL),
+       transactionPartsSent(NULL),
+       outstandingTransactionStatus(NULL),
+       liveAbortsGeneratedByLocal(NULL),
+       offlineTransactionsCommittedAndAtServer(NULL),
+       localCommunicationTable(NULL),
+       lastTransactionSeenFromMachineFromServer(NULL),
+       lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+       lastInsertedNewKey(false),
+       lastSeqNumArbOn(0)
+{
        init();
 }
 
-Table::Table(CloudComm _cloud, int64_t _localMachineId) {
-       localMachineId = _localMachineId;
-       cloud = _cloud;
-
+Table::Table(CloudComm _cloud, int64_t _localMachineId) :
+       buffer(NULL),
+       cloud(_cloud),
+       random(NULL),
+       liveTableStatus(NULL),
+       pendingTransactionBuilder(NULL),
+       lastPendingTransactionSpeculatedOn(NULL),
+       firstPendingTransaction(NULL),
+       numberOfSlots(0),
+       bufferResizeThreshold(0),
+       liveSlotCount(0),
+       oldestLiveSlotSequenceNumber(0),
+       localMachineId(_localMachineId),
+       sequenceNumber(0),
+       localTransactionSequenceNumber(0),
+       lastTransactionSequenceNumberSpeculatedOn(0),
+       oldestTransactionSequenceNumberSpeculatedOn(0),
+       localArbitrationSequenceNumber(0),
+       hadPartialSendToServer(false),
+       attemptedToSendToServer(false),
+       expectedSize(0),
+       didFindTableStatus(false),
+       currMaxSize(0),
+       lastSlotAttemptedToSend(NULL),
+       lastIsNewKey(false),
+       lastNewSize(0),
+       lastTransactionPartsSent(NULL),
+       lastPendingSendArbitrationEntriesToDelete(NULL),
+       lastNewKey(NULL),
+       committedKeyValueTable(NULL),
+       speculatedKeyValueTable(NULL),
+       pendingTransactionSpeculatedKeyValueTable(NULL),
+  liveNewKeyTable(NULL),
+       lastMessageTable(NULL),
+       rejectedMessageWatchVectorTable(NULL),
+       arbitratorTable(NULL),
+       liveAbortTable(NULL),
+       newTransactionParts(NULL),
+       newCommitParts(NULL),
+       lastArbitratedTransactionNumberByArbitratorTable(NULL),
+       liveTransactionBySequenceNumberTable(NULL),
+       liveTransactionByTransactionIdTable(NULL),
+       liveCommitsTable(NULL),
+       liveCommitsByKeyTable(NULL),
+       lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+       rejectedSlotVector(NULL),
+       pendingTransactionQueue(NULL),
+       pendingSendArbitrationRounds(NULL),
+       pendingSendArbitrationEntriesToDelete(NULL),
+       transactionPartsSent(NULL),
+       outstandingTransactionStatus(NULL),
+       liveAbortsGeneratedByLocal(NULL),
+       offlineTransactionsCommittedAndAtServer(NULL),
+       localCommunicationTable(NULL),
+       lastTransactionSeenFromMachineFromServer(NULL),
+       lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+       lastInsertedNewKey(false),
+       lastSeqNumArbOn(0)
+{
        init();
 }
 
@@ -32,7 +142,7 @@ void Table::init() {
        pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
        liveNewKeyTable = new Hashtable<IoTString, NewKey>();
        lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
-       rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
+       rejectedMessageWatchVectorTable = new Hashtable<int64_t Hashset<RejectedMessage> >();
        arbitratorTable = new Hashtable<IoTString, Long>();
        liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
        newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
@@ -49,7 +159,7 @@ void Table::init() {
        transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
        outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
        liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
-       offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+       offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
        localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
        lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
        pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
@@ -66,7 +176,7 @@ synchronized void Table::printSlots() {
        int64_t o = buffer.getOldestSeqNum();
        int64_t n = buffer.getNewestSeqNum();
 
-       int[] types = new int[10];
+       Array<int> * types = new Array<int>(10);
 
        int num = 0;
 
@@ -144,7 +254,7 @@ synchronized void Table::printSlots() {
  * Initialize the table by inserting a table status as the first entry into the table status
  * also initialize the crypto stuff.
  */
-synchronized void Table::initTable() throws ServerException {
+synchronized void Table::initTable() {
        cloud.initSecurity();
 
        // Create the first insertion into the block chain which is the table status
@@ -152,10 +262,11 @@ synchronized void Table::initTable() throws ServerException {
        localSequenceNumber++;
        TableStatus status = new TableStatus(s, numberOfSlots);
        s.addEntry(status);
-       Slot[] array = cloud.putSlot(s, numberOfSlots);
+       Array<Slot> * array = cloud.putSlot(s, numberOfSlots);
 
        if (array == NULL) {
-               array = new Slot[] {s};
+               array = new Array<Slot>(1);
+               array->set(0, s);
                // update local block chain
                validateAndUpdate(array, true);
        } else if (array.length == 1) {
@@ -169,9 +280,9 @@ synchronized void Table::initTable() throws ServerException {
 /**
  * Rebuild the table from scratch by pulling the latest block chain from the server.
  */
-synchronized void Table::rebuild() throws ServerException {
+synchronized void Table::rebuild() {
        // Just pull the latest slots from the server
-       Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
+       Array<Slot> * newslots = cloud.getSlots(sequenceNumber + 1);
        validateAndUpdate(newslots, true);
        sendToServer(NULL);
        updateLiveTransactionsAndStatus();
@@ -286,7 +397,7 @@ synchronized IoTString Table::getSpeculativeAtomic(IoTString key) {
 
 synchronized bool Table::update()  {
        try {
-               Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+               Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
                validateAndUpdate(newSlots, false);
                sendToServer(NULL);
 
@@ -305,7 +416,7 @@ synchronized bool Table::update()  {
        return false;
 }
 
-synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) throws ServerException {
+synchronized bool Table::createNewKey(IoTString keyName, int64_t machineId) {
        while (true) {
                if (arbitratorTable.get(keyName) != NULL) {
                        // There is already an arbitrator
@@ -376,7 +487,7 @@ synchronized TransactionStatus Table::commitTransaction() {
                sendToServer(NULL);
        } catch (ServerException e) {
 
-               Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+               Set<Long> arbitratorTriedAndFailed = new Hashset<Long>();
                for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
                        Transaction transaction = iter.next();
 
@@ -436,16 +547,16 @@ int64_t Table::getLocalSequenceNumber() {
 
 bool lastInsertedNewKey = false;
 
-bool Table::sendToServer(NewKey newKey) throws ServerException {
+bool Table::sendToServer(NewKey newKey) {
 
        bool fromRetry = false;
 
        try {
                if (hadPartialSendToServer) {
-                       Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+                       Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
                        if (newSlots.length == 0) {
                                fromRetry = true;
-                               ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+                               ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
 
                                if (sendSlotsReturn.getFirst()) {
                                        if (newKey != NULL) {
@@ -682,7 +793,7 @@ bool Table::sendToServer(NewKey newKey) throws ServerException {
                        lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
 
 
-                       ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+                       ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
 
                        if (sendSlotsReturn.getFirst()) {
 
@@ -1061,17 +1172,17 @@ synchronized Array<char> *Table::acceptDataFromLocal(Array<char> *data) {
        return returnData;
 }
 
-ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey)  throws ServerException {
-
+ThreeTuple<bool, bool, Array<Slot>*> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
        bool attemptedToSendToServerTmp = attemptedToSendToServer;
        attemptedToSendToServer = true;
 
        bool inserted = false;
        bool lastTryInserted = false;
 
-       Slot[] array = cloud.putSlot(slot, newSize);
+       Array<Slot>* array = cloud.putSlot(slot, newSize);
        if (array == NULL) {
-               array = new Slot[] {slot};
+               array = new Array<Slot>();
+               array->set(0, slot);
                rejectedSlotVector.clear();
                inserted = true;
        } else {
@@ -1121,7 +1232,7 @@ ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize,
                }
        }
 
-       return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
+       return new ThreeTuple<bool, bool, Array<Slot>*>(inserted, lastTryInserted, array);
 }
 
 /**
@@ -1365,7 +1476,7 @@ search:
 /**
  * Checks for malicious activity and updates the local copy of the block chain.
  */
-void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
+void Table::validateAndUpdate(Array<Slot>* newSlots, bool acceptUpdatesToLocal) {
 
        // The cloud communication layer has checked slot HMACs already before decoding
        if (newSlots.length == 0) {
@@ -1386,7 +1497,7 @@ void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
        checkHMACChain(indexer, newSlots);
 
        // Set to keep track of messages from clients
-       HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
+       Hashset<Long> machineSet = new Hashset<int64_t>(lastMessageTable.keySet());
 
        // Process each slots data
        for (Slot slot : newSlots) {
@@ -1596,7 +1707,7 @@ void Table::arbitrateFromServer() {
 
        // The last transaction arbitrated on
        int64_t lastTransactionCommitted = -1;
-       Set<Abort> generatedAborts = new HashSet<Abort>();
+       Set<Abort> generatedAborts = new Hashset<Abort>();
 
        for (Long transactionSequenceNumber : transactionSequenceNumbers) {
                Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
@@ -1747,7 +1858,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
                newCommit.createCommitParts();
 
                // Append all the commit parts to the end of the pending queue waiting for sending to the server
-               ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+               ArbitrationRound * arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
                pendingSendArbitrationRounds.add(arbitrationRound);
 
                if (compactArbitrationData()) {
@@ -1782,7 +1893,7 @@ Pair<bool, bool> Table::arbitrateOnLocalTransaction(Transaction transaction) {
                                status.setStatus(TransactionStatus.StatusAborted);
                        }
                } else {
-                       Set addAbortSet = new HashSet<Abort>();
+                       Hashset<Abort*> addAbortSet = new Hashset<Abort* >();
 
 
                        // Create the abort
@@ -2030,7 +2141,7 @@ bool Table::updateCommittedTable() {
                        // If we got here then this is a brand new commit and needs full processing
 
                        // Get what commits should be edited, these are the commits that have live values for their keys
-                       Set<Commit> commitsToEdit = new HashSet<Commit>();
+                       Hashset<Commit*> * commitsToEdit = new Hashset<Commit*>();
                        for (KeyValue kv : commit.getKeyValueUpdateSet()) {
                                commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
                        }
@@ -2115,7 +2226,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) {
                return false;           // did not speculate
        }
 
-       Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
+       Hashset<int64_t> * incompleteTransactionArbitrator = new Hashset<int64_t>();
        bool didSkip = true;
 
        for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
@@ -2235,7 +2346,7 @@ void Table::updateLiveTransactionsAndStatus() {
 /**
  * Process this slot, entry by entry.  Also update the latest message sent by slot
  */
-void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> * machineSet) {
 
        // Update the last message seen
        updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
@@ -2281,7 +2392,7 @@ void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLoca
 /**
  * Update the last message that was sent for a machine Id
  */
-void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
+void Table::processEntry(LastMessage entry, Hashset<int64_t>* machineSet) {
        // Update what the last message received by a machine was
        updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
 }
@@ -2351,7 +2462,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) {
 
 
        // Create a list of clients to watch until they see this rejected message entry.
-       HashSet<Long> deviceWatchSet = new HashSet<Long>();
+       Hashset<int64_t>* deviceWatchSet = new Hashset<int64_t>();
        for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
 
                // Machine ID for the last message entry
@@ -2525,13 +2636,13 @@ void Table::processEntry(CommitPart entry) {
  * Check that the last message seen is correct and that there is no mismatch of our own last message or that
  * other clients have not had a rollback on the last message.
  */
-void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> * machineSet) {
 
        // We have seen this machine ID
        machineSet.remove(machineId);
 
        // Get the set of rejected messages that this machine Id is has not seen yet
-       HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
+       Hashset<RejectedMessage*>* watchset = rejectedMessageWatchVectorTable.get(machineId);
 
        // If there is a rejected message that this machine Id has not seen yet
        if (watchset != NULL) {
@@ -2628,10 +2739,10 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene
  * rejected message entry and which have not.
  */
 void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
-       HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
+       Hashset<RejectedMessage*>* entries = rejectedMessageWatchVectorTable.get(machineId);
        if (entries == NULL) {
                // There is no set for this machine ID yet so create one
-               entries = new HashSet<RejectedMessage>();
+               entries = new Hashset<RejectedMessage*>();
                rejectedMessageWatchVectorTable.put(machineId, entries);
        }
        entries.add(entry);
@@ -2640,7 +2751,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
 /**
  * Check if the HMAC chain is not violated
  */
-void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
+void Table::checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots) {
        for (int i = 0; i < newSlots.length; i++) {
                Slot currSlot = newSlots[i];
                Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);