package iotcloud;
-import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
+import java.nio.ByteBuffer;
-class Transaction extends Entry {
+class Transaction {
- private long seqnum;
- private long machineid;
+ private Map<Integer, TransactionPart> parts = null;
+ private Set<Integer> missingParts = null;
+ private List<Integer> partsPendingSend = null;
+ private boolean isComplete = false;
+ private Set<KeyValue> keyValueGuardSet = null;
private Set<KeyValue> keyValueUpdateSet = null;
- private Guard guard;
+ private boolean isDead = false;
+ private long sequenceNumber = -1;
+ private long clientLocalSequenceNumber = -1;
+ private long arbitratorId = -1;
+ private long machineId = -1;
+ private Pair<Long, Long> transactionId = null;
- public Transaction(Slot slot, long _seqnum, long _machineid, Set<KeyValue> _keyValueUpdateSet, Guard _guard) {
- super(slot);
- seqnum = _seqnum;
- machineid = _machineid;
+ private int nextPartToSend = 0;
+ private boolean didSendAPartToServer = false;
+ private TransactionStatus transactionStatus = null;
+
+ public Transaction() {
+ parts = new HashMap<Integer, TransactionPart>();
+ keyValueGuardSet = new HashSet<KeyValue>();
keyValueUpdateSet = new HashSet<KeyValue>();
+ partsPendingSend = new ArrayList<Integer>();
+ }
+
+ public void addPartEncode(TransactionPart newPart) {
+ parts.put(newPart.getPartNumber(), newPart);
+ partsPendingSend.add(newPart.getPartNumber());
+
+ // Get the sequence number and other important information
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ isComplete = true;
+ }
+
+ public void addPartDecode(TransactionPart newPart) {
+
+ if (isDead) {
+ // If dead then just kill this part and move on
+ newPart.setDead();
+ return;
+ }
+
+ // Get the sequence number and other important information
+ sequenceNumber = newPart.getSequenceNumber();
+ arbitratorId = newPart.getArbitratorId();
+ transactionId = newPart.getTransactionId();
+ clientLocalSequenceNumber = newPart.getClientLocalSequenceNumber();
+ machineId = newPart.getMachineId();
+
+ TransactionPart previoslySeenPart = parts.put(newPart.getPartNumber(), newPart);
+
+ if (previoslySeenPart != null) {
+ // Set dead the old one since the new one is a rescued version of this part
+ previoslySeenPart.setDead();
+ } else if (newPart.isLastPart()) {
+ missingParts = new HashSet<Integer>();
+
+ for (int i = 0; i < newPart.getPartNumber(); i++) {
+ if (parts.get(i) == null) {
+ missingParts.add(i);
+ }
+ }
+ }
- for (KeyValue kv : _keyValueUpdateSet) {
- KeyValue kvCopy = kv.getCopy();
- keyValueUpdateSet.add(kvCopy);
+ if (!isComplete) {
+
+ // We have seen this part so remove it from the set of missing parts
+ missingParts.remove(newPart.getPartNumber());
+
+ // Check if all the parts have been seen
+ if (missingParts.size() == 0) {
+
+ // We have all the parts
+ isComplete = true;
+
+ // Decode all the parts and create the key value guard and update sets
+ decodeTransactionData();
+ }
}
+ }
- guard = _guard.getCopy();
+ public void addUpdateKV(KeyValue kv) {
+ keyValueUpdateSet.add(kv);
}
- public long getMachineID() {
- return machineid;
+ public void addGuardKV(KeyValue kv) {
+ keyValueGuardSet.add(kv);
}
+
public long getSequenceNumber() {
- return seqnum;
+ return sequenceNumber;
}
- public Set<KeyValue> getkeyValueUpdateSet() {
- return keyValueUpdateSet;
+ public void setSequenceNumber(long _sequenceNumber) {
+ sequenceNumber = _sequenceNumber;
+
+ for (Integer i : parts.keySet()) {
+ parts.get(i).setSequenceNumber(sequenceNumber);
+ }
+ }
+
+ public long getClientLocalSequenceNumber() {
+ return clientLocalSequenceNumber;
}
- public Guard getGuard() {
- return guard;
+ public Map<Integer, TransactionPart> getParts() {
+ return parts;
}
- public byte getType() {
- return Entry.TypeTransaction;
+
+ public boolean didSendAPartToServer() {
+ return didSendAPartToServer;
}
- public int getSize() {
- int size = 2 * Long.BYTES + Byte.BYTES; // seq, machine id, entry type
- size += Integer.BYTES; // number of KV's
+ public void resetNextPartToSend() {
+ nextPartToSend = 0;
+ }
- // Size of each KV
- for (KeyValue kv : keyValueUpdateSet) {
- size += kv.getSize();
+ public TransactionPart getNextPartToSend() {
+ if ((partsPendingSend.size() == 0) || (partsPendingSend.size() == nextPartToSend)) {
+ return null;
}
+ TransactionPart part = parts.get(partsPendingSend.get(nextPartToSend));
+ nextPartToSend++;
+ return part;
+ }
+
+ public void setTransactionStatus(TransactionStatus _transactionStatus) {
+ transactionStatus = _transactionStatus;
+ }
- // Size of the guard
- size += guard.getSize();
+ public TransactionStatus getTransactionStatus() {
+ return transactionStatus;
+ }
+
+ public void removeSentParts(List<Integer> sentParts) {
+ nextPartToSend = 0;
+ partsPendingSend.removeAll(sentParts);
+ didSendAPartToServer = true;
+ transactionStatus.setTransactionSequenceNumber(sequenceNumber);
+ }
+
+ public boolean didSendAllParts() {
+ return partsPendingSend.isEmpty();
+ }
+
+
+ public Set<KeyValue> getKeyValueUpdateSet() {
+ return keyValueUpdateSet;
+ }
+
+ public int getNumberOfParts() {
+ return parts.size();
+ }
+
+ public long getMachineId() {
+ return machineId;
+ }
+
+ public long getArbitrator() {
+ return arbitratorId;
+ }
+
+ public boolean isComplete() {
+ return isComplete;
+ }
- return size;
+ public Pair<Long, Long> getId() {
+ return transactionId;
}
+ public void setDead() {
+ if (isDead) {
+ // Already dead
+ return;
+ }
- public void encode(ByteBuffer bb) {
- bb.put(Entry.TypeTransaction);
- bb.putLong(seqnum);
- bb.putLong(machineid);
+ // Set dead
+ isDead = true;
- bb.putInt(keyValueUpdateSet.size());
- for (KeyValue kv : keyValueUpdateSet) {
- kv.encode(bb);
+ // Make all the parts of this transaction dead
+ for (Integer partNumber : parts.keySet()) {
+ TransactionPart part = parts.get(partNumber);
+ part.setDead();
}
+ }
- guard.encode(bb);
+ public TransactionPart getPart(int index) {
+ return parts.get(index);
}
- static Entry decode(Slot slot, ByteBuffer bb) {
- long seqnum = bb.getLong();
- long machineid = bb.getLong();
- int numberOfKeys = bb.getInt();
+ private void decodeTransactionData() {
+
+ // Calculate the size of the data section
+ int dataSize = 0;
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ dataSize += tp.getDataSize();
+ }
+
+ byte[] combinedData = new byte[dataSize];
+ int currentPosition = 0;
- Set<KeyValue> kvSet = new HashSet<KeyValue>();
- for (int i = 0; i < numberOfKeys; i++) {
- KeyValue kv = KeyValue.decode(bb);
- kvSet.add(kv);
+ // Stitch all the data sections together
+ for (int i = 0; i < parts.keySet().size(); i++) {
+ TransactionPart tp = parts.get(i);
+ System.arraycopy(tp.getData(), 0, combinedData, currentPosition, tp.getDataSize());
+ currentPosition += tp.getDataSize();
}
- Guard guard = Guard.decode(bb);
+ // Decoder Object
+ ByteBuffer bbDecode = ByteBuffer.wrap(combinedData);
+
+ // Decode how many key value pairs need to be decoded
+ int numberOfKVGuards = bbDecode.getInt();
+ int numberOfKVUpdates = bbDecode.getInt();
+
+ // Decode all the guard key values
+ for (int i = 0; i < numberOfKVGuards; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueGuardSet.add(kv);
+ }
- return new Transaction(slot, seqnum, machineid, kvSet, guard);
+ // Decode all the updates key values
+ for (int i = 0; i < numberOfKVUpdates; i++) {
+ KeyValue kv = (KeyValue)KeyValue.decode(bbDecode);
+ keyValueUpdateSet.add(kv);
+ }
}
- public Entry getCopy(Slot s) {
- return new Transaction(s, seqnum, machineid, keyValueUpdateSet, guard);
+ public boolean evaluateGuard(Map<IoTString, KeyValue> committedKeyValueTable, Map<IoTString, KeyValue> speculatedKeyValueTable, Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable) {
+ for (KeyValue kvGuard : keyValueGuardSet) {
+
+ // First check if the key is in the speculative table, this is the value of the latest assumption
+ KeyValue kv = null;
+
+ // If we have a speculation table then use it first
+ if (pendingTransactionSpeculatedKeyValueTable != null) {
+ kv = pendingTransactionSpeculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ // If we have a speculation table then use it first
+ if ((kv == null) && (speculatedKeyValueTable != null)) {
+ kv = speculatedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kv == null) {
+ // if it is not in the speculative table then check the committed table and use that
+ // value as our latest assumption
+ kv = committedKeyValueTable.get(kvGuard.getKey());
+ }
+
+ if (kvGuard.getValue() != null) {
+ if ((kv == null) || (!kvGuard.getValue().equals(kv.getValue()))) {
+ return false;
+ }
+ } else {
+ if (kv != null) {
+ return false;
+ }
+ }
+ }
+ return true;
}
}
\ No newline at end of file