5 #include "ThreeTuple.h"
7 * IoTTable data structure. Provides client interface.
// Tuning constants for the table. The code that consumes them is not visible
// in this header, so every NOTE(review) below is an assumption to confirm.
13 #define Table_FREE_SLOTS 2
14 // Number of slots that should be kept free // 10
// NOTE(review): the comment above sits after Table_FREE_SLOTS and appears to
// describe it; the trailing "// 10" looks like a stale earlier value - confirm.
15 #define Table_SKIP_THRESHOLD 10
// NOTE(review): presumably the growth factor applied when the slot buffer is
// resized - confirm against the resize code.
16 #define Table_RESIZE_MULTIPLE ((double)1.2)
// NOTE(review): presumably the fill fraction from which bufferResizeThreshold
// is derived - confirm against setResizeThreshold().
17 #define Table_RESIZE_THRESHOLD ((double)0.75)
18 #define Table_REJECTED_THRESHOLD 5
26 TableStatus *liveTableStatus;
27 PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
28 Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
29 Transaction *firstPendingTransaction; // first transaction in the pending transaction list
32 int numberOfSlots; // Number of slots stored in buffer
33 int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
34 int64_t liveSlotCount;// Number of currently live slots
35 int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
36 int64_t localMachineId; // Machine ID of this client device
37 int64_t sequenceNumber; // Largest sequence number a client has received
38 int64_t localSequenceNumber;
40 // int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
41 // int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
42 int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
43 int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
44 int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
45 int64_t localArbitrationSequenceNumber;
46 bool hadPartialSendToServer;
47 bool attemptedToSendToServer;
49 bool didFindTableStatus;
52 Slot *lastSlotAttemptedToSend;
55 Hashtable<Transaction *, Vector<int32_t> *> *lastTransactionPartsSent;
56 Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
61 Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
62 Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
63 Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
64 Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
65 Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
66 Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
67 Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
68 Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
69 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
70 Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
71 Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
72 Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
73 Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
74 Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
75 Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
76 Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
77 Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
78 Vector<Transaction *> *pendingTransactionQueue;
79 Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
80 Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
81 Hashtable<Transaction *, Vector<int32_t> *> *transactionPartsSent;
82 Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
83 Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
84 Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
85 Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
86 Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
87 Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
88 bool lastInsertedNewKey;
89 int64_t lastSeqNumArbOn;
94 * Recalculate the new resize threshold
96 void setResizeThreshold();
97 bool sendToServer(NewKey *newKey);
98 bool updateFromLocal(int64_t machineId);
99 Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
100 ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
102 * Returns false if a resize was needed
104 ThreeTuple<bool, int32_t, bool> *fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry);
105 void doRejectedMessages(Slot *s);
107 ThreeTuple<bool, bool, int64_t> doMandatoryResuce(Slot *slot, bool resize);
109 void doOptionalRescue(Slot *s, bool seenliveslot, int64_t seqn, bool resize);
111 * Checks for malicious activity and updates the local copy of the block chain.
113 void validateAndUpdate(Array<Slot *> *newSlots, bool acceptUpdatesToLocal);
115 void updateLiveStateFromServer();
117 void updateLiveStateFromLocal();
119 void initExpectedSize(int64_t firstSequenceNumber, int64_t numberOfSlots);
121 void updateExpectedSize();
125 * Check the size of the block chain to make sure there are enough slots sent back by the server.
126 * This is only called when we have a gap between the slots that we have locally and the slots
127 * sent by the server therefore in the slots sent by the server there will be at least 1 Table
130 void checkNumSlots(int numberOfSlots);
// Inline setter: stores newmaxsize into currMaxSize (declared outside the
// lines visible here). No validation of the new value is done at this point.
132 void updateCurrMaxSize(int newmaxsize) { currMaxSize = newmaxsize; }
136 * Update the size of the local buffer if it is needed.
138 void commitNewMaxSize();
141 * Process the new transaction parts from this latest round of slots received from the server
143 void processNewTransactionParts();
147 void arbitrateFromServer();
149 Pair<bool, bool> arbitrateOnLocalTransaction(Transaction *transaction);
152 * Compacts the arbitration data by merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
154 bool compactArbitrationData();
157 * Update all the commits and the committed tables, sets dead the dead transactions
159 bool updateCommittedTable();
162 * Create the speculative table from transactions that are still live and have come from the cloud
164 bool updateSpeculativeTable(bool didProcessNewCommits);
167 * Create the pending transaction speculative table from transactions that are still in the pending transaction buffer
169 void updatePendingTransactionSpeculativeTable(bool didProcessNewCommitsOrSpeculate);
172 * Set dead and remove from the live transaction tables the transactions that are dead
174 void updateLiveTransactionsAndStatus();
177 * Process this slot, entry by entry. Also update the latest message sent by slot
179 void processSlot(SlotIndexer *indexer, Slot *slot, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
182 * Update the last message that was sent for a machine Id
184 void processEntry(LastMessage *entry, Hashset<int64_t> *machineSet);
187 * Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
189 void processEntry(NewKey *entry);
192 * Process new table status entries and set dead the old ones as new ones come in.
193 * Keeps track of the largest and smallest table status seen in this current round
194 * of updating the local copy of the block chain
196 void processEntry(TableStatus *entry, int64_t seq);
199 * Check old messages to see if there is a block chain violation. Also
201 void processEntry(RejectedMessage *entry, SlotIndexer *indexer);
204 * Check if this abort is live, if not then save it so we can kill it later.
205 * Update the last transaction number that was arbitrated on.
207 void processEntry(Abort *entry);
210 * Set dead the transaction part if that transaction is dead and keep track of all new parts
212 void processEntry(TransactionPart *entry);
215 * Process new commit entries and save them for future use. Delete duplicates
217 void processEntry(CommitPart *entry);
220 * Update the last message seen table. Update and set dead the appropriate RejectedMessages as clients see them.
221 * Updates the live aborts, removes those that are dead and sets them dead.
222 * Check that the last message seen is correct and that there is no mismatch of our own last message or that
223 * other clients have not had a rollback on the last message.
225 void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness *liveness, bool acceptUpdatesToLocal, Hashset<int64_t> *machineSet);
228 * Add a rejected message entry to the watch set to keep track of which clients have seen that
229 * rejected message entry and which have not.
231 void addWatchVector(int64_t machineId, RejectedMessage *entry);
234 * Check if the HMAC chain is not violated
236 void checkHMACChain(SlotIndexer *indexer, Array<Slot *> *newSlots);
240 Table(IoTString *baseurl, IoTString *password, int64_t _localMachineId, int listeningPort);
241 Table(CloudComm *_cloud, int64_t _localMachineId);
244 * Initialize the table by inserting a table status as the first entry into the table status
245 * also initialize the crypto stuff.
250 * Rebuild the table from scratch by pulling the latest block chain from the server.
254 void addLocalCommunication(int64_t arbitrator, IoTString *hostName, int portNumber);
255 int64_t getArbitrator(IoTString *key);
257 IoTString *getCommitted(IoTString *key);
258 IoTString *getSpeculative(IoTString *key);
259 IoTString *getCommittedAtomic(IoTString *key);
260 IoTString *getSpeculativeAtomic(IoTString *key);
262 bool createNewKey(IoTString *keyName, int64_t machineId);
263 void startTransaction();
264 void addKV(IoTString *key, IoTString *value);
265 TransactionStatus *commitTransaction();
268 * Get the machine ID for this client
// Inline accessor: returns the localMachineId field unchanged.
270 int64_t getMachineId() { return localMachineId; }
273 * Decrement the number of live slots that we currently have
// Inline: decrements liveSlotCount by exactly one. No lower-bound check is
// performed here, so the caller decides when a decrement is appropriate.
275 void decrementLiveCount() { liveSlotCount--; }
276 int64_t getLocalSequenceNumber();
277 Array<char> *acceptDataFromLocal(Array<char> *data);