1 #include "Transaction.h"
2 #include "TransactionPart.h"
4 #include "ByteBuffer.h"
6 #include "TransactionStatus.h"
// Default constructor. Allocates the empty containers this transaction uses
// (the part table, the list of part numbers still pending send, and the guard
// and update key-value sets) and gives the scalar members visible here their
// starting values.
// NOTE(review): the containers are created with raw `new` and no destructor is
// visible in this view -- confirm where they are released.
8 Transaction::Transaction() :
// Part number -> part; filled by addPartEncode() / addPartDecode().
9 parts(new Hashtable<int32_t, TransactionPart *>()),
// Part numbers queued by addPartEncode() and dropped by removeSentParts().
11 partsPendingSend(new Vector<int32_t>()),
14 keyValueGuardSet(new Hashset<KeyValue *>()),
15 keyValueUpdateSet(new Hashset<KeyValue *>()),
// -1 until a part supplies the real value in addPartEncode() / addPartDecode().
18 clientLocalSequenceNumber(-1),
22 hadServerFailure(false) {
// Adds a part on the encode path: stores it in `parts` under its part number,
// queues that part number in `partsPendingSend`, and copies the part's
// identifying fields onto this transaction.
//
// newPart: the part to add. It is dereferenced unconditionally, so it must not
//          be null.
25 void Transaction::addPartEncode(TransactionPart *newPart) {
26 parts->put(newPart->getPartNumber(), newPart);
27 partsPendingSend->add(newPart->getPartNumber());
// Every call overwrites these fields with the values carried by the newest part.
29 sequenceNumber = newPart->getSequenceNumber();
30 arbitratorId = newPart->getArbitratorId();
31 transactionId = newPart->getTransactionId();
32 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
33 machineId = newPart->getMachineId();
// Adds a part on the decode path. Copies the part's identifying fields onto
// this transaction, stores the part in `parts` under its part number, and marks
// any part previously stored under that number as dead. When the new part is a
// last part in a previously empty slot, a fresh `missingParts` set is allocated
// and the lower part numbers are scanned for empty slots. While the transaction
// is not yet complete and a last part has been seen, the new part's number is
// removed from `missingParts`; once that set is empty, decodeTransactionData()
// is called.
//
// newPart: the part to add. It is dereferenced unconditionally in the code
//          visible here, so it must not be null.
//
// NOTE(review): several lines of this function (the dead-transaction branch and
// some branch bodies) are not visible in this view; the summary above covers
// only the visible statements.
38 void Transaction::addPartDecode(TransactionPart *newPart) {
40 // If dead then just kill this part and move on
45 sequenceNumber = newPart->getSequenceNumber();
46 arbitratorId = newPart->getArbitratorId();
47 transactionId = newPart->getTransactionId();
48 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
49 machineId = newPart->getMachineId();
// put() hands back whatever was stored under this part number before (NULL if nothing was).
51 TransactionPart * previoslySeenPart = parts->put(newPart->getPartNumber(), newPart);
53 if (previoslySeenPart != NULL) {
54 // Set dead the old one since the new one is a rescued version of this part
55 previoslySeenPart->setDead();
56 } else if (newPart->isLastPart()) {
57 missingParts = new Hashset<int32_t>();
// Walk every part number below the last part's number looking for empty slots.
60 for (int i = 0; i < newPart->getPartNumber(); i++) {
61 if (parts->get(i) == NULL) {
67 if (!fldisComplete && hasLastPart) {
69 // We have seen this part so remove it from the set of missing parts
70 missingParts->remove(newPart->getPartNumber());
72 // Check if all the parts have been seen
73 if (missingParts->size() == 0) {
75 // We have all the parts
78 // Decode all the parts and create the key value guard and update sets
79 decodeTransactionData();
// Adds `kv` to this transaction's update key-value set. The pointer itself is
// stored; no copy is made here.
84 void Transaction::addUpdateKV(KeyValue *kv) {
85 keyValueUpdateSet->add(kv);
// Adds `kv` to this transaction's guard key-value set. The pointer itself is
// stored; no copy is made here.
88 void Transaction::addGuardKV(KeyValue *kv) {
89 keyValueGuardSet->add(kv);
// Returns the sequence number currently recorded on this transaction.
93 int64_t Transaction::getSequenceNumber() {
94 return sequenceNumber;
// Sets this transaction's sequence number and pushes the same value to every
// part currently stored in `parts`.
//
// _sequenceNumber: the new sequence number.
97 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
98 sequenceNumber = _sequenceNumber;
// Keep every stored part in step with the transaction-level value.
100 for (int32_t i : parts->keySet()) {
101 parts->get(i)->setSequenceNumber(sequenceNumber);
// Returns the client-local sequence number. This is -1 after construction,
// until a part has been added through addPartEncode() or addPartDecode().
105 int64_t Transaction::getClientLocalSequenceNumber() {
106 return clientLocalSequenceNumber;
// Returns a pointer to a part-number -> TransactionPart table.
// NOTE(review): the body is not visible in this view; presumably it returns the
// `parts` member, whose type matches the return type -- confirm.
109 Hashtable<int32_t, TransactionPart *> *Transaction::getParts() {
// Returns the `flddidSendAPartToServer` flag, which removeSentParts() sets to
// true.
113 bool Transaction::didSendAPartToServer() {
114 return flddidSendAPartToServer;
// NOTE(review): the body is not visible in this view; presumably it rewinds the
// `nextPartToSend` index that getNextPartToSend() reads -- confirm.
117 void Transaction::resetNextPartToSend() {
// Picks the next part that is still pending send, using `nextPartToSend` as an
// index into `partsPendingSend`.
// NOTE(review): the return statements and the body of the first branch are not
// visible in this view; presumably NULL is returned when nothing is left -- confirm.
121 TransactionPart *Transaction::getNextPartToSend() {
// Nothing is pending, or the index has already walked past the last pending entry.
122 if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
// partsPendingSend holds part numbers; look the actual part up in `parts`.
125 TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
// Sets the server-failure flag on this transaction.
131 void Transaction::setServerFailure() {
132 hadServerFailure = true;
// Returns the server-failure flag. It is false after construction and after
// resetServerFailure(), and true after setServerFailure().
135 bool Transaction::getServerFailure() {
136 return hadServerFailure;
// Clears the server-failure flag set by setServerFailure().
140 void Transaction::resetServerFailure() {
141 hadServerFailure = false;
// Stores the status object associated with this transaction. Only the pointer
// is kept; removeSentParts() later dereferences it.
//
// _transactionStatus: the status object to attach.
145 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
146 transactionStatus = _transactionStatus;
// Returns the status pointer last stored by setTransactionStatus().
149 TransactionStatus *Transaction::getTransactionStatus() {
150 return transactionStatus;
// Removes the given part numbers from `partsPendingSend`. When removeAll()
// reports true, the transaction is flagged as having sent a part to the server
// and the current sequence number is written to the attached status object.
//
// sentParts: part numbers to drop from the pending list.
//
// NOTE(review): `transactionStatus` is dereferenced without a null check, and
// the only visible assignment to it is in setTransactionStatus() -- confirm it
// is always set before this is called. The braces around the `if` body are not
// visible in this view, so confirm both statements below sit inside it.
153 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
155 if (partsPendingSend->removeAll(sentParts))
157 flddidSendAPartToServer = true;
158 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
// Returns true when no part numbers remain in `partsPendingSend`.
162 bool Transaction::didSendAllParts() {
163 return partsPendingSend->isEmpty();
// Returns a pointer to the transaction's own update key-value set (not a copy).
// The set is filled by addUpdateKV() and decodeTransactionData().
166 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
167 return keyValueUpdateSet;
// Returns the number of entries currently stored in `parts`. This counts the
// parts added so far, which can be fewer than the transaction's full part count
// while parts are still arriving through addPartDecode().
170 int Transaction::getNumberOfParts() {
171 return parts->size();
// NOTE(review): the body is not visible in this view; presumably it returns
// `machineId`, which addPartEncode() / addPartDecode() copy from each part -- confirm.
174 int64_t Transaction::getMachineId() {
// NOTE(review): the body is not visible in this view; presumably it returns
// `arbitratorId`, which addPartEncode() / addPartDecode() copy from each part -- confirm.
178 int64_t Transaction::getArbitrator() {
// Returns the `fldisComplete` flag, the same flag addPartDecode() tests before
// updating `missingParts`.
182 bool Transaction::isComplete() {
183 return fldisComplete;
// Returns the transaction id pair most recently copied from a part by
// addPartEncode() / addPartDecode(). The stored pointer is returned as is, not
// a copy.
186 Pair<int64_t, int64_t> *Transaction::getId() {
187 return transactionId;
// Marks this transaction dead and, per the comment below, every part stored in
// `parts` as well.
// NOTE(review): the statements before the loop and the call made on `part` are
// not visible in this view.
190 void Transaction::setDead() {
199 // Make all the parts of this transaction dead
200 for (int32_t partNumber : parts->keySet()) {
201 TransactionPart* part = parts->get(partNumber);
// Returns the part stored under part number `index`.
// NOTE(review): presumably NULL when no part has that number (addPartDecode()
// compares parts->get() against NULL) -- confirm.
206 TransactionPart *Transaction::getPart(int index) {
207 return parts->get(index);
// Rebuilds the transaction payload from its parts and decodes it. The data
// sections of parts 0..N-1 (N = number of stored parts) are concatenated in
// part-number order into one buffer. Two ints are then read from it -- the
// guard count followed by the update count -- and that many KeyValue entries
// are decoded into `keyValueGuardSet` and `keyValueUpdateSet`.
//
// Both loops fetch parts->get(i) for every i in [0, N) and dereference the
// result without a null check, so part numbers must be contiguous from 0.
//
// NOTE(review): `combinedData` is allocated with raw `new` and `bbDecode` comes
// from ByteBuffer_wrap(); no release of either is visible in this view --
// confirm ownership.
210 void Transaction::decodeTransactionData() {
212 // Calculate the size of the data section
214 for (int i = 0; i < parts->keySet()->size(); i++) {
215 TransactionPart *tp = parts->get(i);
216 dataSize += tp->getDataSize();
219 Array<char> *combinedData = new Array<char>(dataSize);
// Write offset into combinedData for the next part's data.
220 int currentPosition = 0;
222 // Stitch all the data sections together
223 for (int i = 0; i < parts->keySet()->size(); i++) {
224 TransactionPart *tp = parts->get(i);
225 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
226 currentPosition += tp->getDataSize();
230 ByteBuffer* bbDecode = ByteBuffer_wrap(combinedData);
232 // Decode how many key value pairs need to be decoded
// Read order matters: the guard count comes first, then the update count.
233 int numberOfKVGuards = bbDecode->getInt();
234 int numberOfKVUpdates = bbDecode->getInt();
236 // Decode all the guard key values
237 for (int i = 0; i < numberOfKVGuards; i++) {
238 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
239 keyValueGuardSet->add(kv);
242 // Decode all the updates key values
243 for (int i = 0; i < numberOfKVUpdates; i++) {
244 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
245 keyValueUpdateSet->add(kv);
// Checks this transaction's guard key-values against the current view of the
// key-value store. For each guard, the current value is looked up by key: first
// in the pending-transaction speculated table, then in the speculated table,
// then in the committed table. A guard that carries a non-null value is then
// compared with the value found; a missing entry or unequal value is treated as
// a mismatch and the printf lines below emit debug output for it.
//
// committedKeyValueTable: committed values; dereferenced without a null check
//     in the visible code.
// speculatedKeyValueTable: may be NULL, in which case it is skipped.
// pendingTransactionSpeculatedKeyValueTable: may be NULL, in which case it is
//     skipped.
//
// NOTE(review): the function continues past the lines visible in this view,
// including its return statements and the condition guarding the committed-table
// lookup. Presumably false is returned on the first mismatch and true otherwise
// -- confirm.
249 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
250 for (KeyValue *kvGuard : keyValueGuardSet) {
252 // First check if the key is in the speculative table, this is the value of the latest assumption
255 // If we have a pending-transaction speculation table then use it first
256 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
257 kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
260 // If nothing was found there, fall back to the speculated table when one was supplied
261 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
262 kv = speculatedKeyValueTable->get(kvGuard->getKey());
266 // if it is not in the speculative table then check the committed table and use that
267 // value as our latest assumption
268 kv = committedKeyValueTable->get(kvGuard->getKey());
// Only guards that carry a value are compared in the code visible here.
271 if (kvGuard->getValue() != NULL) {
// Mismatch: no current entry for the key, or its value differs from the guard's.
272 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
276 printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
278 printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());