1 #include "Transaction.h"
2 #include "TransactionPart.h"
4 #include "ByteBuffer.h"
6 #include "TransactionStatus.h"
// Default constructor. Allocates the parts vector, the list of part numbers
// still pending send, and the guard/update key-value sets, and initialises
// the scalar members named in the initializer list below.
// NOTE(review): the embedded line numbers skip inside this initializer list
// (e.g. 9 -> 12, 22 -> 24), so further initializers and the constructor body
// may be missing from this listing -- confirm against the complete file.
8 Transaction::Transaction() :
9 parts(new Vector<TransactionPart *>()),
12 partsPendingSend(new Vector<int32_t>()),
15 keyValueGuardSet(new Hashset<KeyValue *>()),
16 keyValueUpdateSet(new Hashset<KeyValue *>()),
19 clientLocalSequenceNumber(-1),
22 transactionId(Pair<int64_t, int64_t>(0,0)),
24 flddidSendAPartToServer(false),
25 transactionStatus(NULL),
26 hadServerFailure(false) {
// Destructor. In the visible lines it walks keyValueGuardSet and
// keyValueUpdateSet with an iterator, then deletes both sets and
// partsPendingSend.
// NOTE(review): what is done with each kvGuard/kvUpdate element, and whether
// the iterators and `parts` are released, is not visible in this listing
// (embedded line numbers skip, e.g. 38 -> 42) -- confirm against the full file.
29 Transaction::~Transaction() {
// Guard set: visit every element, then free the set itself.
36 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
37 while (kvit->hasNext()) {
38 KeyValue *kvGuard = kvit->next();
42 delete keyValueGuardSet;
// Update set: same pattern as the guard set above.
45 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueUpdateSet->iterator();
46 while (kvit->hasNext()) {
47 KeyValue *kvUpdate = kvit->next();
51 delete keyValueUpdateSet;
53 delete partsPendingSend;
// Adds a part on the encode path: stores newPart in `parts` at the index
// given by its part number, appends that part number to partsPendingSend,
// and copies the part's header fields onto this transaction.
// NOTE(review): the value returned by setExpand is kept in `old` (presumably
// the previous occupant of that slot); how `old` is handled is not visible
// in this listing -- confirm.
56 void Transaction::addPartEncode(TransactionPart *newPart) {
57 TransactionPart *old = parts->setExpand(newPart->getPartNumber(), newPart);
62 partsPendingSend->add(newPart->getPartNumber());
// Mirror the identifying fields of the part on the transaction.
64 sequenceNumber = newPart->getSequenceNumber();
65 arbitratorId = newPart->getArbitratorId();
66 transactionId = newPart->getTransactionId();
67 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
68 machineId = newPart->getMachineId();
// Adds a part on the decode path. Visible steps:
//   1. copy the part's header fields onto this transaction;
//   2. store the part in `parts` at its part number, keeping whatever
//      setExpand returns as previouslySeenPart;
//   3. if a part was returned, mark it dead; otherwise, if newPart is the
//      last part, allocate missingParts and scan the lower slots of `parts`
//      for NULL entries;
//   4. while the transaction is incomplete and the last part has been seen,
//      remove this part number from missingParts and, once that set is
//      empty, set fldisComplete and call decodeTransactionData().
// NOTE(review): several statements are absent from this listing (embedded
// line numbers skip, e.g. 74 -> 79, 86 -> 89, 97 -> 103), including the
// dead-transaction check announced by the first comment and the bodies of
// some conditionals -- confirm against the full file.
72 void Transaction::addPartDecode(TransactionPart *newPart) {
74 // If dead then just kill this part and move on
// Adopt the identifying fields carried by the incoming part.
79 sequenceNumber = newPart->getSequenceNumber();
80 arbitratorId = newPart->getArbitratorId();
81 transactionId = newPart->getTransactionId();
82 clientLocalSequenceNumber = newPart->getClientLocalSequenceNumber();
83 machineId = newPart->getMachineId();
85 TransactionPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
86 if (previouslySeenPart == NULL)
89 if (previouslySeenPart != NULL) {
90 // Set dead the old one since the new one is a rescued version of this part
91 previouslySeenPart->setDead();
92 } else if (newPart->isLastPart()) {
93 missingParts = new Hashset<int32_t>();
// Examine every slot below the last part's number for parts not yet received.
96 for (int i = 0; i < newPart->getPartNumber(); i++) {
97 if (parts->get(i) == NULL) {
103 if (!fldisComplete && hasLastPart) {
105 // We have seen this part so remove it from the set of missing parts
106 missingParts->remove(newPart->getPartNumber());
108 // Check if all the parts have been seen
109 if (missingParts->size() == 0) {
111 // We have all the parts
112 fldisComplete = true;
114 // Decode all the parts and create the key value guard and update sets
115 decodeTransactionData();
// Adds kv to keyValueUpdateSet. The pointer itself is stored; no copy of the
// KeyValue is made here.
120 void Transaction::addUpdateKV(KeyValue *kv) {
121 keyValueUpdateSet->add(kv);
// Adds kv to keyValueGuardSet. The pointer itself is stored; no copy of the
// KeyValue is made here.
124 void Transaction::addGuardKV(KeyValue *kv) {
125 keyValueGuardSet->add(kv);
// Returns the current value of the sequenceNumber member.
129 int64_t Transaction::getSequenceNumber() {
130 return sequenceNumber;
// Stores _sequenceNumber on the transaction and pushes the same value to the
// parts held in `parts`.
// NOTE(review): embedded line 138 is absent between fetching tp and using
// it; whether tp is checked for NULL there is not visible -- confirm.
133 void Transaction::setSequenceNumber(int64_t _sequenceNumber) {
134 sequenceNumber = _sequenceNumber;
// Propagate the new sequence number to the individual parts.
136 for (uint32_t i = 0; i < parts->size(); i++) {
137 TransactionPart *tp = parts->get(i);
139 tp->setSequenceNumber(sequenceNumber);
// Returns the clientLocalSequenceNumber member (initialised to -1 by the
// constructor and overwritten by addPartEncode/addPartDecode).
143 int64_t Transaction::getClientLocalSequenceNumber() {
144 return clientLocalSequenceNumber;
// Accessor returning a pointer to a Vector of TransactionPart pointers.
// NOTE(review): the body is not visible in this listing; presumably it
// returns the `parts` member -- confirm.
147 Vector<TransactionPart *> *Transaction::getParts() {
// Returns the flddidSendAPartToServer flag (false after construction, set to
// true in removeSentParts).
151 bool Transaction::didSendAPartToServer() {
152 return flddidSendAPartToServer;
// NOTE(review): the body is not visible in this listing; presumably it
// resets nextPartToSend, the index read by getNextPartToSend -- confirm.
155 void Transaction::resetNextPartToSend() {
// Looks up the next part to transmit. nextPartToSend is used as an index
// into partsPendingSend; the part number stored there selects the entry of
// `parts` that is fetched.
// NOTE(review): the result of the "nothing pending / index at end" branch and
// any update of nextPartToSend are not visible in this listing -- confirm.
159 TransactionPart *Transaction::getNextPartToSend() {
160 if ((partsPendingSend->size() == 0) || (partsPendingSend->size() == nextPartToSend)) {
163 TransactionPart *part = parts->get(partsPendingSend->get(nextPartToSend));
// Sets the hadServerFailure flag to true.
169 void Transaction::setServerFailure() {
170 hadServerFailure = true;
// Returns the current value of the hadServerFailure flag.
173 bool Transaction::getServerFailure() {
174 return hadServerFailure;
// Clears the hadServerFailure flag (sets it back to false).
178 void Transaction::resetServerFailure() {
179 hadServerFailure = false;
// Stores the given TransactionStatus pointer in the transactionStatus member.
// Only the pointer is stored; the object is not copied here.
183 void Transaction::setTransactionStatus(TransactionStatus *_transactionStatus) {
184 transactionStatus = _transactionStatus;
// Returns the transactionStatus pointer; this is NULL until
// setTransactionStatus has been called (the constructor initialises it to NULL).
187 TransactionStatus *Transaction::getTransactionStatus() {
188 return transactionStatus;
// Updates partsPendingSend after some parts have been sent. Each pending
// part number is compared against every entry of sentParts; entries are
// rewritten in place at lastusedindex and the list is then truncated to
// lastusedindex elements. Afterwards flddidSendAPartToServer is set and the
// transaction's sequence number is passed to transactionStatus.
// NOTE(review): the statements that decide which entries are kept, and any
// condition around the final three statements (the `changed` flag is declared
// but its use is not visible), are missing from this listing. In particular,
// transactionStatus is dereferenced below while the constructor initialises
// it to NULL -- confirm a guard or prior setTransactionStatus call exists.
191 void Transaction::removeSentParts(Vector<int32_t> *sentParts) {
193 bool changed = false;
194 uint lastusedindex = 0;
195 for (uint i = 0; i < partsPendingSend->size(); i++) {
196 int32_t parti = partsPendingSend->get(i);
// Search sentParts for this pending part number.
197 for (uint j = 0; j < sentParts->size(); j++) {
198 int32_t partj = sentParts->get(j);
199 if (parti == partj) {
// Write the entry back at the next free position of the compacted list.
204 partsPendingSend->set(lastusedindex++, parti);
209 partsPendingSend->setSize(lastusedindex);
210 flddidSendAPartToServer = true;
211 transactionStatus->setTransactionSequenceNumber(sequenceNumber);
// Returns true when partsPendingSend is empty, i.e. no part number is left
// waiting to be sent.
215 bool Transaction::didSendAllParts() {
216 return partsPendingSend->isEmpty();
// Returns the keyValueUpdateSet member itself (not a copy), so changes made
// through the returned pointer affect this transaction's set.
219 Hashset<KeyValue *> *Transaction::getKeyValueUpdateSet() {
220 return keyValueUpdateSet;
// NOTE(review): the body is not visible in this listing; presumably it
// reports how many parts this transaction holds -- confirm.
223 int Transaction::getNumberOfParts() {
// NOTE(review): the body is not visible in this listing; presumably it
// returns the machineId member assigned in addPartEncode/addPartDecode --
// confirm.
227 int64_t Transaction::getMachineId() {
// NOTE(review): the body is not visible in this listing; presumably it
// returns the arbitratorId member assigned in addPartEncode/addPartDecode --
// confirm.
231 int64_t Transaction::getArbitrator() {
// Returns the fldisComplete flag, which addPartDecode sets to true once
// missingParts becomes empty.
235 bool Transaction::isComplete() {
236 return fldisComplete;
// Returns the address of the transactionId member (not a copy of the pair).
239 Pair<int64_t, int64_t> *Transaction::getId() {
240 return &transactionId;
// Marks this transaction dead. The visible loop visits every slot of `parts`
// and fetches the part stored there.
// NOTE(review): the statements that record the dead state on the transaction
// and on each fetched part are not visible in this listing (embedded line
// numbers skip 243 -> 247 and stop at 249) -- confirm against the full file.
243 void Transaction::setDead() {
247 // Make all the parts of this transaction dead
248 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
249 TransactionPart *part = parts->get(partNumber);
// Returns the entry of `parts` at position `index`. The index is handed to
// Vector::get unchanged; no range check is performed in this function.
256 TransactionPart *Transaction::getPart(int index) {
257 return parts->get(index);
// Reassembles the payload carried by all parts and decodes it into the
// guard and update key-value sets. Visible steps:
//   1. add up getDataSize() over every entry of `parts`;
//   2. allocate one Array<char> of that total size and copy each part's data
//      into it back to back, in index order;
//   3. wrap the combined array in a ByteBuffer and read two ints: the number
//      of guard key-values followed by the number of update key-values;
//   4. decode that many KeyValue objects into keyValueGuardSet and then
//      keyValueUpdateSet.
// NOTE(review): the declaration/initialisation of dataSize (embedded line
// 262) and any release of combinedData/bbDecode are not visible in this
// listing. Each parts->get(i) result is dereferenced with no visible NULL
// check; addPartDecode calls this only after missingParts is empty --
// confirm no other caller can reach it with unfilled slots.
260 void Transaction::decodeTransactionData() {
261 // Calculate the size of the data section
263 for (uint i = 0; i < parts->size(); i++) {
264 TransactionPart *tp = parts->get(i);
265 dataSize += tp->getDataSize();
268 Array<char> *combinedData = new Array<char>(dataSize);
// Write offset into combinedData; advanced by each part's data size.
269 int currentPosition = 0;
271 // Stitch all the data sections together
272 for (uint i = 0; i < parts->size(); i++) {
273 TransactionPart *tp = parts->get(i);
274 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
275 currentPosition += tp->getDataSize();
279 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
281 // Decode how many key value pairs need to be decoded
282 int numberOfKVGuards = bbDecode->getInt();
283 int numberOfKVUpdates = bbDecode->getInt();
285 // Decode all the guard key values
286 for (int i = 0; i < numberOfKVGuards; i++) {
287 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
288 keyValueGuardSet->add(kv);
291 // Decode all the updates key values
292 for (int i = 0; i < numberOfKVUpdates; i++) {
293 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
294 keyValueUpdateSet->add(kv);
299 bool Transaction::evaluateGuard(Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *committedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *speculatedKeyValueTable, Hashtable<IoTString *, KeyValue *, uintptr_t, 0, hashString, StringEquals> *pendingTransactionSpeculatedKeyValueTable) {
300 SetIterator<KeyValue *, KeyValue *> *kvit = keyValueGuardSet->iterator();
301 while (kvit->hasNext()) {
302 KeyValue *kvGuard = kvit->next();
303 // First check if the key is in the speculative table, this is the value of the latest assumption
306 // If we have a speculation table then use it first
307 if (pendingTransactionSpeculatedKeyValueTable != NULL) {
308 kv = pendingTransactionSpeculatedKeyValueTable->get(kvGuard->getKey());
311 // If we have a speculation table then use it first
312 if ((kv == NULL) && (speculatedKeyValueTable != NULL)) {
313 kv = speculatedKeyValueTable->get(kvGuard->getKey());
317 // if it is not in the speculative table then check the committed table and use that
318 // value as our latest assumption
319 kv = committedKeyValueTable->get(kvGuard->getKey());
322 if (kvGuard->getValue() != NULL) {
323 if ((kv == NULL) || (!kvGuard->getValue()->equals(kv->getValue()))) {
327 printf("%s %s\n", kvGuard->getKey()->internalBytes()->internalArray(), kv->getValue()->internalBytes()->internalArray());
329 printf("%s null\n", kvGuard->getValue()->internalBytes()->internalArray());