6 import java.util.ArrayList;
7 import java.util.HashMap;
8 import java.util.HashSet;
9 import java.nio.ByteBuffer;
13 private Map<Integer, TransactionPart> parts = null;
14 private Set<Integer> missingParts = null;
15 private List<Integer> partsPendingSend = null;
16 private boolean isComplete = false;
17 private boolean hasLastPart = false;
18 private Set<KeyValue> keyValueGuardSet = null;
19 private Set<KeyValue> keyValueUpdateSet = null;
20 private boolean isDead = false;
21 private long sequenceNumber = -1;
22 private long clientLocalSequenceNumber = -1;
23 private long arbitratorId = -1;
24 private long machineId = -1;
25 private Pair<Long, Long> transactionId = null;
27 private int nextPartToSend = 0;
28 private boolean didSendAPartToServer = false;
30 private TransactionStatus transactionStatus = null;
32 private boolean hadServerFailure = false;
34 public Transaction() {
35 parts = new HashMap<Integer, TransactionPart>();
36 keyValueGuardSet = new HashSet<KeyValue>();
37 keyValueUpdateSet = new HashSet<KeyValue>();
38 partsPendingSend = new ArrayList<Integer>();
41 public void addPartEncode(TransactionPart newPart) {
42 parts.put(newPart.getPartNumber(), newPart);
43 partsPendingSend.add(newPart.getPartNumber());
45 // Get the sequence number and other important information
46 sequenceNumber = newPart.getSequenceNumber();
47 arbitratorId = newPart.getArbitratorId();
48 transactionId = newPart.getTransactionId();
49 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
50 machineId = newPart.getMachineId();
55 public void addPartDecode(TransactionPart newPart) {
58 // If dead then just kill this part and move on
63 // Get the sequence number and other important information
64 sequenceNumber = newPart.getSequenceNumber();
65 arbitratorId = newPart.getArbitratorId();
66 transactionId = newPart.getTransactionId();
67 clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
68 machineId = newPart.getMachineId();
70 TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
72 if (previoslySeenPart != null) {
73 // Set dead the old one since the new one is a rescued version of this part
74 previoslySeenPart.setDead();
75 } else if (newPart.isLastPart()) {
76 missingParts = new HashSet<Integer>();
79 for (int i = 0; i < newPart.getPartNumber(); i++) {
80 if (parts.get(i) == null) {
86 if (!isComplete && hasLastPart) {
88 // We have seen this part so remove it from the set of missing parts
89 missingParts.remove(newPart.getPartNumber());
91 // Check if all the parts have been seen
92 if (missingParts.size() == 0) {
94 // We have all the parts
97 // Decode all the parts and create the key value guard and update sets
98 decodeTransactionData();
103 public void addUpdateKV(KeyValue kv) {
104 keyValueUpdateSet.add(kv);
107 public void addGuardKV(KeyValue kv) {
108 keyValueGuardSet.add(kv);
112 public long getSequenceNumber() {
113 return sequenceNumber;
116 public void setSequenceNumber(long _sequenceNumber) {
117 sequenceNumber = _sequenceNumber;
119 for (Integer i : parts.keySet()) {
120 parts.get(i).setSequenceNumber(sequenceNumber);
124 public long getClientLocalSequenceNumber() {
125 return clientLocalSequenceNumber;
128 public Map<Integer, TransactionPart> getParts() {
132 public boolean didSendAPartToServer() {
133 return didSendAPartToServer;
136 public void resetNextPartToSend() {
140 public TransactionPart getNextPartToSend() {
141 if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
144 TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
150 public void setServerFailure() {
151 hadServerFailure = true;
154 public boolean getServerFailure() {
155 return hadServerFailure;
159 public void resetServerFailure() {
160 hadServerFailure = false;
164 public void setTransactionStatus(TransactionStatus _transactionStatus) {
165 transactionStatus = _transactionStatus;
168 public TransactionStatus getTransactionStatus() {
169 return transactionStatus;
172 public void removeSentParts(List<Integer> sentParts) {
174 if(partsPendingSend.removeAll(sentParts))
176 didSendAPartToServer = true;
177 transactionStatus.setTransactionSequenceNumber(sequenceNumber);
181 public boolean didSendAllParts() {
182 return partsPendingSend.isEmpty();
185 public Set<KeyValue> getKeyValueUpdateSet() {
186 return keyValueUpdateSet;
189 public int getNumberOfParts() {
193 public long getMachineId() {
197 public long getArbitrator() {
201 public boolean isComplete() {
205 public Pair<Long, Long> getId() {
206 return transactionId;
209 public void setDead() {
218 // Make all the parts of this transaction dead
219 for (Integer partNumber : parts.keySet()) {
220 TransactionPart part = parts.get(partNumber);
225 public TransactionPart getPart(int index) {
226 return parts.get(index);
229 private void decodeTransactionData() {
231 // Calculate the size of the data section
233 for (int i = 0; i < parts.keySet().size(); i++) {
234 TransactionPart tp = parts.get(i);
235 dataSize += tp.getDataSize();
238 byte[] combinedData = new byte[dataSize];
239 int currentPosition = 0;
241 // Stitch all the data sections together
242 for (int i = 0; i < parts.keySet().size(); i++) {
243 TransactionPart tp = parts.get(i);
244 System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
245 currentPosition += tp.getDataSize();
249 ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
251 // Decode how many key value pairs need to be decoded
252 int numberOfKVGuards = bbDecode.getInt();
253 int numberOfKVUpdates = bbDecode.getInt();
255 // Decode all the guard key values
256 for (int i = 0; i < numberOfKVGuards; i++) {
257 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
258 keyValueGuardSet.add(kv);
261 // Decode all the updates key values
262 for (int i = 0; i < numberOfKVUpdates; i++) {
263 KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
264 keyValueUpdateSet.add(kv);
268 public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
269 for (KeyValue kvGuard : keyValueGuardSet) {
271 // First check if the key is in the speculative table, this is the value of the latest assumption
274 // If we have a speculation table then use it first
275 if (pendingTransactionSpeculatedKeyValueTable != null) {
276 kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
279 // If we have a speculation table then use it first
280 if ((kv == null) && (speculatedKeyValueTable != null)) {
281 kv = speculatedKeyValueTable.get(kvGuard.getKey());
285 // if it is not in the speculative table then check the committed table and use that
286 // value as our latest assumption
287 kv = committedKeyValueTable.get(kvGuard.getKey());
290 if (kvGuard.getValue() != null) {
291 if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
295 System.out.println(kvGuard.getValue() + " " + kv.getValue());
297 System.out.println(kvGuard.getValue() + " " + kv);