From: bdemsky Date: Sat, 20 Jan 2018 00:04:11 +0000 (-0800) Subject: edits X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=1ae973a3cc837cb62ef220dedbb731ab3f833719;p=iotcloud.git edits --- diff --git a/version2/src/C/ByteBuffer.h b/version2/src/C/ByteBuffer.h index 086571b..a5b6dcf 100644 --- a/version2/src/C/ByteBuffer.h +++ b/version2/src/C/ByteBuffer.h @@ -12,8 +12,10 @@ public: int32_t getInt(); char get(); void get(Array * array); + void position(int32_t newPosition); Array * array(); private: }; ByteBuffer * ByteBuffer_wrap(Array * array); +ByteBuffer * ByteBuffer_allocate(uint size); #endif diff --git a/version2/src/C/CloudComm.h b/version2/src/C/CloudComm.h index bb319c2..3e0a56e 100644 --- a/version2/src/C/CloudComm.h +++ b/version2/src/C/CloudComm.h @@ -26,10 +26,10 @@ private: SecureRandom *random; Array *salt; Table *table; - int32_t listeningPort = -1; - Thread *localServerThread = NULL; - bool doEnd = false; - TimingSingleton *timer = NULL; + int32_t listeningPort; + Thread *localServerThread; + bool doEnd; + TimingSingleton *timer; /** * Generates Key from password. @@ -89,6 +89,6 @@ public: */ Array *sendLocalData(Array *sendData, int64_t localSequenceNumber, IoTString *host, int port); - public void close(); + void close(); }; #endif diff --git a/version2/src/C/Slot.cc b/version2/src/C/Slot.cc index 2ee6981..542705e 100644 --- a/version2/src/C/Slot.cc +++ b/version2/src/C/Slot.cc @@ -1,25 +1,49 @@ #include "Slot.h" - -Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber) { - seqnum = _seqnum; - machineid = _machineid; - prevhmac = _prevhmac; - hmac = _hmac; - entries = new Vector(); - livecount = 1; - seqnumlive = true; - freespace = SLOT_SIZE - getBaseSize(); - table = _table; - localSequenceNumber = _localSequenceNumber; +#include "ByteBuffer.h" +#include "Entry.h" +#include "Error.h" +#include "CloudComm.h" +#include "Table.h" +#include "LastMessage.h" + +Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, Array *_hmac, int64_t _localSequenceNumber) : + seqnum(_seqnum), + prevhmac(_prevhmac), + hmac(_hmac), + machineid(_machineid), + entries(new Vector()), + livecount(1), + seqnumlive(true), + freespace(SLOT_SIZE - getBaseSize()), + table(_table), + localSequenceNumber(_localSequenceNumber) { } -Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber) { - this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber); -} +Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, int64_t _localSequenceNumber) : + seqnum(_seqnum), + prevhmac(_prevhmac), + hmac(NULL), + machineid(_machineid), + entries(new Vector()), + livecount(1), + seqnumlive(true), + freespace(SLOT_SIZE - getBaseSize()), + table(_table), + localSequenceNumber(_localSequenceNumber) { + } -Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) { - this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber); -} +Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) : + seqnum(_seqnum), + prevhmac(new Array(HMAC_SIZE)), + hmac(NULL), + machineid(_machineid), + entries(new Vector()), + livecount(1), + seqnumlive(true), + freespace(SLOT_SIZE - getBaseSize()), + table(_table), + localSequenceNumber(_localSequenceNumber) { + } Entry *Slot::addEntry(Entry *e) { e = e->getCopy(this); @@ -54,32 +78,32 @@ Vector *Slot::getEntries() { return entries; } -Slot *Slotdecode(Table *table, char *array, Mac *mac) { +Slot *Slotdecode(Table *table, Array *array, Mac *mac) { mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); - char *realmac = mac->doFinal(); + Array *realmac = mac->doFinal(); ByteBuffer *bb = ByteBuffer_wrap(array); - char *hmac = new char[HMAC_SIZE]; - char *prevhmac = new char[HMAC_SIZE]; + Array *hmac = new Array(HMAC_SIZE); + Array *prevhmac = new Array(HMAC_SIZE); bb->get(hmac); bb->get(prevhmac); - if (!Arrays.equals(realmac, hmac)) + if (!realmac->equals(hmac)) throw new Error("Server Error: Invalid HMAC! Potential Attack!"); int64_t seqnum = bb->getLong(); int64_t machineid = bb->getLong(); int numentries = bb->getInt(); - Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1); + Slot *slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1); for (int i = 0; i < numentries; i++) { - slot->addShallowEntry(Entry->decode(slot, bb)); + slot->addShallowEntry(Entry_decode(slot, bb)); } return slot; } -char *Slot::encode(Mac * mac) { - char *array = new char[SLOT_SIZE]; +Array *Slot::encode(Mac * mac) { + Array *array = new Array(SLOT_SIZE); ByteBuffer *bb = ByteBuffer_wrap(array); /* Leave space for the slot HMAC. */ bb->position(HMAC_SIZE); @@ -87,12 +111,12 @@ char *Slot::encode(Mac * mac) { bb->putLong(seqnum); bb->putLong(machineid); bb->putInt(entries->size()); - for (Entry entry : entries) { + for (Entry *entry : entries) { entry->encode(bb); } /* Compute our HMAC */ mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE); - char *realmac = mac->doFinal(); + Array *realmac = mac->doFinal(); hmac = realmac; bb->position(0); bb->put(realmac); @@ -110,7 +134,7 @@ Vector *Slot::getLiveEntries(bool resize) { Vector *liveEntries = new Vector(); for (Entry *entry : entries) { if (entry->isLive()) { - if (!resize || entry->getType() != Entry->TypeTableStatus) + if (!resize || entry->getType() != TypeTableStatus) liveEntries->add(entry); } } @@ -143,8 +167,8 @@ void Slot::decrementLiveCount() { } } -char *Slot::getSlotCryptIV() { - ByteBuffer *buffer = ByteBuffer_allocate(CloudComm.IV_SIZE); +Array *Slot::getSlotCryptIV() { + ByteBuffer *buffer = ByteBuffer_allocate(CloudComm_IV_SIZE); buffer->putLong(machineid); int64_t localSequenceNumberShift = localSequenceNumber << 16; buffer->putLong(localSequenceNumberShift); diff --git a/version2/src/C/Slot.h b/version2/src/C/Slot.h index 76e0fb4..4ffe205 100644 --- a/version2/src/C/Slot.h +++ b/version2/src/C/Slot.h @@ -9,13 +9,13 @@ #define HMAC_SIZE 32 class Slot : public Liveness { -private: + private: /** Sequence number of the slot. */ int64_t seqnum; /** HMAC of previous slot. */ - char *prevhmac; + Array *prevhmac; /** HMAC of this slot. */ - char *hmac; + Array *hmac; /** Machine that sent this slot. */ int64_t machineid; /** Vector of entries in this slot. */ @@ -34,17 +34,17 @@ private: void addShallowEntry(Entry *e); public: - Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber); - Slot(Table _table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber); - Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber); + Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, Array *_hmac, int64_t _localSequenceNumber); + Slot(Table * _table, int64_t _seqnum, int64_t _machineid, Array *_prevhmac, int64_t _localSequenceNumber); + Slot(Table * _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber); - char *getHMAC() { return hmac; } - char *getPrevHMAC() { return prevhmac; } + Array *getHMAC() { return hmac; } + Array *getPrevHMAC() { return prevhmac; } Entry *addEntry(Entry *e); void removeEntry(Entry *e); bool hasSpace(Entry *e); Vector *getEntries(); - char *encode(Mac *mac); + Array *encode(Mac *mac); int getBaseSize() { return 2 * HMAC_SIZE + 2 * sizeof(int64_t) + sizeof(int); } Vector *getLiveEntries(bool resize); int64_t getSequenceNumber() { return seqnum; } @@ -52,8 +52,9 @@ public: void setDead(); void decrementLiveCount(); bool isLive() { return livecount > 0; } - char *getSlotCryptIV(); + Array *getSlotCryptIV(); + friend Slot *Slotdecode(Table *table, Array *array, Mac *mac); }; -Slot *Slotdecode(Table *table, char *array, Mac *mac); +Slot *Slotdecode(Table *table, Array *array, Mac *mac); #endif diff --git a/version2/src/C/Table.cc b/version2/src/C/Table.cc index 1148e93..562e164 100644 --- a/version2/src/C/Table.cc +++ b/version2/src/C/Table.cc @@ -1,16 +1,126 @@ #include "Table.h" -Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) { - localMachineId = _localMachineId; - cloud = new CloudComm(this, baseurl, password, listeningPort); - +Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) : + buffer(NULL), + cloud(new CloudComm(this, baseurl, password, listeningPort)), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumber(0), + localMachineId(_localMachineId), + sequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedSize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastPendingSendArbitrationEntriesToDelete(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ init(); } -Table::Table(CloudComm _cloud, int64_t _localMachineId) { - localMachineId = _localMachineId; - cloud = _cloud; - +Table::Table(CloudComm _cloud, int64_t _localMachineId) : + buffer(NULL), + cloud(_cloud), + random(NULL), + liveTableStatus(NULL), + pendingTransactionBuilder(NULL), + lastPendingTransactionSpeculatedOn(NULL), + firstPendingTransaction(NULL), + numberOfSlots(0), + bufferResizeThreshold(0), + liveSlotCount(0), + oldestLiveSlotSequenceNumber(0), + localMachineId(_localMachineId), + sequenceNumber(0), + localTransactionSequenceNumber(0), + lastTransactionSequenceNumberSpeculatedOn(0), + oldestTransactionSequenceNumberSpeculatedOn(0), + localArbitrationSequenceNumber(0), + hadPartialSendToServer(false), + attemptedToSendToServer(false), + expectedSize(0), + didFindTableStatus(false), + currMaxSize(0), + lastSlotAttemptedToSend(NULL), + lastIsNewKey(false), + lastNewSize(0), + lastTransactionPartsSent(NULL), + lastPendingSendArbitrationEntriesToDelete(NULL), + lastNewKey(NULL), + committedKeyValueTable(NULL), + speculatedKeyValueTable(NULL), + pendingTransactionSpeculatedKeyValueTable(NULL), + liveNewKeyTable(NULL), + lastMessageTable(NULL), + rejectedMessageWatchVectorTable(NULL), + arbitratorTable(NULL), + liveAbortTable(NULL), + newTransactionParts(NULL), + newCommitParts(NULL), + lastArbitratedTransactionNumberByArbitratorTable(NULL), + liveTransactionBySequenceNumberTable(NULL), + liveTransactionByTransactionIdTable(NULL), + liveCommitsTable(NULL), + liveCommitsByKeyTable(NULL), + lastCommitSeenSequenceNumberByArbitratorTable(NULL), + rejectedSlotVector(NULL), + pendingTransactionQueue(NULL), + pendingSendArbitrationRounds(NULL), + pendingSendArbitrationEntriesToDelete(NULL), + transactionPartsSent(NULL), + outstandingTransactionStatus(NULL), + liveAbortsGeneratedByLocal(NULL), + offlineTransactionsCommittedAndAtServer(NULL), + localCommunicationTable(NULL), + lastTransactionSeenFromMachineFromServer(NULL), + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL), + lastInsertedNewKey(false), + lastSeqNumArbOn(0) +{ init(); } @@ -32,7 +142,7 @@ void Table::init() { pendingTransactionSpeculatedKeyValueTable = new Hashtable(); liveNewKeyTable = new Hashtable(); lastMessageTable = new Hashtable >(); - rejectedMessageWatchVectorTable = new Hashtable >(); + rejectedMessageWatchVectorTable = new Hashtable >(); arbitratorTable = new Hashtable(); liveAbortTable = new Hashtable, Abort>(); newTransactionParts = new Hashtable, TransactionPart> >(); @@ -49,7 +159,7 @@ void Table::init() { transactionPartsSent = new Hashtable >(); outstandingTransactionStatus = new Hashtable(); liveAbortsGeneratedByLocal = new Hashtable(); - offlineTransactionsCommittedAndAtServer = new HashSet >(); + offlineTransactionsCommittedAndAtServer = new Hashset >(); localCommunicationTable = new Hashtable >(); lastTransactionSeenFromMachineFromServer = new Hashtable(); pendingSendArbitrationRounds = new Vector(); @@ -66,7 +176,7 @@ synchronized void Table::printSlots() { int64_t o = buffer.getOldestSeqNum(); int64_t n = buffer.getNewestSeqNum(); - int[] types = new int[10]; + Array * types = new Array(10); int num = 0; @@ -152,10 +262,11 @@ synchronized void Table::initTable() { localSequenceNumber++; TableStatus status = new TableStatus(s, numberOfSlots); s.addEntry(status); - Slot[] array = cloud.putSlot(s, numberOfSlots); + Array * array = cloud.putSlot(s, numberOfSlots); if (array == NULL) { - array = new Slot[] {s}; + array = new Array(1); + array->set(0, s); // update local block chain validateAndUpdate(array, true); } else if (array.length == 1) { @@ -171,7 +282,7 @@ synchronized void Table::initTable() { */ synchronized void Table::rebuild() { // Just pull the latest slots from the server - Slot[] newslots = cloud.getSlots(sequenceNumber + 1); + Array * newslots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newslots, true); sendToServer(NULL); updateLiveTransactionsAndStatus(); @@ -286,7 +397,7 @@ synchronized IoTString Table::getSpeculativeAtomic(IoTString key) { synchronized bool Table::update() { try { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + Array * newSlots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(NULL); @@ -376,7 +487,7 @@ synchronized TransactionStatus Table::commitTransaction() { sendToServer(NULL); } catch (ServerException e) { - Set arbitratorTriedAndFailed = new HashSet(); + Set arbitratorTriedAndFailed = new Hashset(); for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { Transaction transaction = iter.next(); @@ -442,10 +553,10 @@ bool Table::sendToServer(NewKey newKey) { try { if (hadPartialSendToServer) { - Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); + Array * newSlots = cloud.getSlots(sequenceNumber + 1); if (newSlots.length == 0) { fromRetry = true; - ThreeTuple sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); + ThreeTuple*> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey); if (sendSlotsReturn.getFirst()) { if (newKey != NULL) { @@ -682,7 +793,7 @@ bool Table::sendToServer(NewKey newKey) { lastPendingSendArbitrationEntriesToDelete = new Vector(pendingSendArbitrationEntriesToDelete); - ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); + ThreeTuple*> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL); if (sendSlotsReturn.getFirst()) { @@ -1061,16 +1172,17 @@ synchronized Array *Table::acceptDataFromLocal(Array *data) { return returnData; } -ThreeTuple Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) { +ThreeTuple*> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) { bool attemptedToSendToServerTmp = attemptedToSendToServer; attemptedToSendToServer = true; bool inserted = false; bool lastTryInserted = false; - Slot[] array = cloud.putSlot(slot, newSize); + Array* array = cloud.putSlot(slot, newSize); if (array == NULL) { - array = new Slot[] {slot}; + array = new Array(); + array->set(0, slot); rejectedSlotVector.clear(); inserted = true; } else { @@ -1120,7 +1232,7 @@ ThreeTuple Table::sendSlotsToServer(Slot slot, int newSize, } } - return new ThreeTuple(inserted, lastTryInserted, array); + return new ThreeTuple*>(inserted, lastTryInserted, array); } /** @@ -1364,7 +1476,7 @@ search: /** * Checks for malicious activity and updates the local copy of the block chain. */ -void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { +void Table::validateAndUpdate(Array* newSlots, bool acceptUpdatesToLocal) { // The cloud communication layer has checked slot HMACs already before decoding if (newSlots.length == 0) { @@ -1385,7 +1497,7 @@ void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) { checkHMACChain(indexer, newSlots); // Set to keep track of messages from clients - HashSet machineSet = new HashSet(lastMessageTable.keySet()); + Hashset machineSet = new Hashset(lastMessageTable.keySet()); // Process each slots data for (Slot slot : newSlots) { @@ -1595,7 +1707,7 @@ void Table::arbitrateFromServer() { // The last transaction arbitrated on int64_t lastTransactionCommitted = -1; - Set generatedAborts = new HashSet(); + Set generatedAborts = new Hashset(); for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); @@ -1746,7 +1858,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + ArbitrationRound * arbitrationRound = new ArbitrationRound(newCommit, new Hashset()); pendingSendArbitrationRounds.add(arbitrationRound); if (compactArbitrationData()) { @@ -1781,7 +1893,7 @@ Pair Table::arbitrateOnLocalTransaction(Transaction transaction) { status.setStatus(TransactionStatus.StatusAborted); } } else { - Set addAbortSet = new HashSet(); + Hashset addAbortSet = new Hashset(); // Create the abort @@ -2029,7 +2141,7 @@ bool Table::updateCommittedTable() { // If we got here then this is a brand new commit and needs full processing // Get what commits should be edited, these are the commits that have live values for their keys - Set commitsToEdit = new HashSet(); + Hashset * commitsToEdit = new Hashset(); for (KeyValue kv : commit.getKeyValueUpdateSet()) { commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey())); } @@ -2114,7 +2226,7 @@ bool Table::updateSpeculativeTable(bool didProcessNewCommits) { return false; // did not speculate } - Set incompleteTransactionArbitrator = new HashSet(); + Hashset * incompleteTransactionArbitrator = new Hashset(); bool didSkip = true; for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) { @@ -2234,7 +2346,7 @@ void Table::updateLiveTransactionsAndStatus() { /** * Process this slot, entry by entry. Also update the latest message sent by slot */ -void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet) { +void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset * machineSet) { // Update the last message seen updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet); @@ -2280,7 +2392,7 @@ void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLoca /** * Update the last message that was sent for a machine Id */ -void Table::processEntry(LastMessage entry, HashSet machineSet) { +void Table::processEntry(LastMessage entry, Hashset* machineSet) { // Update what the last message received by a machine was updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet); } @@ -2350,7 +2462,7 @@ void Table::processEntry(RejectedMessage entry, SlotIndexer indexer) { // Create a list of clients to watch until they see this rejected message entry. - HashSet deviceWatchSet = new HashSet(); + Hashset* deviceWatchSet = new Hashset(); for (Map.Entry > lastMessageEntry : lastMessageTable.entrySet()) { // Machine ID for the last message entry @@ -2524,13 +2636,13 @@ void Table::processEntry(CommitPart entry) { * Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message. */ -void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet) { +void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset * machineSet) { // We have seen this machine ID machineSet.remove(machineId); // Get the set of rejected messages that this machine Id is has not seen yet - HashSet watchset = rejectedMessageWatchVectorTable.get(machineId); + Hashset* watchset = rejectedMessageWatchVectorTable.get(machineId); // If there is a rejected message that this machine Id has not seen yet if (watchset != NULL) { @@ -2627,10 +2739,10 @@ void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness livene * rejected message entry and which have not. */ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { - HashSet entries = rejectedMessageWatchVectorTable.get(machineId); + Hashset* entries = rejectedMessageWatchVectorTable.get(machineId); if (entries == NULL) { // There is no set for this machine ID yet so create one - entries = new HashSet(); + entries = new Hashset(); rejectedMessageWatchVectorTable.put(machineId, entries); } entries.add(entry); @@ -2639,7 +2751,7 @@ void Table::addWatchVector(int64_t machineId, RejectedMessage entry) { /** * Check if the HMAC chain is not violated */ -void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) { +void Table::checkHMACChain(SlotIndexer indexer, Array * newSlots) { for (int i = 0; i < newSlots.length; i++) { Slot currSlot = newSlots[i]; Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1); diff --git a/version2/src/C/Table.h b/version2/src/C/Table.h index 3bc9cb9..7944347 100644 --- a/version2/src/C/Table.h +++ b/version2/src/C/Table.h @@ -1,6 +1,8 @@ #ifndef Table_H #define Table_H - +#include "common.h" +#include "Pair.h" +#include "ThreeTuple.h" /** * IoTTable data structure. Provides client interface. * @author Brian Demsky @@ -19,77 +21,81 @@ class Table { private: /* Helper Objects */ SlotBuffer *buffer; - CloudComm *cloud = NULL; - Random *random = NULL; - TableStatus *liveTableStatus = NULL; - PendingTransaction *pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction - Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction - Transaction *firstPendingTransaction = NULL; // first transaction in the pending transaction list + CloudComm *cloud; + Random *random; + TableStatus *liveTableStatus; + PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction + Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction + Transaction *firstPendingTransaction; // first transaction in the pending transaction list /* Variables */ - int numberOfSlots = 0; // Number of slots stored in buffer - int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed - int64_t liveSlotCount = 0;// Number of currently live slots - int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry - int64_t localMachineId = 0; // Machine ID of this client device - int64_t sequenceNumber = 0; // Largest sequence number a client has received - int64_t localSequenceNumber = 0; + int numberOfSlots; // Number of slots stored in buffer + int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed + int64_t liveSlotCount;// Number of currently live slots + int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry + int64_t localMachineId; // Machine ID of this client device + int64_t sequenceNumber; // Largest sequence number a client has received + int64_t localSequenceNumber; // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server - int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions - int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on - int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on - int64_t localArbitrationSequenceNumber = 0; - bool hadPartialSendToServer = false; - bool attemptedToSendToServer = false; + int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions + int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on + int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on + int64_t localArbitrationSequenceNumber; + bool hadPartialSendToServer; + bool attemptedToSendToServer; int64_t expectedsize; - bool didFindTableStatus = false; - int64_t currMaxSize = 0; + bool didFindTableStatus; + int64_t currMaxSize; - Slot *lastSlotAttemptedToSend = NULL; - bool lastIsNewKey = false; - int lastNewSize = 0; - Hashtable *> lastTransactionPartsSent = NULL; - Vector *lastPendingSendArbitrationEntriesToDelete = NULL; - NewKey *lastNewKey = NULL; + Slot *lastSlotAttemptedToSend; + bool lastIsNewKey; + int lastNewSize; + Hashtable *> lastTransactionPartsSent; + Vector *lastPendingSendArbitrationEntriesToDelete; + NewKey *lastNewKey; /* Data Structures */ - Hashtable *committedKeyValueTable = NULL;// Table of committed key value pairs - Hashtable *speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value - Hashtable *pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions - Hashtable *liveNewKeyTable = NULL; // Table of live new keys - Hashtable *> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); - Hashtable *> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet - Hashtable *arbitratorTable = NULL;// Table of keys and their arbitrators - Hashtable *, Abort *> *liveAbortTable = NULL;// Table live abort messages - Hashtable *, TransactionPart *> *> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server - Hashtable *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server - Hashtable *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on - Hashtable *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number - Hashtable *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID - Hashtable > *liveCommitsTable = NULL; - Hashtable *liveCommitsByKeyTable = NULL; - Hashtable *lastCommitSeenSequenceNumberByArbitratorTable = NULL; - Vector *rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server - Vector *pendingTransactionQueue = NULL; - Vector *pendingSendArbitrationRounds = NULL; - Vector *pendingSendArbitrationEntriesToDelete = NULL; - Hashtable *> *transactionPartsSent = NULL; - Hashtable *outstandingTransactionStatus = NULL; - Hashtable *liveAbortsGeneratedByLocal = NULL; - Hashset *> *offlineTransactionsCommittedAndAtServer = NULL; - Hashtable > *localCommunicationTable = NULL; - Hashtable *lastTransactionSeenFromMachineFromServer = NULL; - Hashtable *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL; + Hashtable *committedKeyValueTable;// Table of committed key value pairs + Hashtable *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value + Hashtable *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions + Hashtable *liveNewKeyTable; // Table of live new keys + Hashtable *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); + Hashtable *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet + Hashtable *arbitratorTable;// Table of keys and their arbitrators + Hashtable *, Abort *> *liveAbortTable;// Table live abort messages + Hashtable *, TransactionPart *> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server + Hashtable *, CommitPart *> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server + Hashtable *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on + Hashtable *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number + Hashtable *, Transaction *> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID + Hashtable > *liveCommitsTable; + Hashtable *liveCommitsByKeyTable; + Hashtable *lastCommitSeenSequenceNumberByArbitratorTable; + Vector *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server + Vector *pendingTransactionQueue; + Vector *pendingSendArbitrationRounds; + Vector *pendingSendArbitrationEntriesToDelete; + Hashtable *> *transactionPartsSent; + Hashtable *outstandingTransactionStatus; + Hashtable *liveAbortsGeneratedByLocal; + Hashset *> *offlineTransactionsCommittedAndAtServer; + Hashtable > *localCommunicationTable; + Hashtable *lastTransactionSeenFromMachineFromServer; + Hashtable *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator; + bool lastInsertedNewKey; + int64_t lastSeqNumArbOn; + + void init(); /** * Recalculate the new resize threshold */ void setResizeThreshold(); bool sendToServer(NewKey *newKey); - synchronized bool updateFromLocal(int64_t machineId); + bool updateFromLocal(int64_t machineId); Pair sendTransactionToLocal(Transaction *transaction); ThreeTuple *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey); /** @@ -104,7 +110,7 @@ private: /** * Checks for malicious activity and updates the local copy of the block chain. */ - void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal); + void validateAndUpdate(Array * newSlots, bool acceptUpdatesToLocal); void updateLiveStateFromServer(); @@ -136,7 +142,7 @@ private: */ void processNewTransactionParts(); - int64_t lastSeqNumArbOn = 0; + void arbitrateFromServer(); @@ -170,12 +176,12 @@ private: /** * Process this slot, entry by entry. Also update the latest message sent by slot */ - void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet machineSet); + void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset machineSet); /** * Update the last message that was sent for a machine Id */ - void processEntry(LastMessage entry, HashSet machineSet); + void processEntry(LastMessage entry, Hashset machineSet); /** * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message) @@ -216,7 +222,7 @@ private: * Check that the last message seen is correct and that there is no mismatch of our own last message or that * other clients have not had a rollback on the last message. */ - void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet machineSet); + void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset machineSet); /** * Add a rejected message entry to the watch set to keep track of which clients have seen that @@ -227,11 +233,11 @@ private: /** * Check if the HMAC chain is not violated */ - void checkHMACChain(SlotIndexer indexer, Slot[] newSlots); - bool lastInsertedNewKey = false; + void checkHMACChain(SlotIndexer indexer, Array * newSlots); + public: - Table(String baseurl, String password, int64_t _localMachineId, int listeningPort); + Table(IoTString * baseurl, IoTString * password, int64_t _localMachineId, int listeningPort); Table(CloudComm _cloud, int64_t _localMachineId); /** @@ -244,7 +250,7 @@ public: * Rebuild the table from scratch by pulling the latest block chain from the server. */ void rebuild(); - void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber); + void addLocalCommunication(int64_t arbitrator, IoTString* hostName, int portNumber); uint64_t getArbitrator(IoTString *key); void close(); IoTString *getCommitted(IoTString *key); diff --git a/version2/src/C/array.h b/version2/src/C/array.h index b9d7196..3d77da9 100644 --- a/version2/src/C/array.h +++ b/version2/src/C/array.h @@ -53,6 +53,13 @@ public: ourfree(array); } + bool equals(Array * _array) { + if (_array->size != size) + return false; + int cmp=memcmp(array, _array->array, size * sizeof(type)); + return cmp == 0; + } + type get(uint index) const { return array[index]; } diff --git a/version2/src/C/common.h b/version2/src/C/common.h index 7973b6b..bc23a7c 100644 --- a/version2/src/C/common.h +++ b/version2/src/C/common.h @@ -39,14 +39,18 @@ class SlotBuffer; class SlotIndexer; class Table; class TableStatus; -class ThreeTuple; class TimingSingleton; class Transaction; class TransactionPart; class TransactionStatus; -class Mac; class Error; - - +//Code to write +class SecretKeySpec; +class Mac; +class SecureRandom; +class Thread; +class DataInputStream; +class URL; +class Random; #endif diff --git a/version2/src/C/vector.h b/version2/src/C/vector.h index 5a7770d..00ff962 100644 --- a/version2/src/C/vector.h +++ b/version2/src/C/vector.h @@ -7,58 +7,58 @@ template class Vector { public: Vector(uint _capacity = VECTOR_DEFCAP) : - size(0), + fldsize(0), capacity(_capacity), array((type *) ourmalloc(sizeof(type) * _capacity)) { } Vector(uint _capacity, type *_array) : - size(_capacity), + fldsize(_capacity), capacity(_capacity), array((type *) ourmalloc(sizeof(type) * _capacity)) { memcpy(array, _array, capacity * sizeof(type)); } Vector(Vector *v) : - size(v->size), + fldsize(v->fldsize), capacity(v->capacity), array((type *) ourmalloc(sizeof(type) * v->capacity)) { memcpy(array, v->array, capacity * sizeof(type)); } void pop() { - size--; + fldsize--; } type last() const { - return array[size - 1]; + return array[fldsize - 1]; } void setSize(uint _size) { - if (_size <= size) { - size = _size; + if (_size <= fldsize) { + fldsize = _size; return; } else if (_size > capacity) { array = (type *)ourrealloc(array, _size * sizeof(type)); capacity = _size; } - bzero(&array[size], (_size - size) * sizeof(type)); - size = _size; + bzero(&array[fldsize], (_size - fldsize) * sizeof(type)); + fldsize = _size; } void addAll(Vector *v) { - int oldsize = size; - setSize(size + v->size); - memcpy(&array[size], v->array, v->size * sizeof(type)); + int oldsize = fldsize; + setSize(fldsize + v->fldsize); + memcpy(&array[fldsize], v->array, v->fldsize * sizeof(type)); } void add(type item) { - if (size >= capacity) { + if (fldsize >= capacity) { uint newcap = capacity << 1; array = (type *)ourrealloc(array, newcap * sizeof(type)); capacity = newcap; } - array[size++] = item; + array[fldsize++] = item; } type get(uint index) const { @@ -66,7 +66,7 @@ public: } void setExpand(uint index, type item) { - if (index >= size) + if (index >= fldsize) setSize(index + 1); set(index, item); } @@ -75,12 +75,12 @@ public: array[index] = item; } - uint getSize() const { - return size; + uint size() const { + return fldsize; } bool isEmpty() const { - return size == 0; + return fldsize == 0; } ~Vector() { @@ -88,7 +88,7 @@ public: } void clear() { - size = 0; + fldsize = 0; } type *expose() { @@ -96,7 +96,7 @@ public: } CMEMALLOC; private: - uint size; + uint fldsize; uint capacity; type *array; };