5 #include "ThreeTuple.h"
9 * IoTTable data structure. Provides client interface.
10 * @author Brian Demsky
// Tuning constants for the table. Their use sites are outside this excerpt.
15 #define Table_FREE_SLOTS 2
16 // Number of slots that should be kept free // 10
// NOTE(review): it is unclear whether the comment above describes
// Table_FREE_SLOTS or Table_SKIP_THRESHOLD, and what the trailing "// 10"
// refers to -- confirm.
17 #define Table_SKIP_THRESHOLD 10
// Both resize constants are explicitly double-typed.
// NOTE(review): presumably a growth factor and a fill ratio that triggers a
// resize -- confirm against setResizeThreshold().
18 #define Table_RESIZE_MULTIPLE ((double)1.2)
19 #define Table_RESIZE_THRESHOLD ((double)0.75)
// NOTE(review): presumably a limit related to rejected slots/messages --
// confirm at the use site.
20 #define Table_REJECTED_THRESHOLD 5
// Scalar and pointer state of the table. Initial values and ownership of the
// pointed-to objects are not visible in this excerpt.
// NOTE(review): presumably the TableStatus entry currently considered live -- confirm.
28 TableStatus *liveTableStatus;
29 PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
30 Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
31 Transaction *firstPendingTransaction; // first transaction in the pending transaction list
34 int numberOfSlots; // Number of slots stored in buffer
35 int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
36 int64_t liveSlotCount;// Number of currently live slots
// NOTE(review): "Numver" looks like a typo for "Number"; the identifier is
// left unchanged because renaming it would have to touch every use.
37 int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
38 int64_t localMachineId; // Machine ID of this client device
39 int64_t sequenceNumber; // Largest sequence number a client has received
// NOTE(review): undocumented; how it differs from sequenceNumber is not
// visible here -- confirm in the implementation.
40 int64_t localSequenceNumber;
42 // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
43 // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
44 int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
45 int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
46 int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
// NOTE(review): undocumented; presumably a local counter for arbitration
// data -- confirm.
47 int64_t localArbitrationSequenceNumber;
// NOTE(review): the three flags and the slot pointer below are undocumented;
// presumably they record the outcome of the most recent send/update attempt --
// confirm where they are set and cleared.
48 bool hadPartialSendToServer;
49 bool attemptedToSendToServer;
51 bool didFindTableStatus;
54 Slot *lastSlotAttemptedToSend;
// Container members of the table. All are held by pointer; where they are
// allocated and released is not visible in this excerpt.
// The two "last*" members below have the same types as transactionPartsSent and
// pendingSendArbitrationEntriesToDelete declared further down.
// NOTE(review): presumably snapshots from the previous send attempt -- confirm.
57 Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
58 Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
63 Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
64 Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
65 Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
66 Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
67 Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
68 Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
// NOTE(review): this is the only IoTString*-keyed table in this excerpt that
// passes an explicit hash/equality (hashString, StringEquals); the others rely
// on Hashtable's defaults. Confirm that the difference is intended.
69 Hashtable<IoTString *, int64_t, uintptr_t, 0, hashString, StringEquals> *arbitratorTable;// Table of keys and their arbitrators
70 Hashtable<Pair<int64_t, int64_t> *, Abort *, uintptr_t, 0, pairHashFunction, pairEquals> *liveAbortTable;// Table live abort messages
71 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
72 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *, uintptr_t, 0, pairHashFunction, pairEquals> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
73 Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
74 Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
75 Hashtable<Pair<int64_t, int64_t> *, Transaction *, uintptr_t, 0, pairHashFunction, pairEquals> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
// NOTE(review): the commit tables below are undocumented; the meaning of the
// int64_t keys (outer and inner) is not visible here -- confirm.
76 Hashtable<int64_t, Hashtable<int64_t, Commit *> *> *liveCommitsTable;
77 Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
78 Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
79 Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
// NOTE(review): the members below are undocumented; the names suggest queues
// and bookkeeping for data waiting to be sent -- confirm in the implementation.
80 Vector<Transaction *> *pendingTransactionQueue;
81 Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
82 Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
83 Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
84 Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
85 Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
86 Hashset<Pair<int64_t, int64_t> *, uintptr_t, 0, pairHashFunction, pairEquals> *offlineTransactionsCommittedAndAtServer;
// NOTE(review): presumably arbitrator id -> (host name, port), matching the
// parameters of addLocalCommunication() -- confirm.
87 Hashtable<int64_t, Pair<IoTString *, int32_t> *> *localCommunicationTable;
88 Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
89 Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
90 bool lastInsertedNewKey;
91 int64_t lastSeqNumArbOn;
96 * Recalculate the new resize threshold
// Helper declarations; their definitions are not part of this excerpt.
98 void setResizeThreshold();
// NOTE(review): the meaning of the bool results (plain, Pair and ThreeTuple
// components) of the functions below is not documented here -- confirm in the
// implementation.
99 bool sendToServer(NewKey *newKey);
100 bool updateFromLocal(int64_t machineId);
101 Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
102 ThreeTuple<bool, bool, Array<Slot *> *> sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
104 * Returns false if a resize was needed
// NOTE(review): fillSlot returns a ThreeTuple, so "returns false" above
// presumably refers to its first bool component -- confirm.
106 ThreeTuple<bool, int32_t, bool> fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
107 void doRejectedMessages(Slot *s);
// NOTE(review): "Resuce" looks like a misspelling of "Rescue" (compare
// doOptionalRescue below); the name is part of the interface and is left as is.
109 ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
111 void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
113 * Checks for malicious activity and updates the local copy of the block chain.
115 void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
117 void updateLiveStateFromServer();
119 void updateLiveStateFromLocal();
121 void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
123 void updateExpectedSize();
127 * Check the size of the block chain to make sure there are enough slots sent back by the server.
128 * This is only called when we have a gap between the slots that we have locally and the slots
129 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
// NOTE(review): the sentence above is cut off in this excerpt (its
// continuation line is missing).
// The parameter shares its name with the numberOfSlots member declared earlier.
132 void checkNumSlots(int numberOfSlots);
// Inline setter: stores newmaxsize into currMaxSize. The declaration of
// currMaxSize is not part of this excerpt.
134 void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
138 * Update the size of the local buffer if it is needed.
// Helper declarations; their definitions are not part of this excerpt.
140 void commitNewMaxSize();
143 * Process the new transaction parts from this latest round of slots received from the server
145 void processNewTransactionParts();
// NOTE(review): undocumented; presumably arbitrates on transactions received
// from the server -- confirm.
149 void arbitrateFromServer();
// NOTE(review): the meaning of the two bool components is not documented
// here -- confirm in the implementation.
151 Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
154 * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
// Helper declarations for arbitration, table updates and slot/entry
// processing; their definitions are not part of this excerpt.
// NOTE(review): the meaning of the bool results of the next three functions is
// not documented here -- confirm in the implementation.
156 bool compactArbitrationData();
159 * Update all the commits and the committed tables, sets dead the dead transactions
161 bool updateCommittedTable();
164 * Create the speculative table from transactions that are still live and have come from the cloud
166 bool updateSpeculativeTable(bool didProcessNewCommits);
169 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
171 void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
174 * Set dead and remove from the live transaction tables the transactions that are dead
176 void updateLiveTransactionsAndStatus();
179 * Process this slot, entry by entry. Also update the latest message sent by slot
181 void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
// processEntry is overloaded on the entry type; there is one declaration per
// entry kind below (LastMessage, NewKey, TableStatus, RejectedMessage, Abort,
// TransactionPart, CommitPart).
184 * Update the last message that was sent for a machine Id
186 void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
189 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
191 void processEntry(NewKey *entry);
194 * Process new table status entries and set dead the old ones as new ones come in.
195 * keeps track of the largest and smallest table status seen in this current round
196 * of updating the local copy of the block chain
198 void processEntry(TableStatus *entry, int64_t seq);
201 * Check old messages to see if there is a block chain violation. Also
// NOTE(review): the sentence above is cut off in this excerpt (its
// continuation line is missing).
203 void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
206 * Check if this abort is live, if not then save it so we can kill it later.
207 * update the last transaction number that was arbitrated on.
209 void processEntry(Abort *entry);
212 * Set dead the transaction part if that transaction is dead and keep track of all new parts
214 void processEntry(TransactionPart *entry);
217 * Process new commit entries and save them for future use. Delete duplicates
219 void processEntry(CommitPart *entry);
222 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
223 * Updates the live aborts, removes those that are dead and sets them dead.
224 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
225 * other clients have not had a rollback on the last message.
227 void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
230 * Add a rejected message entry to the watch set to keep track of which clients have seen that
231 * rejected message entry and which have not.
233 void addWatchVector(int64_t machineId, RejectedMessage *entry);
236 * Check if the HMAC chain is not violated
238 void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
// Constructors and client-facing operations; their definitions are not part of
// this excerpt.
// Two construction paths: from connection parameters (base URL, password,
// listening port), or from an existing CloudComm object. Both take the local
// machine id.
242 Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
243 Table(CloudComm *_cloud, int64_t _localMachineId);
247 * Initialize the table by inserting a table status as the first entry into the table status
248 * also initialize the crypto stuff.
253 * Rebuild the table from scratch by pulling the latest block chain from the server.
// NOTE(review): the declarations that the two comments above describe are
// missing from this excerpt.
257 void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
258 int64_t getArbitrator(IoTString *key);
// NOTE(review): how the *Atomic getters differ from the plain ones, and who
// owns the returned IoTString, is not visible here -- confirm.
260 IoTString *getCommitted(IoTString *key);
261 IoTString *getSpeculative(IoTString *key);
262 IoTString *getCommittedAtomic(IoTString *key);
263 IoTString *getSpeculativeAtomic(IoTString *key);
265 bool createNewKey(IoTString *keyName, int64_t machineId);
// NOTE(review): presumably used in the sequence startTransaction() ->
// addKV(...) -> commitTransaction() -- confirm.
266 void startTransaction();
267 void addKV(IoTString *key, IoTString *value);
268 TransactionStatus *commitTransaction();
271 * Get the machine ID for this client
// Inline accessor: returns the localMachineId member.
273 int64_t getMachineId() { return localMachineId; }
276 * Decrement the number of live slots that we currently have
// Inline helper: decrements the liveSlotCount member by one. No lower-bound
// check is performed here.
278 void decrementLiveCount() { liveSlotCount--; }
// Declarations only; their definitions are not part of this excerpt.
// NOTE(review): presumably returns the localSequenceNumber member -- confirm.
279 int64_t getLocalSequenceNumber();
// Takes a byte array and returns a byte array.
// NOTE(review): the payload format and the ownership of both arrays are not
// visible here -- confirm.
280 Array<char> *acceptDataFromLocal(Array<char> *data);