Block Chain Transactions, Commits multiple parts version
[iotcloud.git] / version2 / src / java / iotcloud / Commit.java
1 package iotcloud;
2
3 import java.util.Map;
4 import java.util.HashMap;
5 import java.util.Set;
6 import java.util.HashSet;
7 import java.nio.ByteBuffer;
8
9 class Commit {
10
11     private Map<Integer, CommitPart> parts = null;
12     private Set<Integer> missingParts = null;
13     private boolean isComplete = false;
14     private boolean hasLastPart = false;
15     private Set<KeyValue> keyValueUpdateSet = null;
16     private boolean isDead = false;
17     private long sequenceNumber = -1;
18     private long machineId = -1;
19     private long transactionSequenceNumber = -1;
20
21     private Set<IoTString> liveKeys = null;
22
23     public Commit() {
24         parts = new HashMap<Integer, CommitPart>();
25         keyValueUpdateSet = new HashSet<KeyValue>();
26
27         liveKeys = new HashSet<IoTString>();
28     }
29
30     public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
31         parts = new HashMap<Integer, CommitPart>();
32         keyValueUpdateSet = new HashSet<KeyValue>();
33
34         liveKeys = new HashSet<IoTString>();
35
36         sequenceNumber = _sequenceNumber;
37         machineId = _machineId;
38         transactionSequenceNumber = _transactionSequenceNumber;
39         isComplete = true;
40     }
41
42
43     public void addPartDecode(CommitPart newPart) {
44
45         if (isDead) {
46             // If dead then just kill this part and move on
47             newPart.setDead();
48             return;
49         }
50
51         CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
52
53         if (previoslySeenPart != null) {
54             // Set dead the old one since the new one is a rescued version of this part
55             previoslySeenPart.setDead();
56         } else if (newPart.isLastPart()) {
57             missingParts = new HashSet<Integer>();
58             hasLastPart = true;
59
60             for (int i = 0; i < newPart.getPartNumber(); i++) {
61                 if (parts.get(i) == null) {
62                     missingParts.add(i);
63                 }
64             }
65         }
66
67         if (!isComplete && hasLastPart) {
68
69             // We have seen this part so remove it from the set of missing parts
70             missingParts.remove(newPart.getPartNumber());
71
72             // Check if all the parts have been seen
73             if (missingParts.size() == 0) {
74
75                 // We have all the parts
76                 isComplete = true;
77
78                 // Decode all the parts and create the key value guard and update sets
79                 decodeCommitData();
80
81                 // Get the sequence number and arbitrator of this transaction
82                 sequenceNumber = parts.get(0).getSequenceNumber();
83                 machineId = parts.get(0).getMachineId();
84                 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
85             }
86         }
87     }
88
89     public long getSequenceNumber() {
90         return sequenceNumber;
91     }
92
93     public long getTransactionSequenceNumber() {
94         return transactionSequenceNumber;
95     }
96
97
98     public Map<Integer, CommitPart> getParts() {
99         return parts;
100     }
101
102     public void addKV(KeyValue kv) {
103         keyValueUpdateSet.add(kv);
104         liveKeys.add(kv.getKey());
105     }
106
107     public void invalidateKey(IoTString key) {
108         liveKeys.remove(key);
109
110         if (liveKeys.size() == 0) {
111             setDead();
112         }
113     }
114
115     public Set<KeyValue> getKeyValueUpdateSet() {
116         return keyValueUpdateSet;
117     }
118
119     public int getNumberOfParts() {
120         return parts.size();
121     }
122
123     public long getMachineId() {
124         return machineId;
125     }
126
127     public boolean isComplete() {
128         return isComplete;
129     }
130
131     public boolean isLive() {
132         return !isDead;
133     }
134
135     public void setDead() {
136         if (isDead) {
137             // Already dead
138             return;
139         }
140
141         // Set dead
142         isDead = true;
143
144         // Make all the parts of this transaction dead
145         for (Integer partNumber : parts.keySet()) {
146             CommitPart part = parts.get(partNumber);
147             part.setDead();
148         }
149     }
150
151     public CommitPart getPart(int index) {
152         return parts.get(index);
153     }
154
155     public void createCommitParts() {
156
157         parts.clear();
158
159         // Convert to bytes
160         byte[] byteData = convertDataToBytes();
161
162
163         int commitPartCount = 0;
164         int currentPosition = 0;
165         int remaining = byteData.length;
166
167         while (remaining > 0) {
168
169             Boolean isLastPart = false;
170             // determine how much to copy
171             int copySize = CommitPart.MAX_NON_HEADER_SIZE;
172             if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
173                 copySize = remaining;
174                 isLastPart = true; // last bit of data so last part
175             }
176
177             // Copy to a smaller version
178             byte[] partData = new byte[copySize];
179             System.arraycopy(byteData, currentPosition, partData, 0, copySize);
180
181             CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
182             parts.put(part.getPartNumber(), part);
183
184             // Update position, count and remaining
185             currentPosition += copySize;
186             commitPartCount++;
187             remaining -= copySize;
188         }
189     }
190
191     private void decodeCommitData() {
192
193         // Calculate the size of the data section
194         int dataSize = 0;
195         for (int i = 0; i < parts.keySet().size(); i++) {
196             CommitPart tp = parts.get(i);
197             dataSize += tp.getDataSize();
198         }
199
200         byte[] combinedData = new byte[dataSize];
201         int currentPosition = 0;
202
203         // Stitch all the data sections together
204         for (int i = 0; i < parts.keySet().size(); i++) {
205             CommitPart tp = parts.get(i);
206             System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
207             currentPosition += tp.getDataSize();
208         }
209
210         // Decoder Object
211         ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
212
213         // Decode how many key value pairs need to be decoded
214         int numberOfKVUpdates = bbDecode.getInt();
215
216         // Decode all the updates key values
217         for (int i = 0; i < numberOfKVUpdates; i++) {
218             KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
219             keyValueUpdateSet.add(kv);
220             liveKeys.add(kv.getKey());
221         }
222     }
223
224     private byte[] convertDataToBytes() {
225
226         // Calculate the size of the data
227         int sizeOfData = Integer.BYTES; // Number of Update KV's
228         for (KeyValue kv : keyValueUpdateSet) {
229             sizeOfData += kv.getSize();
230         }
231
232         // Data handlers and storage
233         byte[] dataArray = new byte[sizeOfData];
234         ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
235
236         // Encode the size of the updates and guard sets
237         bbEncode.putInt(keyValueUpdateSet.size());
238
239         // Encode all the updates
240         for (KeyValue kv : keyValueUpdateSet) {
241             kv.encode(bbEncode);
242         }
243
244         return bbEncode.array();
245     }
246 }