766584175f86a82c00442dd0bb3adf119e1a7420
[iotcloud.git] / version2 / src / C / Commit.cc
1 #include "Commit.h"
2 #include "CommitPart.h"
3 #include "ByteBuffer.h"
4 #include "IoTString.h"
5
6 Commit::Commit() :
7         parts(new Vector<CommitPart *>()),
8         partCount(0),
9         missingParts(NULL),
10         fldisComplete(false),
11         hasLastPart(false),
12         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
13         isDead(false),
14         sequenceNumber(-1),
15         machineId(-1),
16         transactionSequenceNumber(-1),
17         dataBytes(NULL),
18         liveKeys(new Hashset<IoTString *>()) {
19 }
20
21 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
22         parts(new Vector<CommitPart *>()),
23         partCount(0),
24         missingParts(NULL),
25         fldisComplete(true),
26         hasLastPart(false),
27         keyValueUpdateSet(new Hashset<KeyValue *, uintptr_t, 0>()),
28         isDead(false),
29         sequenceNumber(_sequenceNumber),
30         machineId(_machineId),
31         transactionSequenceNumber(_transactionSequenceNumber),
32         dataBytes(NULL),
33         liveKeys(new Hashset<IoTString *>()) {
34 }
35
36 Commit::~Commit() {
37         {
38                 uint Size = parts->size();
39                 for(uint i=0;i<Size; i++)
40                         parts->get(i)->releaseRef();
41                 delete parts;
42         }
43         {
44                 SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> * keyit = keyValueUpdateSet->iterator();
45                 while(keyit->hasNext()) {
46                         delete keyit->next();
47                 }
48                 delete keyit;
49                 delete keyValueUpdateSet;
50         }
51         delete liveKeys;
52         if (missingParts != NULL)
53                 delete missingParts;
54         if (dataBytes != NULL)
55                 delete dataBytes;
56 }
57
58 void Commit::addPartDecode(CommitPart *newPart) {
59         if (isDead) {
60                 // If dead then just kill this part and move on
61                 newPart->setDead();
62                 return;
63         }
64
65         newPart->acquireRef();
66         CommitPart *previouslySeenPart = parts->setExpand(newPart->getPartNumber(), newPart);
67         if (previouslySeenPart == NULL)
68                 partCount++;
69
70         if (previouslySeenPart != NULL) {
71                 // Set dead the old one since the new one is a rescued version of this part
72                 previouslySeenPart->setDead();
73                 previouslySeenPart->releaseRef();
74         } else if (newPart->isLastPart()) {
75                 missingParts = new Hashset<int32_t>();
76                 hasLastPart = true;
77
78                 for (int i = 0; i < newPart->getPartNumber(); i++) {
79                         if (parts->get(i) == NULL) {
80                                 missingParts->add(i);
81                         }
82                 }
83         }
84
85         if (!fldisComplete && hasLastPart) {
86
87                 // We have seen this part so remove it from the set of missing parts
88                 missingParts->remove(newPart->getPartNumber());
89
90                 // Check if all the parts have been seen
91                 if (missingParts->size() == 0) {
92
93                         // We have all the parts
94                         fldisComplete = true;
95
96                         // Decode all the parts and create the key value guard and update sets
97                         decodeCommitData();
98
99                         // Get the sequence number and arbitrator of this transaction
100                         sequenceNumber = parts->get(0)->getSequenceNumber();
101                         machineId = parts->get(0)->getMachineId();
102                         transactionSequenceNumber = parts->get(0)->getTransactionSequenceNumber();
103                 }
104         }
105 }
106
107 int64_t Commit::getSequenceNumber() {
108         return sequenceNumber;
109 }
110
111 int64_t Commit::getTransactionSequenceNumber() {
112         return transactionSequenceNumber;
113 }
114
115 Vector<CommitPart *> *Commit::getParts() {
116         return parts;
117 }
118
119 void Commit::addKV(KeyValue *kv) {
120         KeyValue * kvcopy = kv->getCopy();
121         keyValueUpdateSet->add(kvcopy);
122         liveKeys->add(kvcopy->getKey());
123 }
124
125 void Commit::invalidateKey(IoTString *key) {
126         liveKeys->remove(key);
127
128         if (liveKeys->size() == 0) {
129                 setDead();
130         }
131 }
132
133 Hashset<KeyValue *, uintptr_t, 0> *Commit::getKeyValueUpdateSet() {
134         return keyValueUpdateSet;
135 }
136
137 int32_t Commit::getNumberOfParts() {
138         return partCount;
139 }
140
141 void Commit::setDead() {
142         if (!isDead) {
143                 isDead = true;
144                 // Make all the parts of this transaction dead
145                 for (uint32_t partNumber = 0; partNumber < parts->size(); partNumber++) {
146                         CommitPart *part = parts->get(partNumber);
147                         part->setDead();
148                 }
149         }
150 }
151
152 void Commit::createCommitParts() {
153         uint Size = parts->size();
154         for(uint i=0;i < Size; i++) {
155                 Entry * e=parts->get(i);
156                 e->releaseRef();
157         }
158         parts->clear();
159         partCount = 0;
160         // Convert to chars
161         Array<char> *charData = convertDataToBytes();
162
163         int commitPartCount = 0;
164         int currentPosition = 0;
165         int remaining = charData->length();
166
167         while (remaining > 0) {
168                 bool isLastPart = false;
169                 // determine how much to copy
170                 int copySize = CommitPart_MAX_NON_HEADER_SIZE;
171                 if (remaining <= CommitPart_MAX_NON_HEADER_SIZE) {
172                         copySize = remaining;
173                         isLastPart = true;// last bit of data so last part
174                 }
175
176                 // Copy to a smaller version
177                 Array<char> *partData = new Array<char>(copySize);
178                 System_arraycopy(charData, currentPosition, partData, 0, copySize);
179
180                 CommitPart *part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
181                 parts->setExpand(part->getPartNumber(), part);
182
183                 // Update position, count and remaining
184                 currentPosition += copySize;
185                 commitPartCount++;
186                 remaining -= copySize;
187         }
188         delete charData;
189 }
190
191 void Commit::decodeCommitData() {
192         // Calculate the size of the data section
193         int dataSize = 0;
194         for (uint i = 0; i < parts->size(); i++) {
195                 CommitPart *tp = parts->get(i);
196                 if (tp != NULL)
197                         dataSize += tp->getDataSize();
198         }
199
200         Array<char> *combinedData = new Array<char>(dataSize);
201         int currentPosition = 0;
202
203         // Stitch all the data sections together
204         for (uint i = 0; i < parts->size(); i++) {
205                 CommitPart *tp = parts->get(i);
206                 if (tp != NULL) {
207                         System_arraycopy(tp->getData(), 0, combinedData, currentPosition, tp->getDataSize());
208                         currentPosition += tp->getDataSize();
209                 }
210         }
211
212         // Decoder Object
213         ByteBuffer *bbDecode = ByteBuffer_wrap(combinedData);
214
215         // Decode how many key value pairs need to be decoded
216         int numberOfKVUpdates = bbDecode->getInt();
217
218         // Decode all the updates key values
219         for (int i = 0; i < numberOfKVUpdates; i++) {
220                 KeyValue *kv = (KeyValue *)KeyValue_decode(bbDecode);
221                 keyValueUpdateSet->add(kv);
222                 liveKeys->add(kv->getKey());
223         }
224         delete bbDecode;
225 }
226
227 Array<char> *Commit::convertDataToBytes() {
228         // Calculate the size of the data
229         int sizeOfData = sizeof(int32_t);       // Number of Update KV's
230         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = keyValueUpdateSet->iterator();
231         while (kvit->hasNext()) {
232                 KeyValue *kv = kvit->next();
233                 sizeOfData += kv->getSize();
234         }
235         delete kvit;
236
237         // Data handlers and storage
238         Array<char> *dataArray = new Array<char>(sizeOfData);
239         ByteBuffer *bbEncode = ByteBuffer_wrap(dataArray);
240
241         // Encode the size of the updates and guard sets
242         bbEncode->putInt(keyValueUpdateSet->size());
243
244         // Encode all the updates
245         kvit = keyValueUpdateSet->iterator();
246         while (kvit->hasNext()) {
247                 KeyValue *kv = kvit->next();
248                 kv->encode(bbEncode);
249         }
250         delete kvit;
251         Array<char> * array = bbEncode->array();
252         bbEncode->releaseArray();
253         delete bbEncode;
254         return array;
255 }
256
257 void Commit::setKVsMap(Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *newKVs) {
258         keyValueUpdateSet->clear();
259         liveKeys->clear();
260         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvit = newKVs->iterator();
261         while (kvit->hasNext()) {
262                 KeyValue *kv = kvit->next();
263                 KeyValue *kvcopy = kv->getCopy();
264                 liveKeys->add(kvcopy->getKey());
265                 keyValueUpdateSet->add(kvcopy);
266         }
267         delete kvit;
268 }
269
270 Commit *Commit_merge(Commit *newer, Commit *older, int64_t newSequenceNumber) {
271         if (older == NULL) {
272                 return newer;
273         } else if (newer == NULL) {
274                 return older;
275         }
276         Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals> *kvSet = new Hashset<KeyValue *, uintptr_t, 0, hashKeyValue, KeyValueEquals>();
277         SetIterator<KeyValue *, KeyValue *, uintptr_t, 0> *kvit = older->getKeyValueUpdateSet()->iterator();
278         while (kvit->hasNext()) {
279                 KeyValue *kv = kvit->next();
280                 kvSet->add(kv);
281         }
282         delete kvit;
283         kvit = newer->getKeyValueUpdateSet()->iterator();
284         while (kvit->hasNext()) {
285                 KeyValue *kv = kvit->next();
286                 kvSet->add(kv);
287         }
288         delete kvit;
289
290         int64_t transactionSequenceNumber = newer->getTransactionSequenceNumber();
291         if (transactionSequenceNumber == -1) {
292                 transactionSequenceNumber = older->getTransactionSequenceNumber();
293         }
294
295         Commit *newCommit = new Commit(newSequenceNumber, newer->getMachineId(), transactionSequenceNumber);
296         newCommit->setKVsMap(kvSet);
297
298         delete kvSet;
299         return newCommit;
300 }