1 #include "Transaction.h"
3 Transaction::Transaction() :
4 parts(new Hashtable<int32_t, TransactionPart *>()),
6 partsPendingSend(new Vector<int32_t>()),
9 keyValueGuardSet(new Hashset<KeyValue *>()),
10 keyValueUpdateSet(new Hashset<KeyValue *>()),
13 clientLocalSequenceNumber(-1),
17 hadServerFailure(false) {
20 void Transaction::addPartEncode(TransactionPart *newPart) {
21 parts->put(newPart->getPartNumber(), newPart);
22 partsPendingSend->add(newPart->getPartNumber());
24 sequenceNumber = newPart->getSequenceNumber();
25 arbitratorId = newPart->getArbitratorId();
26 transactionId = newPart->getTransactionId();
27 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
28 machineId = newPart->getMachineId();
33 void Transaction::addPartDecode(TransactionPart *newPart) {
35 // If dead then just kill this part and move on
40 sequenceNumber = newPart->getSequenceNumber();
41 arbitratorId = newPart->getArbitratorId();
42 transactionId = newPart->getTransactionId();
43 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
44 machineId = newPart->getMachineId();
46 TransactionPart previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
48 if (previoslySeenPart != NULL) {
49 // Set dead the old one since the new one is a rescued version of this part
50 previoslySeenPart->setDead();
51 } else if (newPart->isLastPart()) {
52 missingParts = new Hashset<int32_t>();
55 for (int i = 0; i < newPart->getPartNumber(); i++) {
56 if (parts->get(i) == NULL) {
62 if (!fldisComplete && hasLastPart) {
64 // We have seen this part so remove it from the set of missing parts
65 missingParts->remove(newPart->getPartNumber());
67 // Check if all the parts have been seen
68 if (missingParts->size() == 0) {
70 // We have all the parts
73 // Decode all the parts and create the key value guard and update sets
74 decodeTransactionData();
79 void Transaction::addUpdateKV(KeyValue *kv) {
80 keyValueUpdateSet->add(kv);
83 void Transaction::addGuardKV(KeyValue *kv) {
84 keyValueGuardSet->add(kv);
88 int64_t Transaction::getSequenceNumber() {
89 return sequenceNumber;
92 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
93 sequenceNumber = _sequenceNumber;
95 for (int32_t i : parts->keySet()) {
96 parts->get(i)->setSequenceNumber(sequenceNumber);
100 int64_t Transaction::getClientLocalSequenceNumber() {
101 return clientLocalSequenceNumber;
104 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
108 bool Transaction::didSendAPartToServer() {
109 return flddidSendAPartToServer;
112 void Transaction::resetNextPartToSend() {
116 TransactionPart *Transaction::getNextPartToSend() {
117 if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
120 TransactionPart part = parts->get(partsPendingSend->get(nextPartToSend));
126 void Transaction::setServerFailure() {
127 hadServerFailure = true;
130 bool Transaction::getServerFailure() {
131 return hadServerFailure;
135 void Transaction::resetServerFailure() {
136 hadServerFailure = false;
140 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
141 transactionStatus = _transactionStatus;
144 TransactionStatus *Transaction::getTransactionStatus() {
145 return transactionStatus;
148 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
150 if (partsPendingSend->removeAll(sentParts))
152 flddidSendAPartToServer = true;
153 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
157 bool Transaction::didSendAllParts() {
158 return partsPendingSend->isEmpty();
161 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
162 return keyValueUpdateSet;
165 int Transaction::getNumberOfParts() {
166 return parts->size();
169 int64_t Transaction::getMachineId() {
173 int64_t Transaction::getArbitrator() {
177 bool Transaction::isComplete() {
178 return fldisComplete;
181 Pair<int64_t, int64_t> *Transaction::getId() {
182 return transactionId;
185 void Transaction::setDead() {
194 // Make all the parts of this transaction dead
195 for (int32_t partNumber : parts->keySet()) {
196 TransactionPart part = parts->get(partNumber);
201 TransactionPart *Transaction::getPart(int index) {
202 return parts->get(index);
205 void Transaction::decodeTransactionData() {
207 // Calculate the size of the data section
209 for (int i = 0; i < parts->keySet()->size(); i++) {
210 TransactionPart tp = parts->get(i);
211 dataSize += tp->getDataSize();
214 Array<char> *combinedData = new char[dataSize];
215 int currentPosition = 0;
217 // Stitch all the data sections together
218 for (int i = 0; i < parts->keySet()->size(); i++) {
219 TransactionPart tp = parts->get(i);
220 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
221 currentPosition += tp->getDataSize();
225 ByteBuffer bbDecode = ByteBuffer_wrap(combinedData);
227 // Decode how many key value pairs need to be decoded
228 int numberOfKVGuards = bbDecode->getInt();
229 int numberOfKVUpdates = bbDecode->getInt();
231 // Decode all the guard key values
232 for (int i = 0; i < numberOfKVGuards; i++) {
233 KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
234 keyValueGuardSet->add(kv);
237 // Decode all the updates key values
238 for (int i = 0; i < numberOfKVUpdates; i++) {
239 KeyValue * kv = (KeyValue *)KeyValue_decode(bbDecode);
240 keyValueUpdateSet->add(kv);
244 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
245 for (KeyValue *kvGuard : keyValueGuardSet) {
247 // First check if the key is in the speculative table, this is the value of the latest assumption
248 KeyValue * kv = NULL;
250 // If we have a speculation table then use it first
251 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
252 kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
255 // If we have a speculation table then use it first
256 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
257 kv = speculatedKeyValueTable->get(kvGuard->getKey());
261 // if it is not in the speculative table then check the committed table and use that
262 // value as our latest assumption
263 kv = committedKeyValueTable->get(kvGuard->getKey());
266 if (kvGuard->getValue() != NULL) {
267 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
271 System.out.println(kvGuard->getValue() + " " + kv->getValue());
273 System.out.println(kvGuard->getValue() + " " + kv);