1 #include "Transaction.h"
2 #include "TransactionPart.h"
4 #include "ByteBuffer.h"
6 #include "TransactionStatus.h"
8 Transaction::Transaction() :
9 parts(new Vector<TransactionPart *>()),
12 partsPendingSend(new Vector<int32_t>()),
15 keyValueGuardSet(new Hashset<KeyValue *>()),
16 keyValueUpdateSet(new Hashset<KeyValue *>()),
19 clientLocalSequenceNumber(-1),
22 transactionId(Pair<int64_t, int64_t>(0,0)),
23 hadServerFailure(false) {
26 void Transaction::addPartEncode(TransactionPart *newPart) {
27 TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
30 partsPendingSend->add(newPart->getPartNumber());
32 sequenceNumber = newPart->getSequenceNumber();
33 arbitratorId = newPart->getArbitratorId();
34 transactionId = newPart->getTransactionId();
35 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
36 machineId = newPart->getMachineId();
41 void Transaction::addPartDecode(TransactionPart *newPart) {
43 // If dead then just kill this part and move on
48 sequenceNumber = newPart->getSequenceNumber();
49 arbitratorId = newPart->getArbitratorId();
50 transactionId = newPart->getTransactionId();
51 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
52 machineId = newPart->getMachineId();
54 TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
55 if (previouslySeenPart == NULL)
58 if (previouslySeenPart != NULL) {
59 // Set dead the old one since the new one is a rescued version of this part
60 previouslySeenPart->setDead();
61 } else if (newPart->isLastPart()) {
62 missingParts = new Hashset<int32_t>();
65 for (int i = 0; i < newPart->getPartNumber(); i++) {
66 if (parts->get(i) == NULL) {
72 if (!fldisComplete && hasLastPart) {
74 // We have seen this part so remove it from the set of missing parts
75 missingParts->remove(newPart->getPartNumber());
77 // Check if all the parts have been seen
78 if (missingParts->size() == 0) {
80 // We have all the parts
83 // Decode all the parts and create the key value guard and update sets
84 decodeTransactionData();
89 void Transaction::addUpdateKV(KeyValue *kv) {
90 keyValueUpdateSet->add(kv);
93 void Transaction::addGuardKV(KeyValue *kv) {
94 keyValueGuardSet->add(kv);
98 int64_t Transaction::getSequenceNumber() {
99 return sequenceNumber;
102 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
103 sequenceNumber = _sequenceNumber;
105 for (int32_t i = 0; i < parts->size(); i++) {
106 TransactionPart *tp = parts->get(i);
108 tp->setSequenceNumber(sequenceNumber);
112 int64_t Transaction::getClientLocalSequenceNumber() {
113 return clientLocalSequenceNumber;
116 Vector<TransactionPart *> *Transaction::getParts() {
120 bool Transaction::didSendAPartToServer() {
121 return flddidSendAPartToServer;
124 void Transaction::resetNextPartToSend() {
128 TransactionPart *Transaction::getNextPartToSend() {
129 if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
132 TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
138 void Transaction::setServerFailure() {
139 hadServerFailure = true;
142 bool Transaction::getServerFailure() {
143 return hadServerFailure;
147 void Transaction::resetServerFailure() {
148 hadServerFailure = false;
152 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
153 transactionStatus = _transactionStatus;
156 TransactionStatus *Transaction::getTransactionStatus() {
157 return transactionStatus;
160 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
162 bool changed = false;
163 uint lastusedindex = 0;
164 for (uint i = 0; i < partsPendingSend->size(); i++) {
165 int32_t parti = partsPendingSend->get(i);
166 for (uint j = 0; j < sentParts->size(); j++) {
167 int32_t partj = sentParts->get(j);
168 if (parti == partj) {
173 partsPendingSend->set(lastusedindex++, parti);
178 partsPendingSend->setSize(lastusedindex);
179 flddidSendAPartToServer = true;
180 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
184 bool Transaction::didSendAllParts() {
185 return partsPendingSend->isEmpty();
188 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
189 return keyValueUpdateSet;
192 int Transaction::getNumberOfParts() {
196 int64_t Transaction::getMachineId() {
200 int64_t Transaction::getArbitrator() {
204 bool Transaction::isComplete() {
205 return fldisComplete;
208 Pair<int64_t, int64_t> *Transaction::getId() {
209 return &transactionId;
212 void Transaction::setDead() {
216 // Make all the parts of this transaction dead
217 for (int32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
218 TransactionPart *part = parts->get(partNumber);
225 TransactionPart *Transaction::getPart(int index) {
226 return parts->get(index);
229 void Transaction::decodeTransactionData() {
230 // Calculate the size of the data section
232 for (int i = 0; i < parts->size(); i++) {
233 TransactionPart *tp = parts->get(i);
234 dataSize += tp->getDataSize();
237 Array<char> *combinedData = new Array<char>(dataSize);
238 int currentPosition = 0;
240 // Stitch all the data sections together
241 for (int i = 0; i < parts->size(); i++) {
242 TransactionPart *tp = parts->get(i);
243 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
244 currentPosition += tp->getDataSize();
248 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
250 // Decode how many key value pairs need to be decoded
251 int numberOfKVGuards = bbDecode->getInt();
252 int numberOfKVUpdates = bbDecode->getInt();
254 // Decode all the guard key values
255 for (int i = 0; i < numberOfKVGuards; i++) {
256 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
257 keyValueGuardSet->add(kv);
260 // Decode all the updates key values
261 for (int i = 0; i < numberOfKVUpdates; i++) {
262 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
263 keyValueUpdateSet->add(kv);
267 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable) {
268 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
269 while (kvit->hasNext()) {
270 KeyValue *kvGuard = kvit->next();
271 // First check if the key is in the speculative table, this is the value of the latest assumption
274 // If we have a speculation table then use it first
275 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
276 kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
279 // If we have a speculation table then use it first
280 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
281 kv = speculatedKeyValueTable->get(kvGuard->getKey());
285 // if it is not in the speculative table then check the committed table and use that
286 // value as our latest assumption
287 kv = committedKeyValueTable->get(kvGuard->getKey());
290 if (kvGuard->getValue() != NULL) {
291 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
295 printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
297 printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());