4 parts(new Hashtable<int32_t, CommitPart *>()),
8 keyValueUpdateSet(new HashSet<KeyValue *>()),
12 transactionSequenceNumber(-1),
13 liveKeys(new Hashset<IoTString *>) {
// Constructs a Commit from an explicit sequence number, machine id and
// transaction sequence number. A fresh parts table, key-value update set
// and live-key set are allocated for the new object.
//
// _sequenceNumber            value stored in sequenceNumber
// _machineId                 value stored in machineId
// _transactionSequenceNumber value stored in transactionSequenceNumber
17 Commit::Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) :
// Part number -> part pointer.
18 parts(new Hashtable<int32_t, CommitPart *>()),
22 keyValueUpdateSet(new HashSet<KeyValue *>()),
24 sequenceNumber(_sequenceNumber),
25 machineId(_machineId),
26 transactionSequenceNumber(_transactionSequenceNumber),
// NOTE(review): `Hashset` here is spelled differently from the `HashSet`
// used for keyValueUpdateSet above — confirm which type name is intended.
27 liveKeys(new Hashset<IoTString *>) {
// Registers a received commit part under its part number and tracks which
// parts are still missing. Once no parts are missing, the commit's sequence
// number, machine id and transaction sequence number are taken from part 0.
//
// newPart  the part to record; it replaces any part already stored under the
//          same part number.
//
// NOTE(review): newPart and previoslySeenPart are declared by value but are
// compared against NULL, while the parts table is constructed with
// `CommitPart *` values — this looks like a leftover from the Java version;
// confirm the intended pointer types.
30 void Commit::addPartDecode(CommitPart newPart) {
33 // If dead then just kill this part and move on
// put() is used for its return value: the entry previously stored under this
// part number, if any. (`previoslySeenPart` is a misspelling of
// "previouslySeenPart".)
38 CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
40 if (previoslySeenPart != NULL) {
41 // Mark the old one dead since the new one is a rescued version of this part
42 previoslySeenPart.setDead();
43 } else if (newPart.isLastPart()) {
// The last part tells us how many parts exist, so the missing set can be
// built by scanning every lower part number.
// NOTE(review): `Integer` is a Java boxed type — confirm the element type.
44 missingParts = new HashSet<Integer>();
47 for (int i = 0; i < newPart.getPartNumber(); i++) {
48 if (parts.get(i) == NULL) {
54 if (!fldisComplete && hasLastPart) {
56 // We have seen this part so remove it from the set of missing parts
57 missingParts.remove(newPart.getPartNumber());
59 // Check if all the parts have been seen
60 if (missingParts.size() == 0) {
62 // We have all the parts
65 // Decode all the parts and create the key value guard and update sets
68 // Get the sequence number and arbitrator of this transaction
// Part 0 is used as the source of the commit-wide header fields.
69 sequenceNumber = parts.get(0).getSequenceNumber();
70 machineId = parts.get(0).getMachineId();
71 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
// Returns the sequenceNumber field.
// NOTE(review): unlike addPartDecode, this definition is not qualified with
// `Commit::` — confirm whether it is meant to be a member definition.
76 int64_t getSequenceNumber() {
77 return sequenceNumber;
// Returns the transactionSequenceNumber field.
// NOTE(review): unlike addPartDecode, this definition is not qualified with
// `Commit::` — confirm whether it is meant to be a member definition.
80 int64_t getTransactionSequenceNumber() {
81 return transactionSequenceNumber;
// Accessor for the table of commit parts — presumably returns `parts`;
// confirm against the body.
// NOTE(review): the declared return type Hashtable<Integer, CommitPart> does
// not match the Hashtable<int32_t, CommitPart *> that the constructor
// allocates for `parts` — confirm the intended type.
84 Hashtable<Integer, CommitPart> getParts() {
// Records a key-value update in this commit: the pair is added to
// keyValueUpdateSet and its key is added to liveKeys.
//
// kv  the key-value pair to add.
88 void addKV(KeyValue kv) {
89 keyValueUpdateSet.add(kv);
90 liveKeys.add(kv.getKey());
// Invalidates a key of this commit and then tests whether any live keys
// remain.
// NOTE(review): presumably `key` is removed from liveKeys before the check
// below, and an empty set triggers some cleanup — confirm against the body.
//
// key  the key to invalidate.
93 void invalidateKey(IoTString key) {
// True once liveKeys holds no entries.
96 if (liveKeys.size() == 0) {
// Returns the keyValueUpdateSet field itself (no copy is made here).
// NOTE(review): the declared return type Set<KeyValue> differs from the
// HashSet<KeyValue *> the constructor allocates — confirm the intended type.
101 Set<KeyValue> getKeyValueUpdateSet() {
102 return keyValueUpdateSet;
// Reports how many parts this commit has — presumably derived from the
// `parts` table; confirm against the body.
105 int32_t getNumberOfParts() {
118 // Make all the parts of this transaction dead
119 for (Integer partNumber : parts.keySet()) {
120 CommitPart part = parts.get(partNumber);
// Looks up a single part in the parts table.
//
// index  key passed to parts.get(); other code in this file stores parts
//        under their part number.
// Returns whatever parts.get(index) yields for that key.
125 CommitPart getPart(int index) {
126 return parts.get(index);
// Serializes this commit with convertDataToBytes() and splits the result
// into CommitPart objects, each carrying at most
// CommitPart.MAX_NON_HEADER_SIZE units of data. Every part is stored in
// `parts` under its own part number.
//
// NOTE(review): `char[]`, `.length`, `Boolean` and `System.arraycopy` are
// Java constructs — confirm their intended C++ equivalents.
129 void createCommitParts() {
134 char[] charData = convertDataToBytes();
137 int commitPartCount = 0;
138 int currentPosition = 0;
139 int remaining = charData.length;
// One iteration per part, until all serialized data has been consumed.
141 while (remaining > 0) {
143 Boolean isLastPart = false;
144 // determine how much to copy
145 int copySize = CommitPart.MAX_NON_HEADER_SIZE;
146 if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
147 copySize = remaining;
148 isLastPart = true; // last bit of data so last part
151 // Copy to a smaller version
152 char[] partData = new char[copySize];
153 System.arraycopy(charData, currentPosition, partData, 0, copySize);
// The first constructor argument is passed as NULL; its meaning is defined
// by CommitPart's constructor.
155 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
156 parts.put(part.getPartNumber(), part);
158 // Update position, count and remaining
159 currentPosition += copySize;
161 remaining -= copySize;
// Reassembles the data sections of all stored parts into one buffer and
// decodes the key-value updates it contains. Each decoded pair is added to
// keyValueUpdateSet and its key to liveKeys.
//
// Both loops look parts up by the indices 0 .. parts.keySet().size() - 1, so
// they rely on the part numbers being contiguous and starting at 0.
165 void decodeCommitData() {
167 // Calculate the size of the data section
169 for (int i = 0; i < parts.keySet().size(); i++) {
170 CommitPart tp = parts.get(i);
171 dataSize += tp.getDataSize();
174 char[] combinedData = new char[dataSize];
175 int currentPosition = 0;
177 // Stitch all the data sections together
178 for (int i = 0; i < parts.keySet().size(); i++) {
179 CommitPart tp = parts.get(i);
180 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
181 currentPosition += tp.getDataSize();
185 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
187 // Decode how many key value pairs need to be decoded
188 int numberOfKVUpdates = bbDecode.getInt();
190 // Decode all the updated key values
191 for (int i = 0; i < numberOfKVUpdates; i++) {
192 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
193 keyValueUpdateSet.add(kv);
194 liveKeys.add(kv.getKey());
// Serializes the key-value update set into a flat array. The layout starts
// with an int holding the number of updates; the buffer is sized as
// sizeof(int32_t) plus the sum of kv.getSize() over all updates.
//
// Returns the array backing the encode buffer.
198 char[] convertDataToBytes() {
200 // Calculate the size of the data
201 int sizeOfData = sizeof(int32_t); // Number of Update KV's
202 for (KeyValue kv : keyValueUpdateSet) {
203 sizeOfData += kv.getSize();
206 // Data handlers and storage
207 char[] dataArray = new char[sizeOfData];
208 ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
210 // Encode the number of updates
211 bbEncode.putInt(keyValueUpdateSet.size());
213 // Encode all the updates
214 for (KeyValue kv : keyValueUpdateSet) {
218 return bbEncode.array();
// Replaces this commit's key-value updates with the contents of newKVs:
// keyValueUpdateSet is cleared and refilled with the table's values, and the
// table's keys are added to liveKeys.
// NOTE(review): presumably liveKeys is cleared as well before the keys are
// added — confirm against the full body.
//
// newKVs  table mapping each key to its key-value pair.
221 void setKVsMap(Hashtable<IoTString, KeyValue> newKVs) {
222 keyValueUpdateSet.clear();
225 keyValueUpdateSet.addAll(newKVs.values());
226 liveKeys.addAll(newKVs.keySet());
231 static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
235 } else if (newer == NULL) {
239 Hashtable<IoTString, KeyValue> kvSet = new Hashtable<IoTString, KeyValue>();
240 for (KeyValue kv : older.getKeyValueUpdateSet()) {
241 kvSet.put(kv.getKey(), kv);
244 for (KeyValue kv : newer.getKeyValueUpdateSet()) {
245 kvSet.put(kv.getKey(), kv);
248 int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
250 if (transactionSequenceNumber == -1) {
251 transactionSequenceNumber = older.getTransactionSequenceNumber();
254 Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
256 newCommit.setKVsMap(kvSet);