// Parts of this commit, keyed by part number (entries are stored with parts.put(part.getPartNumber(), part)).
5 private Map<Integer, CommitPart> parts = NULL;
// Tracks part numbers still outstanding; only allocated when the last part is first seen in addPartDecode.
6 private Set<Integer> missingParts = NULL;
// Completion flag checked by addPartDecode. NOTE(review): the statement that sets it is not visible in this listing.
7 private bool isComplete = false;
// Checked by addPartDecode before consulting missingParts. NOTE(review): the statement that sets it is not visible in this listing.
8 private bool hasLastPart = false;
// Key-value updates carried by this commit (filled by addKV, decodeCommitData and setKVsMap).
9 private Set<KeyValue> keyValueUpdateSet = NULL;
// NOTE(review): no visible line in this listing reads or writes this flag — presumably set by setDead; confirm.
10 private bool isDead = false;
// Identifiers of this commit; -1 is the initial value (merge treats a transaction sequence number of -1 as "not set").
11 private int64_t sequenceNumber = -1;
12 private int64_t machineId = -1;
13 private int64_t transactionSequenceNumber = -1;
// Keys added through addKV/decodeCommitData/setKVsMap and removed again by invalidateKey.
15 private Set<IoTString> liveKeys = NULL;
18 parts = new HashMap<Integer, CommitPart>();
19 keyValueUpdateSet = new HashSet<KeyValue>();
21 liveKeys = new HashSet<IoTString>();
/**
 * Creates a commit with the given identifiers and with empty part,
 * key-value update and live-key collections.
 *
 * @param _sequenceNumber value stored in sequenceNumber
 * @param _machineId value stored in machineId
 * @param _transactionSequenceNumber value stored in transactionSequenceNumber
 */
24 public Commit(int64_t _sequenceNumber, int64_t _machineId, int64_t _transactionSequenceNumber) {
25 parts = new HashMap<Integer, CommitPart>();
26 keyValueUpdateSet = new HashSet<KeyValue>();
28 liveKeys = new HashSet<IoTString>();
30 sequenceNumber = _sequenceNumber;
31 machineId = _machineId;
32 transactionSequenceNumber = _transactionSequenceNumber;
/**
 * Records a newly received part of this commit.
 *
 * The part is stored in parts under its part number. If a part with the same
 * number was already stored, that older part is marked dead. Otherwise, if the
 * new part is the last part, a fresh missingParts set is allocated and every
 * lower part number is checked against parts. While the commit is not complete
 * and the last part has been seen, the new part's number is removed from
 * missingParts; once that set is empty the commit's identifiers are read from
 * part 0.
 *
 * NOTE(review): several statements of this method are not visible in this
 * listing (the dead check, the assignment of hasLastPart, the body of the
 * missing-part check, and the completion/decode steps) — confirm against the
 * full file before relying on this description.
 *
 * @param newPart the part to add
 */
37 public void addPartDecode(CommitPart newPart) {
40 // If dead then just kill this part and move on
// put() hands back whatever was previously stored under this part number
45 CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
47 if (previoslySeenPart != NULL) {
48 // Mark the old copy dead since the new one is a rescued version of this part
49 previoslySeenPart.setDead();
50 } else if (newPart.isLastPart()) {
// First copy of the last part: start tracking which lower-numbered parts are absent
51 missingParts = new HashSet<Integer>();
54 for (int i = 0; i < newPart.getPartNumber(); i++) {
// NOTE(review): the statement guarded by this check is not visible in this listing
55 if (parts.get(i) == NULL) {
61 if (!isComplete && hasLastPart) {
63 // We have seen this part so remove it from the set of missing parts
64 missingParts.remove(newPart.getPartNumber());
66 // Check if all the parts have been seen
67 if (missingParts.size() == 0) {
69 // We have all the parts
72 // Decode all the parts and create the key value guard and update sets
75 // Take the sequence number, machine id and transaction sequence number from part 0
76 sequenceNumber = parts.get(0).getSequenceNumber();
77 machineId = parts.get(0).getMachineId();
78 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
/**
 * @return the current value of the sequenceNumber field
 */
83 public int64_t getSequenceNumber() {
84 return sequenceNumber;
/**
 * @return the current value of the transactionSequenceNumber field
 */
87 public int64_t getTransactionSequenceNumber() {
88 return transactionSequenceNumber;
91 public Map<Integer, CommitPart> getParts() {
/**
 * Adds a key-value update to this commit and records its key as live.
 *
 * @param kv the update to add; its key is obtained with kv.getKey()
 */
95 public void addKV(KeyValue kv) {
96 keyValueUpdateSet.add(kv);
97 liveKeys.add(kv.getKey());
/**
 * Removes a key from the set of live keys and then checks whether any live
 * keys remain.
 *
 * @param key the key to remove from liveKeys
 */
100 public void invalidateKey(IoTString key) {
101 liveKeys.remove(key);
// NOTE(review): the action taken when no live keys remain is not visible in this listing
103 if (liveKeys.size() == 0) {
/**
 * @return the keyValueUpdateSet field itself (no copy is made here)
 */
108 public Set<KeyValue> getKeyValueUpdateSet() {
109 return keyValueUpdateSet;
112 public int getNumberOfParts() {
116 public int64_t getMachineId() {
120 public bool isComplete() {
124 public bool isLive() {
/**
 * Marks this commit dead; as part of that it walks every stored part.
 *
 * NOTE(review): the statements before the loop and the statement applied to
 * each part are not visible in this listing — presumably each part's
 * setDead() is called; confirm.
 */
128 public void setDead() {
137 // Make all the parts of this transaction dead
138 for (Integer partNumber : parts.keySet()) {
139 CommitPart part = parts.get(partNumber);
/**
 * Looks up a part by its part number.
 *
 * @param index the part number used as the key into parts
 * @return whatever parts.get(index) yields for that key
 */
144 public CommitPart getPart(int index) {
145 return parts.get(index);
/**
 * Serializes this commit with convertDataToBytes() and splits the result into
 * CommitPart objects, each carrying at most CommitPart.MAX_NON_HEADER_SIZE
 * elements of data. Every part is stored in parts under its part number; the
 * chunk that consumes the remaining data is flagged as the last part.
 *
 * NOTE(review): the statements between the method header and the call to
 * convertDataToBytes() are not visible in this listing.
 */
148 public void createCommitParts() {
153 char[] charData = convertDataToBytes();
// commitPartCount is passed to each new CommitPart; currentPosition is the read offset into charData
156 int commitPartCount = 0;
157 int currentPosition = 0;
158 int remaining = charData.length;
160 while (remaining > 0) {
162 Boolean isLastPart = false;
163 // Determine how much to copy: a full chunk, or whatever is left if that is smaller
164 int copySize = CommitPart.MAX_NON_HEADER_SIZE;
165 if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
166 copySize = remaining;
167 isLastPart = true; // last bit of data so last part
170 // Copy this chunk into its own array
171 char[] partData = new char[copySize];
172 System.arraycopy(charData, currentPosition, partData, 0, copySize);
174 CommitPart part = new CommitPart(NULL, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
175 parts.put(part.getPartNumber(), part);
177 // Update position, count and remaining
// NOTE(review): the increment of commitPartCount is not visible in this listing — confirm it exists, otherwise every part is created with count 0
178 currentPosition += copySize;
180 remaining -= copySize;
/**
 * Rebuilds the key-value updates from the stored parts: the data sections of
 * parts 0 .. parts.keySet().size() - 1 are concatenated in index order, then
 * an int count is read from the combined buffer followed by that many
 * KeyValue entries. Each decoded entry is added to keyValueUpdateSet and its
 * key to liveKeys.
 *
 * NOTE(review): parts are fetched by consecutive index with no visible null
 * check, so this relies on every index below the part count being present —
 * confirm callers only invoke it once all parts have arrived.
 */
184 private void decodeCommitData() {
186 // Calculate the size of the data section
// NOTE(review): the declaration of dataSize is not visible in this listing
188 for (int i = 0; i < parts.keySet().size(); i++) {
189 CommitPart tp = parts.get(i);
190 dataSize += tp.getDataSize();
193 char[] combinedData = new char[dataSize];
194 int currentPosition = 0;
196 // Stitch all the data sections together
197 for (int i = 0; i < parts.keySet().size(); i++) {
198 CommitPart tp = parts.get(i);
199 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
200 currentPosition += tp.getDataSize();
204 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
206 // Decode how many key value pairs need to be decoded
207 int numberOfKVUpdates = bbDecode.getInt();
209 // Decode all the updated key-value pairs
210 for (int i = 0; i < numberOfKVUpdates; i++) {
211 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
212 keyValueUpdateSet.add(kv);
213 liveKeys.add(kv.getKey());
/**
 * Serializes the key-value updates of this commit into a single array: an int
 * holding the number of updates, followed by the updates themselves. The
 * array is sized as sizeof(int32_t) plus the sum of kv.getSize() over all
 * updates.
 *
 * @return the backing array of the encoding buffer
 */
217 private char[] convertDataToBytes() {
219 // Calculate the size of the data
220 int sizeOfData = sizeof(int32_t); // Number of Update KV's
221 for (KeyValue kv : keyValueUpdateSet) {
222 sizeOfData += kv.getSize();
225 // Data handlers and storage
226 char[] dataArray = new char[sizeOfData];
227 ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
229 // Encode the number of key-value updates
230 bbEncode.putInt(keyValueUpdateSet.size());
232 // Encode all the updates
// NOTE(review): the per-entry encode statement is not visible in this listing
233 for (KeyValue kv : keyValueUpdateSet) {
237 return bbEncode.array();
/**
 * Replaces the contents of keyValueUpdateSet with the values of newKVs and
 * adds the keys of newKVs to liveKeys.
 *
 * NOTE(review): two original lines following the clear() call are not visible
 * in this listing — presumably liveKeys is cleared there as well; confirm.
 *
 * @param newKVs map from key to the key-value update to install
 */
240 private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
241 keyValueUpdateSet.clear();
244 keyValueUpdateSet.addAll(newKVs.values());
245 liveKeys.addAll(newKVs.keySet());
/**
 * Builds a new commit that combines the key-value updates of two commits.
 *
 * The updates of older are collected by key first and those of newer second,
 * so on a key collision the entry from newer is the one kept (assuming put()
 * replaces an existing mapping). The new commit uses newSequenceNumber,
 * newer's machine id, and newer's transaction sequence number unless that is
 * -1, in which case older's is used.
 *
 * NOTE(review): the handling of a NULL older/newer argument and the return
 * statement are not visible in this listing.
 *
 * @param newer the commit whose updates take precedence
 * @param older the commit whose updates are overridden on key collisions
 * @param newSequenceNumber sequence number given to the merged commit
 */
250 public static Commit merge(Commit newer, Commit older, int64_t newSequenceNumber) {
254 } else if (newer == NULL) {
258 Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
259 for (KeyValue kv : older.getKeyValueUpdateSet()) {
260 kvSet.put(kv.getKey(), kv);
// Inserted after older's entries so that newer's value wins for a shared key
263 for (KeyValue kv : newer.getKeyValueUpdateSet()) {
264 kvSet.put(kv.getKey(), kv);
267 int64_t transactionSequenceNumber = newer.getTransactionSequenceNumber();
// -1 is the field's initial value; fall back to older's number in that case
269 if (transactionSequenceNumber == -1) {
270 transactionSequenceNumber = older.getTransactionSequenceNumber();
273 Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
275 newCommit.setKVsMap(kvSet);