4 import java.util.HashMap;
6 import java.util.HashSet;
7 import java.nio.ByteBuffer;
// Commit parts, looked up by part number (entries are stored with part.getPartNumber() as the key).
11 private Map<Integer, CommitPart> parts = null;
// Part numbers not yet received; allocated in addPartDecode once a part flagged as last arrives.
12 private Set<Integer> missingParts = null;
// Read by addPartDecode to skip the completion check once set.
// NOTE(review): the assignments to this flag are not visible in this excerpt.
13 private boolean isComplete = false;
// Read by addPartDecode to gate the completion check.
// NOTE(review): the assignment to this flag is not visible in this excerpt.
14 private boolean hasLastPart = false;
// Key-value updates carried by this commit (filled by addKV and decodeCommitData).
15 private Set<KeyValue> keyValueUpdateSet = null;
// NOTE(review): reads/writes of this flag are not visible in this excerpt — presumably set by setDead(); confirm.
16 private boolean isDead = false;
// Identifiers default to -1 until supplied by the three-argument constructor
// or copied from part 0 in addPartDecode.
17 private long sequenceNumber = -1;
18 private long machineId = -1;
19 private long transactionSequenceNumber = -1;
// Keys added by addKV/decodeCommitData and removed again by invalidateKey.
21 private Set<IoTString> liveKeys = null;
// NOTE(review): the declaration enclosing these three statements is not visible in this excerpt.
// They are identical to the opening statements of the three-argument constructor, so this is
// presumably the body of a no-argument constructor — confirm against the full file.
// Start with empty part, update and live-key collections.
24 parts = new HashMap<Integer, CommitPart>();
25 keyValueUpdateSet = new HashSet<KeyValue>();
27 liveKeys = new HashSet<IoTString>();
/**
 * Creates a commit with empty part, update and live-key collections and the
 * given identifiers.
 *
 * @param _sequenceNumber value stored in sequenceNumber
 * @param _machineId value stored in machineId
 * @param _transactionSequenceNumber value stored in transactionSequenceNumber
 */
30 public Commit(long _sequenceNumber, long _machineId, long _transactionSequenceNumber) {
31 parts = new HashMap<Integer, CommitPart>();
32 keyValueUpdateSet = new HashSet<KeyValue>();
34 liveKeys = new HashSet<IoTString>();
// Record the identifiers supplied by the caller.
36 sequenceNumber = _sequenceNumber;
37 machineId = _machineId;
38 transactionSequenceNumber = _transactionSequenceNumber;
/**
 * Stores a received commit part under its part number and, once a part flagged
 * as last has been seen and no part numbers remain missing, copies the sequence
 * number, machine id and transaction sequence number from part 0.
 *
 * NOTE(review): several statements of this method are not visible in this
 * excerpt (the dead-commit early exit, where hasLastPart/isComplete are set,
 * where missing part numbers are recorded, and the decode call) — confirm
 * against the full file.
 *
 * @param newPart the part to add
 */
43 public void addPartDecode(CommitPart newPart) {
46 // If dead then just kill this part and move on
// put() hands back the part previously stored under this part number, or null if there was none.
51 CommitPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
53 if (previoslySeenPart != null) {
54 // Set dead the old one since the new one is a rescued version of this part
55 previoslySeenPart.setDead();
56 } else if (newPart.isLastPart()) {
// First sighting of the last part: start a fresh set of missing part numbers.
57 missingParts = new HashSet<Integer>();
// Look at every part number below the last part's number for parts not yet stored.
60 for (int i = 0; i < newPart.getPartNumber(); i++) {
61 if (parts.get(i) == null) {
// Only check for completion while incomplete and after the last part is known.
67 if (!isComplete && hasLastPart) {
69 // We have seen this part so remove it from the set of missing parts
70 missingParts.remove(newPart.getPartNumber());
72 // Check if all the parts have been seen
73 if (missingParts.size() == 0) {
75 // We have all the parts
78 // Decode all the parts and create the key value guard and update sets
81 // Get the sequence number and arbitrator of this transaction
82 sequenceNumber = parts.get(0).getSequenceNumber();
83 machineId = parts.get(0).getMachineId();
84 transactionSequenceNumber = parts.get(0).getTransactionSequenceNumber();
/**
 * Returns the sequence number of this commit.
 *
 * @return the current value of sequenceNumber (-1 if it has not been assigned)
 */
89 public long getSequenceNumber() {
90 return sequenceNumber;
/**
 * Returns the transaction sequence number of this commit.
 *
 * @return the current value of transactionSequenceNumber (-1 if it has not been assigned)
 */
93 public long getTransactionSequenceNumber() {
94 return transactionSequenceNumber;
// NOTE(review): the body is not visible in this excerpt — presumably returns the parts map keyed by part number; confirm.
98 public Map<Integer, CommitPart> getParts() {
/**
 * Adds a key-value update to this commit and registers its key as live.
 *
 * @param kv the update to add; its key is added to liveKeys
 */
102 public void addKV(KeyValue kv) {
103 keyValueUpdateSet.add(kv);
104 liveKeys.add(kv.getKey());
/**
 * Removes a key from the set of live keys.
 *
 * @param key the key to remove from liveKeys
 */
107 public void invalidateKey(IoTString key) {
108 liveKeys.remove(key);
// NOTE(review): the action taken when no live keys remain is not visible in this excerpt — confirm.
110 if (liveKeys.size() == 0) {
/**
 * Returns the key-value updates of this commit.
 *
 * @return the internal update set itself (not a copy)
 */
115 public Set<KeyValue> getKeyValueUpdateSet() {
116 return keyValueUpdateSet;
// NOTE(review): the body is not visible in this excerpt — presumably reports how many parts are stored; confirm.
119 public int getNumberOfParts() {
// NOTE(review): the body is not visible in this excerpt — presumably returns the machineId field; confirm.
123 public long getMachineId() {
// NOTE(review): the body is not visible in this excerpt — presumably returns the isComplete field; confirm.
127 public boolean isComplete() {
// NOTE(review): the body is not visible in this excerpt — presumably the negation of the isDead field; confirm.
131 public boolean isLive() {
// Walks every stored part of this commit.
// NOTE(review): the statements before the loop and the per-part action are not visible in this
// excerpt — presumably each part is marked dead, as the comment below says; confirm.
135 public void setDead() {
144 // Make all the parts of this transaction dead
145 for (Integer partNumber : parts.keySet()) {
146 CommitPart part = parts.get(partNumber);
/**
 * Looks up a single part by its part number.
 *
 * @param index the part number to look up
 * @return the stored part, or null when no part is stored under that number
 */
151 public CommitPart getPart(int index) {
152 return parts.get(index);
/**
 * Serializes this commit's key-value updates and slices the resulting bytes
 * into CommitPart objects of at most CommitPart.MAX_NON_HEADER_SIZE data bytes
 * each, storing every part in the parts map under its part number. The part
 * holding the final bytes is created with the last-part flag set.
 *
 * NOTE(review): the statements between the method header and the serialization
 * call, and the increment of commitPartCount, are not visible in this excerpt —
 * confirm the counter is advanced once per part.
 */
155 public void createCommitParts() {
160 byte[] byteData = convertDataToBytes();
163 int commitPartCount = 0;
164 int currentPosition = 0;
165 int remaining = byteData.length;
167 while (remaining > 0) {
// NOTE(review): boxed Boolean; it is only assigned literals and passed to the CommitPart constructor.
169 Boolean isLastPart = false;
170 // determine how much to copy
171 int copySize = CommitPart.MAX_NON_HEADER_SIZE;
// Everything that is left fits into one part, so this chunk is the final one.
172 if (remaining <= CommitPart.MAX_NON_HEADER_SIZE) {
173 copySize = remaining;
174 isLastPart = true; // last bit of data so last part
177 // Copy to a smaller version
178 byte[] partData = new byte[copySize];
179 System.arraycopy(byteData, currentPosition, partData, 0, copySize);
181 CommitPart part = new CommitPart(null, machineId, sequenceNumber, transactionSequenceNumber, commitPartCount, partData, isLastPart);
182 parts.put(part.getPartNumber(), part);
184 // Update position, count and remaining
185 currentPosition += copySize;
187 remaining -= copySize;
/**
 * Concatenates the data of all stored parts in ascending part-number order and
 * decodes the result: an int count followed by that many KeyValue entries.
 * Each decoded entry is added to keyValueUpdateSet and its key to liveKeys.
 *
 * Both loops look up parts by the indices 0 .. parts.size()-1, so the map must
 * hold a part under every one of those numbers; a gap would make parts.get(i)
 * return null and the following call fail.
 *
 * NOTE(review): the declaration of dataSize is not visible in this excerpt.
 */
191 private void decodeCommitData() {
193 // Calculate the size of the data section
195 for (int i = 0; i < parts.keySet().size(); i++) {
196 CommitPart tp = parts.get(i);
197 dataSize += tp.getDataSize();
200 byte[] combinedData = new byte[dataSize];
201 int currentPosition = 0;
203 // Stitch all the data sections together
204 for (int i = 0; i < parts.keySet().size(); i++) {
205 CommitPart tp = parts.get(i);
206 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
207 currentPosition += tp.getDataSize();
211 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
213 // Decode how many key value pairs need to be decoded
214 int numberOfKVUpdates = bbDecode.getInt();
216 // Decode all the updates key values
217 for (int i = 0; i < numberOfKVUpdates; i++) {
218 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
219 keyValueUpdateSet.add(kv);
220 liveKeys.add(kv.getKey());
/**
 * Serializes the key-value updates of this commit into a byte array laid out
 * as an int holding the number of updates, followed by the space reserved for
 * each update (kv.getSize() bytes apiece).
 *
 * NOTE(review): the body of the encoding loop is not visible in this excerpt —
 * presumably each KeyValue writes itself into the buffer; confirm.
 *
 * @return the array backing the encode buffer
 */
224 private byte[] convertDataToBytes() {
226 // Calculate the size of the data
227 int sizeOfData = Integer.BYTES; // Number of Update KV's
228 for (KeyValue kv : keyValueUpdateSet) {
229 sizeOfData += kv.getSize();
232 // Data handlers and storage
233 byte[] dataArray = new byte[sizeOfData];
234 ByteBuffer bbEncode = ByteBuffer.wrap(dataArray);
236 // Encode the number of key-value updates
237 bbEncode.putInt(keyValueUpdateSet.size());
239 // Encode all the updates
240 for (KeyValue kv : keyValueUpdateSet) {
// array() returns the wrapped dataArray itself, not a copy.
244 return bbEncode.array();