1 #include "Transaction.h"
2 #include "TransactionPart.h"
4 #include "ByteBuffer.h"
6 #include "TransactionStatus.h"
// Default constructor. Heap-allocates the part vector, the list of part
// numbers still pending send, and the guard/update key-value sets, then gives
// the scalar members listed below their initial values.
8 Transaction::Transaction() :
9 parts(new MyVector<TransactionPart *>()),
12 partsPendingSend(new MyVector<int32_t>()),
15 keyValueGuardSet(new Hashset<KeyValue *>()),
16 keyValueUpdateSet(new Hashset<KeyValue *>()),
19 clientLocalSequenceNumber(-1),
22 transactionId(Pair<int64_t, int64_t>(0,0)),
24 flddidSendAPartToServer(false),
// No status object is attached at construction; it is supplied later through
// setTransactionStatus().
25 transactionStatus(NULL),
26 hadServerFailure(false) {
// Destructor. Calls releaseRef() on every stored part (balancing the
// acquireRef() made when the part was added), iterates both key-value sets,
// and deletes the two sets and the pending-send list.
29 Transaction::~Transaction() {
33 uint Size = parts->size();
// NOTE(review): no NULL check is visible before the dereference below, yet
// addPartDecode() treats parts->get(i) == NULL as a possible state (missing
// parts). Confirm an incomplete transaction can never reach this loop.
34 for(uint i=0; i<Size; i++)
35 parts->get(i)->releaseRef();
39 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
40 while (kvit->hasNext()) {
// NOTE(review): what is done with each guard entry is not visible here --
// presumably the KeyValue is freed; confirm.
41 KeyValue *kvGuard = kvit->next();
45 delete keyValueGuardSet;
48 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
49 while (kvit->hasNext()) {
// NOTE(review): same as above for the update entries -- confirm they are freed.
50 KeyValue *kvUpdate = kvit->next();
54 delete keyValueUpdateSet;
56 delete partsPendingSend;
// Adds a locally created part to this transaction: calls acquireRef() on it,
// stores it in `parts` under its part number, queues that part number in
// partsPendingSend, and copies the part's identifying fields onto the
// transaction.
//   newPart - the part to add; must not be NULL (it is dereferenced immediately).
59 void Transaction::addPartEncode(TransactionPart *newPart) {
60 newPart->acquireRef();
61 //printf("Add part %d\n", newPart->getPartNumber());
// NOTE(review): how the value returned by setExpand() (`old`) is handled is
// not visible here -- confirm a replaced part has its reference released.
62 TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
68 partsPendingSend->add(newPart->getPartNumber());
// The transaction-level fields are overwritten from whichever part was added
// most recently.
70 sequenceNumber = newPart->getSequenceNumber();
71 arbitratorId = newPart->getArbitratorId();
72 transactionId = newPart->getTransactionId();
73 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
74 machineId = newPart->getMachineId();
// Adds a received part to this transaction: calls acquireRef() on it, copies
// its identifying fields onto the transaction, stores it in `parts` under its
// part number, and updates the missing-part bookkeeping. Once the last part
// has been seen and no part numbers remain missing, the transaction is marked
// complete and decodeTransactionData() is run.
//   newPart - the part to add; must not be NULL (it is dereferenced).
78 void Transaction::addPartDecode(TransactionPart *newPart) {
80 // If dead then just kill this part and move on
84 newPart->acquireRef();
85 sequenceNumber = newPart->getSequenceNumber();
86 arbitratorId = newPart->getArbitratorId();
87 transactionId = newPart->getTransactionId();
88 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
89 machineId = newPart->getMachineId();
// The value returned by setExpand() is treated below as the part previously
// stored under this part number (NULL when there was none).
91 TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
92 if (previouslySeenPart == NULL)
95 if (previouslySeenPart != NULL) {
96 // Set dead the old one since the new one is a rescued version of this part
// NOTE(review): setDead() is called after releaseRef(). If releaseRef() can
// destroy the object when the count reaches zero, the next line touches freed
// memory -- consider calling setDead() first; confirm against TransactionPart.
97 previouslySeenPart->releaseRef();
98 previouslySeenPart->setDead();
99 } else if (newPart->isLastPart()) {
// The last part's number bounds the indices to check: every lower index that
// has no stored part yet is recorded as missing.
100 missingParts = new Hashset<int32_t>();
103 for (int i = 0; i < newPart->getPartNumber(); i++) {
104 if (parts->get(i) == NULL) {
105 missingParts->add(i);
110 if (!fldisComplete && hasLastPart) {
112 // We have seen this part so remove it from the set of missing parts
113 missingParts->remove(newPart->getPartNumber());
115 // Check if all the parts have been seen
116 if (missingParts->size() == 0) {
118 // We have all the parts
119 fldisComplete = true;
121 // Decode all the parts and create the key value guard and update sets
122 decodeTransactionData();
// Inserts the given key-value pointer into the transaction's update set.
127 void Transaction::addUpdateKV(KeyValue *kv) {
128 keyValueUpdateSet->add(kv);
// Inserts the given key-value pointer into the transaction's guard set.
131 void Transaction::addGuardKV(KeyValue *kv) {
132 keyValueGuardSet->add(kv);
// Returns the transaction's current sequence number.
136 int64_t Transaction::getSequenceNumber() {
137 return sequenceNumber;
// Stores a new sequence number on the transaction and forwards the same value
// to the parts held in `parts` via TransactionPart::setSequenceNumber().
140 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
141 sequenceNumber = _sequenceNumber;
143 for (uint32_t i = 0; i < parts->size(); i++) {
144 TransactionPart *tp = parts->get(i);
// NOTE(review): confirm `tp` is NULL-checked before this call -- addPartDecode()
// treats empty slots in `parts` as possible.
146 tp->setSequenceNumber(sequenceNumber);
// Returns the client-local sequence number (initialised to -1 by the
// constructor and overwritten whenever a part is added).
150 int64_t Transaction::getClientLocalSequenceNumber() {
151 return clientLocalSequenceNumber;
// Accessor declared to return a pointer to a vector of TransactionPart pointers.
// NOTE(review): the body is not visible here -- presumably returns `parts`; confirm.
154 MyVector<TransactionPart *> *Transaction::getParts() {
// Returns the flag that removeSentParts() sets to true; it starts out false.
158 bool Transaction::didSendAPartToServer() {
159 return flddidSendAPartToServer;
// NOTE(review): the body is not visible here -- presumably resets the
// nextPartToSend cursor used by getNextPartToSend(); confirm.
162 void Transaction::resetNextPartToSend() {
// Looks up the next part that still has to be sent. partsPendingSend holds
// part numbers and nextPartToSend is the cursor into that list; the part
// number found at the cursor is used as the index into `parts`.
166 TransactionPart *Transaction::getNextPartToSend() {
// Nothing pending, or the cursor has reached the end of the pending list.
// NOTE(review): the action taken in this case is not visible here --
// presumably returns NULL; confirm.
167 if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
170 TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
// Sets the server-failure flag on this transaction.
176 void Transaction::setServerFailure() {
177 hadServerFailure = true;
// Returns the server-failure flag (false after construction or
// resetServerFailure(), true after setServerFailure()).
180 bool Transaction::getServerFailure() {
181 return hadServerFailure;
// Clears the server-failure flag on this transaction.
185 void Transaction::resetServerFailure() {
186 hadServerFailure = false;
// Stores the given status pointer on the transaction. The pointer is kept
// as-is; no copy is made here.
190 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
191 transactionStatus = _transactionStatus;
// Returns the stored status pointer; this is NULL until
// setTransactionStatus() has been called with a non-NULL value.
194 TransactionStatus *Transaction::getTransactionStatus() {
195 return transactionStatus;
// Compacts partsPendingSend in place against the list of part numbers that
// were sent, then truncates the list, marks that a part has been sent, and
// passes the transaction's sequence number to the status object.
//   sentParts - part numbers reported as sent; must not be NULL.
198 void Transaction::removeSentParts(MyVector<int32_t> *sentParts) {
200 bool changed = false;
// Write cursor for the in-place compaction: entries that are kept are copied
// down to this index.
201 uint lastusedindex = 0;
202 for (uint i = 0; i < partsPendingSend->size(); i++) {
203 int32_t parti = partsPendingSend->get(i);
204 for (uint j = 0; j < sentParts->size(); j++) {
205 int32_t partj = sentParts->get(j);
// NOTE(review): the action taken on a match is not visible here -- presumably
// `changed` is set and this entry is skipped rather than kept; confirm.
206 if (parti == partj) {
211 partsPendingSend->set(lastusedindex++, parti);
216 partsPendingSend->setSize(lastusedindex);
217 flddidSendAPartToServer = true;
// NOTE(review): transactionStatus is initialised to NULL in the constructor;
// confirm setTransactionStatus() is always called before this point.
218 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
// Returns true when the pending-send list is empty.
222 bool Transaction::didSendAllParts() {
223 return partsPendingSend->isEmpty();
// Returns the transaction's own update set (the internal pointer, not a copy).
// The destructor deletes this set, so the pointer must not be used after the
// transaction is destroyed.
226 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
227 return keyValueUpdateSet;
// Accessor declared to return an int part count.
// NOTE(review): the body is not visible here -- confirm which member it reports.
230 int Transaction::getNumberOfParts() {
// Accessor declared to return a 64-bit machine id.
// NOTE(review): the body is not visible here -- presumably returns `machineId`,
// which is copied from the most recently added part; confirm.
234 int64_t Transaction::getMachineId() {
// Accessor declared to return a 64-bit arbitrator id.
// NOTE(review): the body is not visible here -- presumably returns
// `arbitratorId`, which is copied from the most recently added part; confirm.
238 int64_t Transaction::getArbitrator() {
// Returns the completeness flag, which addPartDecode() sets to true once no
// part numbers remain in the missing set.
242 bool Transaction::isComplete() {
243 return fldisComplete;
// Returns the address of the transactionId member. The pointer refers to
// storage inside this object, so it is only valid while the transaction is
// alive, and it observes later changes made when parts are added.
246 Pair<int64_t, int64_t> *Transaction::getId() {
247 return &transactionId;
// Marks the transaction as dead by walking every slot of `parts`.
// NOTE(review): the per-part action is not visible here -- presumably each
// non-NULL part has setDead() called on it; confirm.
250 void Transaction::setDead() {
254 // Make all the parts of this transaction dead
255 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
256 TransactionPart *part = parts->get(partNumber);
// Reassembles the transaction payload and decodes it: sums the data size of
// every part, copies each part's data back-to-back into one buffer in part
// order, then reads two counts followed by that many guard and update
// KeyValue entries, adding them to keyValueGuardSet and keyValueUpdateSet.
// Called from addPartDecode() once no parts are missing; every slot of `parts`
// is dereferenced here without a NULL check.
263 void Transaction::decodeTransactionData() {
264 // Calculate the size of the data section
266 for (uint i = 0; i < parts->size(); i++) {
267 TransactionPart *tp = parts->get(i);
268 dataSize += tp->getDataSize();
271 Array<char> *combinedData = new Array<char>(dataSize);
// Write offset into combinedData; advanced by each part's data size.
272 int currentPosition = 0;
274 // Stitch all the data sections together
275 for (uint i = 0; i < parts->size(); i++) {
276 TransactionPart *tp = parts->get(i);
277 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
278 currentPosition += tp->getDataSize();
282 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
284 // Decode how many key value pairs need to be decoded
// The guard count is read first, then the update count.
285 int numberOfKVGuards = bbDecode->getInt();
286 int numberOfKVUpdates = bbDecode->getInt();
288 // Decode all the guard key values
289 for (int i = 0; i < numberOfKVGuards; i++) {
290 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
291 keyValueGuardSet->add(kv);
294 // Decode all the updates key values
295 for (int i = 0; i < numberOfKVUpdates; i++) {
296 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
297 keyValueUpdateSet->add(kv);
302 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
303 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
304 while (kvit->hasNext()) {
305 KeyValue *kvGuard = kvit->next();
306 // First check if the key is in the speculative table, this is the value of the latest assumption
309 // If we have a speculation table then use it first
310 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
311 kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
314 // If we have a speculation table then use it first
315 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
316 kv = speculatedKeyValueTable->get(kvGuard->getKey());
320 // if it is not in the speculative table then check the committed table and use that
321 // value as our latest assumption
322 kv = committedKeyValueTable->get(kvGuard->getKey());
325 if (kvGuard->getValue() != NULL) {
326 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
330 //printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
332 //printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());