2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
7 parts(new Vector<CommitPart *>()),
12 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
16 transactionSequenceNumber(-1),
17 liveKeys(new Hashset<IoTString *>()) {
20 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
21 parts(new Vector<CommitPart *>()),
26 keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
28 sequenceNumber(_sequenceNumber),
29 machineId(_machineId),
30 transactionSequenceNumber(_transactionSequenceNumber),
31 liveKeys(new Hashset<IoTString *>()) {
37 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
38 while(keyit->hasNext()) {
42 delete keyValueUpdateSet;
45 if (missingParts != NULL)
49 void Commit::addPartDecode(CommitPart *newPart) {
51 // If dead then just kill this part and move on
56 CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
57 if (previouslySeenPart == NULL)
60 if (previouslySeenPart != NULL) {
61 // Set dead the old one since the new one is a rescued version of this part
62 previouslySeenPart->setDead();
63 } else if (newPart->isLastPart()) {
64 missingParts = new Hashset<int32_t>();
67 for (int i = 0; i < newPart->getPartNumber(); i++) {
68 if (parts->get(i) == NULL) {
74 if (!fldisComplete && hasLastPart) {
76 // We have seen this part so remove it from the set of missing parts
77 missingParts->remove(newPart->getPartNumber());
79 // Check if all the parts have been seen
80 if (missingParts->size() == 0) {
82 // We have all the parts
85 // Decode all the parts and create the key value guard and update sets
88 // Get the sequence number and arbitrator of this transaction
89 sequenceNumber = parts->get(0)->getSequenceNumber();
90 machineId = parts->get(0)->getMachineId();
91 transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
96 int64_t Commit::getSequenceNumber() {
97 return sequenceNumber;
100 int64_t Commit::getTransactionSequenceNumber() {
101 return transactionSequenceNumber;
104 Vector<CommitPart *> *Commit::getParts() {
108 void Commit::addKV(KeyValue *kv) {
109 KeyValue * kvcopy = kv->getCopy();
110 keyValueUpdateSet->add(kvcopy);
111 liveKeys->add(kvcopy->getKey());
114 void Commit::invalidateKey(IoTString *key) {
115 liveKeys->remove(key);
117 if (liveKeys->size() == 0) {
122 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
123 return keyValueUpdateSet;
126 int32_t Commit::getNumberOfParts() {
130 void Commit::setDead() {
133 // Make all the parts of this transaction dead
134 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
135 CommitPart *part = parts->get(partNumber);
142 CommitPart *Commit::getPart(int index) {
143 return parts->get(index);
146 void Commit::createCommitParts() {
150 Array<char> *charData = convertDataToBytes();
152 int commitPartCount = 0;
153 int currentPosition = 0;
154 int remaining = charData->length();
156 while (remaining > 0) {
157 bool isLastPart = false;
158 // determine how much to copy
159 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
160 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
161 copySize = remaining;
162 isLastPart = true;// last bit of data so last part
165 // Copy to a smaller version
166 Array<char> *partData = new Array<char>(copySize);
167 System_arraycopy(charData, currentPosition, partData, 0, copySize);
169 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
170 parts->setExpand(part->getPartNumber(), part);
172 // Update position, count and remaining
173 currentPosition += copySize;
175 remaining -= copySize;
180 void Commit::decodeCommitData() {
181 // Calculate the size of the data section
183 for (uint i = 0; i < parts->size(); i++) {
184 CommitPart *tp = parts->get(i);
186 dataSize += tp->getDataSize();
189 Array<char> *combinedData = new Array<char>(dataSize);
190 int currentPosition = 0;
192 // Stitch all the data sections together
193 for (uint i = 0; i < parts->size(); i++) {
194 CommitPart *tp = parts->get(i);
196 System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
197 currentPosition += tp->getDataSize();
202 ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
204 // Decode how many key value pairs need to be decoded
205 int numberOfKVUpdates = bbDecode->getInt();
207 // Decode all the updates key values
208 for (int i = 0; i < numberOfKVUpdates; i++) {
209 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
210 keyValueUpdateSet->add(kv);
211 liveKeys->add(kv->getKey());
216 Array<char> *Commit::convertDataToBytes() {
217 // Calculate the size of the data
218 int sizeOfData = sizeof(int32_t); // Number of Update KV's
219 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
220 while (kvit->hasNext()) {
221 KeyValue *kv = kvit->next();
222 sizeOfData += kv->getSize();
226 // Data handlers and storage
227 Array<char> *dataArray = new Array<char>(sizeOfData);
228 ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
230 // Encode the size of the updates and guard sets
231 bbEncode->putInt(keyValueUpdateSet->size());
233 // Encode all the updates
234 kvit = keyValueUpdateSet->iterator();
235 while (kvit->hasNext()) {
236 KeyValue *kv = kvit->next();
237 kv->encode(bbEncode);
240 Array<char> * array = bbEncode->array();
241 bbEncode->releaseArray();
246 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
247 keyValueUpdateSet->clear();
249 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
250 while (kvit->hasNext()) {
251 KeyValue *kv = kvit->next();
252 KeyValue *kvcopy = kv->getCopy();
253 liveKeys->add(kvcopy->getKey());
254 keyValueUpdateSet->add(kvcopy);
259 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
262 } else if (newer == NULL) {
265 Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
266 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
267 while (kvit->hasNext()) {
268 KeyValue *kv = kvit->next();
272 kvit = newer->getKeyValueUpdateSet()->iterator();
273 while (kvit->hasNext()) {
274 KeyValue *kv = kvit->next();
279 int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
280 if (transactionSequenceNumber == -1) {
281 transactionSequenceNumber = older->getTransactionSequenceNumber();
284 Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
285 newCommit->setKVsMap(kvSet);