5 #include "ThreeTuple.h"
9 * IoTTable data structure. Provides client interface.
10 * @author Brian Demsky
15 #define Table_FREE_SLOTS 2
16 // Number of slots that should be kept free // 10
17 #define Table_SKIP_THRESHOLD 10
18 #define Table_RESIZE_MULTIPLE ((double)1.2)
19 #define Table_RESIZE_THRESHOLD ((double)0.75)
20 #define Table_REJECTED_THRESHOLD 5
28 TableStatus *liveTableStatus;
29 PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
30 Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
31 Transaction *firstPendingTransaction; // first transaction in the pending transaction list
34 int numberOfSlots; // Number of slots stored in buffer
35 int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
36 int64_t liveSlotCount;// Number of currently live slots
37 int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
38 int64_t localMachineId; // Machine ID of this client device
39 int64_t sequenceNumber; // Largest sequence number a client has received
40 int64_t localSequenceNumber;
42 // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
43 // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
44 int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
45 int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
46 int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
47 int64_t localArbitrationSequenceNumber;
48 bool hadPartialSendToServer;
49 bool attemptedToSendToServer;
51 bool didFindTableStatus;
54 Slot *lastSlotAttemptedToSend;
57 Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
58 Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
60 void processTransactionList(bool handlePartial);
61 void clearSentParts();
64 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable;// Table of committed key value pairs
65 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
66 Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
67 Hashtable<IoTString *, NewKey *, uintptr_t, 0, hashString, StringEquals> *liveNewKeyTable; // Table of live new keys
68 Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
69 Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
70 Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals> *arbitratorTable;// Table of keys and their arbitrators
71 Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table of live abort messages, keyed by a pair of 64-bit integers
72 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
73 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
74 Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
75 Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
76 Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
77 Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
78 Hashtable<IoTString *, Commit *, uintptr_t, 0, hashString, StringEquals> *liveCommitsByKeyTable;
79 Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
80 Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
81 Vector<Transaction *> *pendingTransactionQueue;
82 Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
83 Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
84 Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
85 Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
86 Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
87 Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
88 Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
89 Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
90 Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
91 bool lastInsertedNewKey;
92 int64_t lastSeqNumArbOn;
97 * Recalculate the new resize threshold
99 void setResizeThreshold();
100 bool sendToServer(NewKey *newKey);
101 NewKey * handlePartialSend(NewKey * newKey);
102 bool checkSend(Array<Slot *> * array, Slot *checkSlot);
104 bool updateFromLocal(int64_t machineId);
105 Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
106 bool sendSlotsToServer(Slot *slot, int newSize, bool isNewKey, bool * wasInserted, Array<Slot *> **array);
108 * Returns false if a resize was needed
110 bool fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey);
111 void doRejectedMessages(Slot *s);
113 ThreeTuple<bool, bool, int64_t> doMandatoryRescue(Slot *slot, bool resize);
115 void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
117 * Checks for malicious activity and updates the local copy of the block chain.
119 void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
121 void updateLiveStateFromServer();
123 void updateLiveStateFromLocal();
125 void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
127 void updateExpectedSize();
131 * Check the size of the block chain to make sure there are enough slots sent back by the server.
132 * This is only called when we have a gap between the slots that we have locally and the slots
133 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
136 void checkNumSlots(int numberOfSlots);
138 void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; } // Overwrites currMaxSize with newmaxsize; no validation is performed here
142 * Update the size of the local buffer if it is needed.
144 void commitNewMaxSize();
147 * Process the new transaction parts from this latest round of slots received from the server
149 void processNewTransactionParts();
153 void arbitrateFromServer();
155 Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
158 * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
160 bool compactArbitrationData();
163 * Update all the commits and the committed tables, sets dead the dead transactions
165 bool updateCommittedTable();
168 * Create the speculative table from transactions that are still live and have come from the cloud
170 bool updateSpeculativeTable(bool didProcessNewCommits);
173 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
175 void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
178 * Set dead and remove from the live transaction tables the transactions that are dead
180 void updateLiveTransactionsAndStatus();
183 * Process this slot, entry by entry. Also update the latest message sent by slot
185 void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
188 * Update the last message that was sent for a machine Id
190 void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
193 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
195 void processEntry(NewKey *entry);
198 * Process new table status entries and set dead the old ones as new ones come in.
199 * keeps track of the largest and smallest table status seen in this current round
200 * of updating the local copy of the block chain
202 void processEntry(TableStatus *entry, int64_t seq);
205 * Check old messages to see if there is a block chain violation. Also
207 void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
210 * Check if this abort is live, if not then save it so we can kill it later.
211 * update the last transaction number that was arbitrated on.
213 void processEntry(Abort *entry);
216 * Set dead the transaction part if that transaction is dead and keep track of all new parts
218 void processEntry(TransactionPart *entry);
221 * Process new commit entries and save them for future use. Delete duplicates
223 void processEntry(CommitPart *entry);
226 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
227 * Updates the live aborts, removes those that are dead and sets them dead.
228 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
229 * other clients have not had a rollback on the last message.
231 void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
234 * Add a rejected message entry to the watch set to keep track of which clients have seen that
235 * rejected message entry and which have not.
237 void addWatchVector(int64_t machineId, RejectedMessage *entry);
240 * Check if the HMAC chain is not violated
242 void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
246 Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
247 Table(CloudComm *_cloud, int64_t _localMachineId);
251 * Initialize the table by inserting a table status as the first entry into the table status
252 * also initialize the crypto stuff.
257 * Rebuild the table from scratch by pulling the latest block chain from the server.
261 void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
262 int64_t getArbitrator(IoTString *key);
264 IoTString *getCommitted(IoTString *key);
265 IoTString *getSpeculative(IoTString *key);
266 IoTString *getCommittedAtomic(IoTString *key);
267 IoTString *getSpeculativeAtomic(IoTString *key);
269 bool createNewKey(IoTString *keyName, int64_t machineId);
270 void startTransaction();
271 void put(IoTString *key, IoTString *value);
272 TransactionStatus *commitTransaction();
275 * Get the machine ID for this client
277 int64_t getMachineId() { return localMachineId; } // Returns the localMachineId field unchanged
280 * Decrement the number of live slots that we currently have
282 void decrementLiveCount() { liveSlotCount--; } // Decrements liveSlotCount by one; there is no lower-bound check in this method
283 int64_t getLocalSequenceNumber();
284 Array<char> *acceptDataFromLocal(Array<char> *data);