5 Hashtable<int32_t, TransactionPart> parts = NULL;
6 Set<int32_t> missingParts = NULL;
7 Vector<int32_t> partsPendingSend = NULL;
8 bool isComplete = false;
9 bool hasLastPart = false;
10 Set<KeyValue> keyValueGuardSet = NULL;
11 Set<KeyValue> keyValueUpdateSet = NULL;
13 int64_t sequenceNumber = -1;
14 int64_t clientLocalSequenceNumber = -1;
15 int64_t arbitratorId = -1;
16 int64_t machineId = -1;
17 Pair<int64_t, int64_t> transactionId = NULL;
19 int nextPartToSend = 0;
20 bool didSendAPartToServer = false;
22 TransactionStatus transactionStatus = NULL;
24 bool hadServerFailure = false;
27 parts = new Hashtable<int32_t, TransactionPart>();
28 keyValueGuardSet = new HashSet<KeyValue>();
29 keyValueUpdateSet = new HashSet<KeyValue>();
30 partsPendingSend = new Vector<int32_t>();
33 void addPartEncode(TransactionPart newPart) {
34 parts.put(newPart.getPartNumber(), newPart);
35 partsPendingSend.add(newPart.getPartNumber());
37 sequenceNumber = newPart.getSequenceNumber();
38 arbitratorId = newPart.getArbitratorId();
39 transactionId = newPart.getTransactionId();
40 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
41 machineId = newPart.getMachineId();
46 void addPartDecode(TransactionPart newPart) {
49 // If dead then just kill this part and move on
54 sequenceNumber = newPart.getSequenceNumber();
55 arbitratorId = newPart.getArbitratorId();
56 transactionId = newPart.getTransactionId();
57 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
58 machineId = newPart.getMachineId();
60 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
62 if (previoslySeenPart != NULL) {
63 // Set dead the old one since the new one is a rescued version of this part
64 previoslySeenPart.setDead();
65 } else if (newPart.isLastPart()) {
66 missingParts = new HashSet<int32_t>();
69 for (int i = 0; i < newPart.getPartNumber(); i++) {
70 if (parts.get(i) == NULL) {
76 if (!isComplete && hasLastPart) {
78 // We have seen this part so remove it from the set of missing parts
79 missingParts.remove(newPart.getPartNumber());
81 // Check if all the parts have been seen
82 if (missingParts.size() == 0) {
84 // We have all the parts
87 // Decode all the parts and create the key value guard and update sets
88 decodeTransactionData();
93 void addUpdateKV(KeyValue kv) {
94 keyValueUpdateSet.add(kv);
97 void addGuardKV(KeyValue kv) {
98 keyValueGuardSet.add(kv);
102 int64_t getSequenceNumber() {
103 return sequenceNumber;
106 void setSequenceNumber(int64_t _sequenceNumber) {
107 sequenceNumber = _sequenceNumber;
109 for (int32_t i : parts.keySet()) {
110 parts.get(i).setSequenceNumber(sequenceNumber);
114 int64_t getClientLocalSequenceNumber() {
115 return clientLocalSequenceNumber;
118 Hashtable<int32_t, TransactionPart> getParts() {
122 bool didSendAPartToServer() {
123 return didSendAPartToServer;
126 void resetNextPartToSend() {
130 TransactionPart getNextPartToSend() {
131 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
134 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
140 void setServerFailure() {
141 hadServerFailure = true;
144 bool getServerFailure() {
145 return hadServerFailure;
149 void resetServerFailure() {
150 hadServerFailure = false;
154 void setTransactionStatus(TransactionStatus _transactionStatus) {
155 transactionStatus = _transactionStatus;
158 TransactionStatus getTransactionStatus() {
159 return transactionStatus;
162 void removeSentParts(Vector<int32_t> sentParts) {
164 if(partsPendingSend.removeAll(sentParts))
166 didSendAPartToServer = true;
167 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
171 bool didSendAllParts() {
172 return partsPendingSend.isEmpty();
175 Set<KeyValue> getKeyValueUpdateSet() {
176 return keyValueUpdateSet;
179 int getNumberOfParts() {
183 int64_t getMachineId() {
187 int64_t getArbitrator() {
195 Pair<int64_t, int64_t> getId() {
196 return transactionId;
208 // Make all the parts of this transaction dead
209 for (int32_t partNumber : parts.keySet()) {
210 TransactionPart part = parts.get(partNumber);
215 TransactionPart getPart(int index) {
216 return parts.get(index);
219 void decodeTransactionData() {
221 // Calculate the size of the data section
223 for (int i = 0; i < parts.keySet().size(); i++) {
224 TransactionPart tp = parts.get(i);
225 dataSize += tp.getDataSize();
228 char[] combinedData = new char[dataSize];
229 int currentPosition = 0;
231 // Stitch all the data sections together
232 for (int i = 0; i < parts.keySet().size(); i++) {
233 TransactionPart tp = parts.get(i);
234 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
235 currentPosition += tp.getDataSize();
239 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
241 // Decode how many key value pairs need to be decoded
242 int numberOfKVGuards = bbDecode.getInt();
243 int numberOfKVUpdates = bbDecode.getInt();
245 // Decode all the guard key values
246 for (int i = 0; i < numberOfKVGuards; i++) {
247 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
248 keyValueGuardSet.add(kv);
251 // Decode all the updates key values
252 for (int i = 0; i < numberOfKVUpdates; i++) {
253 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
254 keyValueUpdateSet.add(kv);
258 bool evaluateGuard(Hashtable<IoTString, KeyValue> committedKeyValueTable, Hashtable<IoTString, KeyValue> speculatedKeyValueTable, Hashtable<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
259 for (KeyValue kvGuard : keyValueGuardSet) {
261 // First check if the key is in the speculative table, this is the value of the latest assumption
264 // If we have a speculation table then use it first
265 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
266 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
269 // If we have a speculation table then use it first
270 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
271 kv = speculatedKeyValueTable.get(kvGuard.getKey());
275 // if it is not in the speculative table then check the committed table and use that
276 // value as our latest assumption
277 kv = committedKeyValueTable.get(kvGuard.getKey());
280 if (kvGuard.getValue() != NULL) {
281 if ((kv == NULL) || (!kvGuard.getValue().equals(kv.getValue()))) {
285 System.out.println(kvGuard.getValue() + " " + kv.getValue());
287 System.out.println(kvGuard.getValue() + " " + kv);