*/
-class Abort extends Entry{
+class Abort extends Entry {
private long transactionClientLocalSequenceNumber = -1;
private long transactionSequenceNumber = -1;
private long sequenceNumber = -1;
private long transactionMachineId = -1;
private long transactionArbitrator = -1;
+ private long arbitratorLocalSequenceNumber = -1;
private Pair<Long, Long> abortId = null;
- public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber , long _transactionMachineId, long _transactionArbitrator) {
+ public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber , long _transactionMachineId, long _transactionArbitrator, long _arbitratorLocalSequenceNumber) {
super(slot);
transactionClientLocalSequenceNumber = _transactionClientLocalSequenceNumber;
transactionSequenceNumber = _transactionSequenceNumber;
transactionMachineId = _transactionMachineId;
transactionArbitrator = _transactionArbitrator;
+ arbitratorLocalSequenceNumber = _arbitratorLocalSequenceNumber;
abortId = new Pair<Long, Long>(transactionMachineId, transactionClientLocalSequenceNumber);
}
- public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber, long _sequenceNumber , long _transactionMachineId, long _transactionArbitrator) {
+ public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber, long _sequenceNumber , long _transactionMachineId, long _transactionArbitrator, long _arbitratorLocalSequenceNumber) {
super(slot);
transactionClientLocalSequenceNumber = _transactionClientLocalSequenceNumber;
transactionSequenceNumber = _transactionSequenceNumber;
sequenceNumber = _sequenceNumber;
transactionMachineId = _transactionMachineId;
transactionArbitrator = _transactionArbitrator;
+ arbitratorLocalSequenceNumber = _arbitratorLocalSequenceNumber;
+
abortId = new Pair<Long, Long>(transactionMachineId, transactionClientLocalSequenceNumber);
}
return transactionClientLocalSequenceNumber;
}
+ public long getArbitratorLocalSequenceNumber() {
+ return arbitratorLocalSequenceNumber;
+ }
+
+
public void setSlot(Slot s) {
parentslot = s;
}
long sequenceNumber = bb.getLong();
long transactionMachineId = bb.getLong();
long transactionArbitrator = bb.getLong();
- return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator);
+ long arbitratorLocalSequenceNumber = bb.getLong();
+
+ return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber);
}
public void encode(ByteBuffer bb) {
bb.putLong(sequenceNumber);
bb.putLong(transactionMachineId);
bb.putLong(transactionArbitrator);
+ bb.putLong(arbitratorLocalSequenceNumber);
}
public int getSize() {
- return (4 * Long.BYTES) + Byte.BYTES;
+ return (6 * Long.BYTES) + Byte.BYTES;
}
public byte getType() {
}
public Entry getCopy(Slot s) {
- return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator);
+ return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber);
}
}
\ No newline at end of file
--- /dev/null
+package iotcloud;
+
+import java.util.Set;
+import java.util.HashSet;
+
+import java.util.List;
+import java.util.ArrayList;
+
+
+class ArbitrationRound {
+
+ public static final int MAX_PARTS = 10;
+
+ Set<Abort> abortsBefore = null;
+ List<Entry> parts = null;
+ Commit commit = null;
+ int currentSize = 0;
+ boolean didSendPart = false;
+ boolean didGenerateParts = false;
+
+ public ArbitrationRound(Commit _commit, Set<Abort> _abortsBefore) {
+
+ parts = new ArrayList<Entry>();
+
+ commit = _commit;
+ abortsBefore = _abortsBefore;
+
+
+ if (commit != null) {
+ commit.createCommitParts();
+ currentSize += commit.getNumberOfParts();
+ }
+
+ currentSize += abortsBefore.size();
+ }
+
+ public void generateParts() {
+ if (didGenerateParts) {
+ return;
+ }
+ parts = new ArrayList<Entry>(abortsBefore);
+ if (commit != null) {
+ parts.addAll(commit.getParts().values());
+ }
+ }
+
+
+ public List<Entry> getParts() {
+ return parts;
+ }
+
+ public void removeParts(List<Entry> removeParts) {
+ parts.removeAll(removeParts);
+ didSendPart = true;
+ }
+
+ public boolean isDoneSending() {
+ if ((commit == null) && abortsBefore.isEmpty()) {
+ return true;
+ }
+
+ return parts.isEmpty();
+ }
+
+ public Commit getCommit() {
+ return commit;
+ }
+
+ public void setCommit(Commit _commit) {
+ if (commit != null) {
+ currentSize -= commit.getNumberOfParts();
+ }
+ commit = _commit;
+
+ if (commit != null) {
+ currentSize += commit.getNumberOfParts();
+ }
+ }
+
+ public void addAbort(Abort abort) {
+ abortsBefore.add(abort);
+ currentSize++;
+ }
+
+ public void addAborts(Set<Abort> aborts) {
+ abortsBefore.addAll(aborts);
+ currentSize += aborts.size();
+ }
+
+
+ public Set<Abort> getAborts() {
+ return abortsBefore;
+ }
+
+ public int getAbortsCount() {
+ return abortsBefore.size();
+ }
+
+ public int getCurrentSize() {
+ return currentSize;
+ }
+
+ public boolean isFull() {
+ return currentSize >= MAX_PARTS;
+ }
+
+ public boolean didSendPart() {
+ return didSendPart;
+ }
+}
\ No newline at end of file
class CloudComm {
- String baseurl;
- Cipher encryptCipher;
- Cipher decryptCipher;
- Mac mac;
- String password;
- SecureRandom random;
- static final int SALT_SIZE = 8;
- static final int TIMEOUT_MILLIS = 100;
- byte salt[];
- Table table;
+ private static final int SALT_SIZE = 8;
+ private static final int TIMEOUT_MILLIS = 100;
+
+ private String baseurl;
+ private Cipher encryptCipher;
+ private Cipher decryptCipher;
+ private Mac mac;
+ private String password;
+ private SecureRandom random;
+ private byte salt[];
+ private Table table;
+ private int listeningPort = -1;
+ private Thread localServerThread = null;
+ private boolean doEnd = false;
/**
* Empty Constructor needed for child class.
/**
* Constructor for actual use. Takes in the url and password.
*/
- CloudComm(Table _table, String _baseurl, String _password) {
+ CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) {
this.table = _table;
this.baseurl = _baseurl;
this.password = _password;
this.random = new SecureRandom();
+ this.listeningPort = _listeningPort;
+
+ if (this.listeningPort > 0) {
+ localServerThread = new Thread(new Runnable() {
+ public void run() {
+ localServerWorkerFunction();
+ }
+ });
+ localServerThread.start();
+ }
}
/**
*/
private SecretKeySpec initKey() {
try {
- PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), salt, 65536, 128);
+ PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(),
+ salt,
+ 65536,
+ 128);
SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
return new SecretKeySpec(tmpkey.getEncoded(), "AES");
} catch (Exception e) {
* Inits the HMAC generator.
*/
private void initCrypt() {
+
+ if (password == null) {
+ return;
+ }
+
try {
SecretKeySpec key = initKey();
password = null; // drop password
}
public void setSalt() throws ServerException {
+
+ if (salt != null) {
+ // Salt already sent to server so dont set it again
+ return;
+ }
+ byte[] saltTmp = new byte[SALT_SIZE];
+ random.nextBytes(saltTmp);
+
+ URL url = null;
+ URLConnection con = null;
+ HttpURLConnection http = null;
+
try {
- byte[] saltTmp = new byte[SALT_SIZE];
- random.nextBytes(saltTmp);
- URL url = new URL(baseurl + "?req=setsalt");
- URLConnection con = url.openConnection();
- HttpURLConnection http = (HttpURLConnection) con;
+ url = new URL(baseurl + "?req=setsalt");
+ con = url.openConnection();
+ http = (HttpURLConnection) con;
http.setRequestMethod("POST");
http.setFixedLengthStreamingMode(saltTmp.length);
http.setDoOutput(true);
int responsecode = http.getResponseCode();
if (responsecode != HttpURLConnection.HTTP_OK) {
// TODO: Remove this print
- // System.out.println(responsecode);
+ System.out.println(responsecode);
throw new Error("Invalid response");
}
- salt = saltTmp;
} catch (Exception e) {
throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout);
}
- initCrypt();
+
+
+ try {
+ InputStream is = http.getInputStream();
+ DataInputStream dis = new DataInputStream(is);
+ // byte [] tmp = new byte[1];
+ byte tmp = dis.readByte();
+
+ if (tmp == 0) {
+ salt = saltTmp;
+ initCrypt();
+ } else {
+ getSalt(); // there was already a salt so we need to get it
+ }
+
+ } catch (SocketTimeoutException e) {
+ throw new ServerException("setSalt failed", ServerException.TypeInputTimeout);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("setSlot failed");
+ }
}
private void getSalt() throws ServerException {
e.printStackTrace();
throw new Error("getSlot failed");
}
-
}
/*
OutputStream os = http.getOutputStream();
os.write(bytes);
os.flush();
+
+ // System.out.println("Bytes Sent: " + bytes.length);
} catch (SocketTimeoutException e) {
throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout);
} catch (Exception e) {
}
}
-
/**
* Request the server to send all slots with the given
* sequencenumber or newer.
http.setRequestMethod("POST");
http.setConnectTimeout(TIMEOUT_MILLIS);
http.setReadTimeout(TIMEOUT_MILLIS);
-
-
http.connect();
- } catch (ServerException e) {
+ } catch (SocketTimeoutException e) {
throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout);
+ } catch (ServerException e) {
+ throw e;
} catch (Exception e) {
e.printStackTrace();
throw new Error("getSlots failed");
throw new Error("Bad Response: " + new String(resptype));
else
return processSlots(dis);
- } catch (ServerException e) {
+ } catch (SocketTimeoutException e) {
throw new ServerException("getSlots failed", ServerException.TypeInputTimeout);
} catch (Exception e) {
e.printStackTrace();
private Slot[] processSlots(DataInputStream dis) throws Exception {
int numberofslots = dis.readInt();
int[] sizesofslots = new int[numberofslots];
+
+
+ // System.out.println("number of slots: " + numberofslots);
+
+
+
Slot[] slots = new Slot[numberofslots];
for (int i = 0; i < numberofslots; i++)
sizesofslots[i] = dis.readInt();
for (int i = 0; i < numberofslots; i++) {
+
+ // System.out.println("Size of slot: " + sizesofslots[i]);
+
byte[] data = new byte[sizesofslots[i]];
dis.readFully(data);
dis.close();
return slots;
}
+
+ public byte[] sendLocalData(byte[] sendData, String host, int port) {
+
+ if (salt == null) {
+ return null;
+ }
+ try {
+ // Encrypt the data for sending
+ byte[] encryptedData = encryptCipher.doFinal(sendData);
+
+ // Open a TCP socket connection to a local device
+ Socket socket = new Socket(host, port);
+ socket.setReuseAddress(true);
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+
+ // Send data to output (length of data, the data)
+ output.writeInt(encryptedData.length);
+ output.write(encryptedData, 0, encryptedData.length);
+ output.flush();
+
+ int lengthOfReturnData = input.readInt();
+ byte[] returnData = new byte[lengthOfReturnData];
+ input.readFully(returnData);
+ returnData = decryptCipher.doFinal(returnData);
+
+ // We are dont with this socket
+ socket.close();
+
+ return returnData;
+ } catch (SocketTimeoutException e) {
+
+ } catch (BadPaddingException e) {
+
+ } catch (IllegalBlockSizeException e) {
+
+ } catch (UnknownHostException e) {
+
+ } catch (IOException e) {
+
+ }
+
+ return null;
+ }
+
+ private void localServerWorkerFunction() {
+
+ ServerSocket inputSocket = null;
+
+ try {
+ // Local server socket
+ inputSocket = new ServerSocket(listeningPort);
+ inputSocket.setReuseAddress(true);
+ inputSocket.setSoTimeout(TIMEOUT_MILLIS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server setup failure...");
+ }
+
+ while (!doEnd) {
+
+ try {
+ // Accept incoming socket
+ Socket socket = inputSocket.accept();
+
+ DataInputStream input = new DataInputStream(socket.getInputStream());
+ DataOutputStream output = new DataOutputStream(socket.getOutputStream());
+
+ // Get the encrypted data from the server
+ int dataSize = input.readInt();
+ byte[] readData = new byte[dataSize];
+ input.readFully(readData);
+
+ // Decrypt the data
+ readData = decryptCipher.doFinal(readData);
+
+ // Process the data
+ byte[] sendData = table.acceptDataFromLocal(readData);
+
+ // Encrypt the data for sending
+ sendData = encryptCipher.doFinal(sendData);
+
+ // Send data to output (length of data, the data)
+ output.writeInt(sendData.length);
+ output.write(sendData, 0, sendData.length);
+ output.flush();
+
+ // close the socket
+ socket.close();
+ } catch (SocketTimeoutException e) {
+
+ } catch (BadPaddingException e) {
+
+ } catch (IllegalBlockSizeException e) {
+
+ } catch (UnknownHostException e) {
+
+ } catch (IOException e) {
+
+ }
+ }
+
+ if (inputSocket != null) {
+ try {
+ inputSocket.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local server close failure...");
+ }
+ }
+ }
+
+ public void close() {
+ doEnd = true;
+
+ try {
+ localServerThread.join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Error("Local Server thread join issue...");
+ }
+
+ System.out.println("Done Closing");
+ }
+
+ protected void finalize() throws Throwable {
+ try {
+ close(); // close open files
+ } finally {
+ super.finalize();
+ }
+ }
+
}
return transactionSequenceNumber;
}
-
public Map<Integer, CommitPart> getParts() {
return parts;
}
return bbEncode.array();
}
+
+ private void setKVsMap(Map<IoTString, KeyValue> newKVs) {
+ keyValueUpdateSet.clear();
+ liveKeys.clear();
+
+ keyValueUpdateSet.addAll(newKVs.values());
+ liveKeys.addAll(newKVs.keySet());
+
+ }
+
+
+ public static Commit merge(Commit newer, Commit older, long newSequenceNumber) {
+
+ if (older == null) {
+ return newer;
+ } else if (newer == null) {
+ return older;
+ }
+
+ Map<IoTString, KeyValue> kvSet = new HashMap<IoTString, KeyValue>();
+ for (KeyValue kv : older.getKeyValueUpdateSet()) {
+ kvSet.put(kv.getKey(), kv);
+ }
+
+ for (KeyValue kv : newer.getKeyValueUpdateSet()) {
+ kvSet.put(kv.getKey(), kv);
+ }
+
+ long transactionSequenceNumber = newer.getTransactionSequenceNumber();
+
+ if (transactionSequenceNumber == -1) {
+ transactionSequenceNumber = older.getTransactionSequenceNumber();
+ }
+
+ Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber);
+
+ newCommit.setKVsMap(kvSet);
+
+ return newCommit;
+ }
}
\ No newline at end of file
return sequenceNumber;
}
- public void setSequenceNumber(long _sequenceNumber) {
- sequenceNumber = _sequenceNumber;
- }
-
static Entry decode(Slot s, ByteBuffer bb) {
long machineId = bb.getLong();
long sequenceNumber = bb.getLong();
import java.util.HashSet;
import java.util.ArrayList;
import java.util.Collections;
+import java.nio.ByteBuffer;
/**
* IoTTable data structure. Provides client interface.
static final double RESIZE_THRESHOLD = 0.75;
static final int REJECTED_THRESHOLD = 5;
-
/* Helper Objects */
private SlotBuffer buffer = null;
private CloudComm cloud = null;
private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- private long localCommitSequenceNumber = 0;
+ private long localArbitrationSequenceNumber = 0;
+ private boolean hadPartialSendToServer = false;
+ private boolean attemptedToSendToServer = false;
/* Data Structures */
private Map<IoTString, KeyValue> committedKeyValueTable = null; // Table of committed key value pairs
private Map<IoTString, KeyValue> speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value
private Map<IoTString, KeyValue> pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
-
private Map<IoTString, NewKey> liveNewKeyTable = null; // Table of live new keys
private HashMap<Long, Pair<Long, Liveness>> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
private HashMap<Long, HashSet<RejectedMessage>> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet
private Map<IoTString, Commit> liveCommitsByKeyTable = null;
private Map<Long, Long> lastCommitSeenSequenceNumberByArbitratorTable = null;
private Vector<Long> rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server
-
private List<Transaction> pendingTransactionQueue = null;
- private List<Entry> pendingSendArbitrationEntries = null;
+ private List<ArbitrationRound> pendingSendArbitrationRounds = null;
private List<Entry> pendingSendArbitrationEntriesToDelete = null;
private Map<Transaction, List<Integer>> transactionPartsSent = null;
private Map<Long, TransactionStatus> outstandingTransactionStatus = null;
+ private Map<Long, Abort> liveAbortsGeneratedByLocal = null;
+ private Set<Pair<Long, Long>> offlineTransactionsCommittedAndAtServer = null;
+ private Map<Long, Pair<String, Integer>> localCommunicationTable = null;
+ private Map<Long, Long> lastTransactionSeenFromMachineFromServer = null;
+ private Map<Long, Long> lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null;
- public Table(String baseurl, String password, long _localMachineId) {
+ public Table(String baseurl, String password, long _localMachineId, int listeningPort) {
localMachineId = _localMachineId;
- cloud = new CloudComm(this, baseurl, password);
+ cloud = new CloudComm(this, baseurl, password, listeningPort);
init();
}
lastCommitSeenSequenceNumberByArbitratorTable = new HashMap<Long, Long>();
rejectedSlotList = new Vector<Long>();
pendingTransactionQueue = new ArrayList<Transaction>();
- pendingSendArbitrationEntries = new ArrayList<Entry>();
pendingSendArbitrationEntriesToDelete = new ArrayList<Entry>();
transactionPartsSent = new HashMap<Transaction, List<Integer>>();
outstandingTransactionStatus = new HashMap<Long, TransactionStatus>();
+ liveAbortsGeneratedByLocal = new HashMap<Long, Abort>();
+ offlineTransactionsCommittedAndAtServer = new HashSet<Pair<Long, Long>>();
+ localCommunicationTable = new HashMap<Long, Pair<String, Integer>>();
+ lastTransactionSeenFromMachineFromServer = new HashMap<Long, Long>();
+ pendingSendArbitrationRounds = new ArrayList<ArbitrationRound>();
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap<Long, Long>();
+
// Other init stuff
numberOfSlots = buffer.capacity();
setResizeThreshold();
}
+ // TODO: delete method
+ public synchronized void printSlots() {
+ long o = buffer.getOldestSeqNum();
+ long n = buffer.getNewestSeqNum();
+
+ int[] types = new int[10];
+
+ int num = 0;
+
+ int livec = 0;
+ int deadc = 0;
+ for (long i = o; i < (n + 1); i++) {
+ Slot s = buffer.getSlot(i);
+
+ Vector<Entry> entries = s.getEntries();
+
+ for (Entry e : entries) {
+ if (e.isLive()) {
+ int type = e.getType();
+ types[type] = types[type] + 1;
+ num++;
+ livec++;
+ } else {
+ deadc++;
+ }
+ }
+ }
+
+ for (int i = 0; i < 10; i++) {
+ System.out.println(i + " " + types[i]);
+ }
+ System.out.println("Live count: " + livec);
+ System.out.println("Dead count: " + deadc);
+ System.out.println("Old: " + o);
+ System.out.println("New: " + n);
+ System.out.println("Size: " + buffer.size());
+
+ // List<IoTString> strList = new ArrayList<IoTString>();
+ // for (int i = 0; i < 100; i++) {
+ // String keyA = "a" + i;
+ // String keyB = "b" + i;
+ // String keyC = "c" + i;
+ // String keyD = "d" + i;
+
+ // IoTString iKeyA = new IoTString(keyA);
+ // IoTString iKeyB = new IoTString(keyB);
+ // IoTString iKeyC = new IoTString(keyC);
+ // IoTString iKeyD = new IoTString(keyD);
+
+ // strList.add(iKeyA);
+ // strList.add(iKeyB);
+ // strList.add(iKeyC);
+ // strList.add(iKeyD);
+ // }
+
+
+ // for (Long l : commitMap.keySet()) {
+ // for (Long l2 : commitMap.get(l).keySet()) {
+ // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) {
+ // strList.remove(kv.getKey());
+ // System.out.print(kv.getKey() + " ");
+ // }
+ // }
+ // }
+
+ // System.out.println();
+ // System.out.println();
+
+ // for (IoTString s : strList) {
+ // System.out.print(s + " ");
+ // }
+ // System.out.println();
+ // System.out.println(strList.size());
+ }
+
/**
* Initialize the table by inserting a table status as the first entry into the table status
* also initialize the crypto stuff.
array = new Slot[] {s};
// update local block chain
validateAndUpdate(array, true);
+ } else if (array.length == 1) {
+ // in case we did push the slot BUT we failed to init it
+ validateAndUpdate(array, true);
} else {
throw new Error("Error on initialization");
}
validateAndUpdate(newslots, true);
}
- // public String toString() {
- // String retString = " Committed Table: \n";
- // retString += "---------------------------\n";
- // retString += commitedTable.toString();
+// public String toString() {
+// String retString = " Committed Table: \n";
+// retString += "---------------------------\n";
+// retString += commitedTable.toString();
- // retString += "\n\n";
+// retString += "\n\n";
- // retString += " Speculative Table: \n";
- // retString += "---------------------------\n";
- // retString += speculativeTable.toString();
+// retString += " Speculative Table: \n";
+// retString += "---------------------------\n";
+// retString += speculativeTable.toString();
- // return retString;
- // }
+// return retString;
+// }
+
+ public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) {
+ localCommunicationTable.put(arbitrator, new Pair<String, Integer>(hostName, portNumber));
+ }
public synchronized Long getArbitrator(IoTString key) {
return arbitratorTable.get(key);
}
+ public synchronized void close() {
+ cloud.close();
+ }
+
public synchronized IoTString getCommitted(IoTString key) {
KeyValue kv = committedKeyValueTable.get(key);
}
}
- public synchronized void update() {
+ public synchronized boolean update() {
try {
Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newSlots, false);
sendToServer(null);
+
+ return true;
} catch (Exception e) {
- e.printStackTrace();
+ // e.printStackTrace();
}
+
+ return false;
}
- public synchronized boolean createNewKey(IoTString keyName, long machineId) {
+ public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
while (true) {
if (arbitratorTable.get(keyName) != null) {
// There is already an arbitrator
}
}
- public void startTransaction() {
+ public synchronized void startTransaction() {
// Create a new transaction, invalidates any old pending transactions.
pendingTransactionBuilder = new PendingTransaction(localMachineId);
}
pendingTransactionBuilder = new PendingTransaction(localMachineId);
- sendToServer(null);
+ try {
+ sendToServer(null);
+ } catch (ServerException e) {
+
+ Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+ for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
+ Transaction transaction = iter.next();
+
+ if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) {
+ // Already contacted this client so ignore all attempts to contact this client
+ // to preserve ordering for arbitrator
+ continue;
+ }
+
+ Pair<Boolean, Boolean> sendReturn = sendTransactionToLocal(transaction);
+
+ if (sendReturn.getFirst()) {
+ // Failed to contact over local
+ arbitratorTriedAndFailed.add(transaction.getArbitrator());
+ } else {
+ // Successful contact or should not contact
+
+ if (sendReturn.getSecond()) {
+ // did arbitrate
+ iter.remove();
+ }
+ }
+ }
+ }
return transactionStatus;
}
bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower);
}
- private boolean sendToServer(NewKey newKey) {
+ private boolean sendToServer(NewKey newKey) throws ServerException {
try {
// While we have stuff that needs inserting into the block chain
- while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) {
-
- // try {
- // Thread.sleep(300);
- // } catch (Exception e) {
-
- // }
+ while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) {
// Create the slot
Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC());
}
// Try to send to the server
- Pair<Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize);
+ ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null);
+
+ // if (sendSlotsReturn.getSecond()) {
+ // System.out.println("Second was true");
+ // }
+
- if (sendSlotsReturn.getFirst()) {
+ if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) {
// Did insert into the block chain
- // New Key was successfully inserted into the block chain so dont want to insert it again
- newKey = null;
+ if (sendSlotsReturn.getFirst()) {
+ // This slot was what was inserted not a previous slot
+
+ // New Key was successfully inserted into the block chain so dont want to insert it again
+ newKey = null;
+ }
// Remove the aborts and commit parts that were sent from the pending to send queue
- pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete);
+ for (Iterator<ArbitrationRound> iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) {
+ ArbitrationRound round = iter.next();
+ round.removeParts(pendingSendArbitrationEntriesToDelete);
+
+ if (round.isDoneSending()) {
+ // Sent all the parts
+ iter.remove();
+ }
+ }
+
for (Transaction transaction : transactionPartsSent.keySet()) {
pendingSendArbitrationEntriesToDelete.clear();
transactionPartsSent.clear();
- if (sendSlotsReturn.getSecond().length != 0) {
+ if (sendSlotsReturn.getThird().length != 0) {
// insert into the local block chain
- validateAndUpdate(sendSlotsReturn.getSecond(), true);
+ validateAndUpdate(sendSlotsReturn.getThird(), true);
}
}
} catch (ServerException e) {
if (e.getType() != ServerException.TypeInputTimeout) {
- e.printStackTrace();
+ // e.printStackTrace();
// Nothing was able to be sent to the server so just clear these data structures
for (Transaction transaction : transactionPartsSent.keySet()) {
transaction.setSequenceNumber(-1);
}
}
-
- pendingSendArbitrationEntriesToDelete.clear();
- transactionPartsSent.clear();
} else {
// There was a partial send to the server
+ hadPartialSendToServer = true;
}
+
+ pendingSendArbitrationEntriesToDelete.clear();
+ transactionPartsSent.clear();
+
+ throw e;
}
return newKey == null;
}
- private Pair<Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize) throws ServerException {
+ private Pair<Boolean, Boolean> sendTransactionToLocal(Transaction transaction) {
+
+ // Get the devices local communications
+ Pair<String, Integer> localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator());
+
+ if (localCommunicationInformation == null) {
+ // Cant talk to that device locally so do nothing
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+
+ // Get the size of the send data
+ int sendDataSize = Integer.BYTES + Long.BYTES;
+ for (TransactionPart part : transaction.getParts().values()) {
+ sendDataSize += part.getSize();
+ }
+
+ Long lastArbitrationDataLocalSequenceNumber = (long) - 1;
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) {
+ lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator());
+ }
+
+ // Make the send data size
+ byte[] sendData = new byte[sendDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(sendData);
+
+ // Encode the data
+ bbEncode.putLong(lastArbitrationDataLocalSequenceNumber);
+ bbEncode.putInt(transaction.getParts().size());
+ for (TransactionPart part : transaction.getParts().values()) {
+ part.encode(bbEncode);
+ }
+
+
+
+
+
+
+
+
+
+
+
+
+ System.out.println("================================");
+ System.out.println("Sending Locally");
+ for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
+ System.out.println(kv);
+ }
+
+
+
+
+
+
+
+
+
+
+
+ // Send by local
+ byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond());
+
+
+ System.out.println("--------------------------------");
+ System.out.println();
+
+ if (returnData == null) {
+ // Could not contact server
+ return new Pair<Boolean, Boolean>(true, false);
+ }
+
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(returnData);
+ boolean didCommit = bbDecode.get() == 1;
+ boolean couldArbitrate = bbDecode.get() == 1;
+ int numberOfEntries = bbDecode.getInt();
+ boolean foundAbort = false;
+
+ for (int i = 0; i < numberOfEntries; i++) {
+ byte type = bbDecode.get();
+ if (type == Entry.TypeAbort) {
+ Abort abort = (Abort)Abort.decode(null, bbDecode);
+
+ if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) {
+ foundAbort = true;
+ }
+
+ processEntry(abort);
+ } else if (type == Entry.TypeCommitPart) {
+ CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode);
+ processEntry(commitPart);
+ }
+ }
+
+ updateLiveStateFromLocal();
+
+ if (couldArbitrate) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (didCommit) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ } else {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ } else {
+ if (foundAbort) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ status.setStatus(TransactionStatus.StatusAborted);
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+ }
+
+ return new Pair<Boolean, Boolean>(false, true);
+ }
+
+ public synchronized byte[] acceptDataFromLocal(byte[] data) {
+ // Decode the data
+ ByteBuffer bbDecode = ByteBuffer.wrap(data);
+ long lastArbitratedSequenceNumberSeen = bbDecode.getLong();
+ int numberOfParts = bbDecode.getInt();
+
+ // If we did commit a transaction or not
+ boolean didCommit = false;
+ boolean couldArbitrate = false;
+
+ if (numberOfParts != 0) {
+
+ // decode the transaction
+ Transaction transaction = new Transaction();
+ for (int i = 0; i < numberOfParts; i++) {
+ bbDecode.get();
+ TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode);
+ transaction.addPartDecode(newPart);
+ }
+
+ // Arbitrate on transaction and pull relevant return data
+ Pair<Boolean, Boolean> localArbitrateReturn = arbitrateOnLocalTransaction(transaction);
+ couldArbitrate = localArbitrateReturn.getFirst();
+ didCommit = localArbitrateReturn.getSecond();
+
+ updateLiveStateFromLocal();
+
+ // Transaction was sent to the server so keep track of it to prevent double commit
+ if (transaction.getSequenceNumber() != -1) {
+ offlineTransactionsCommittedAndAtServer.add(transaction.getId());
+ }
+ }
+
+ // The data to send back
+ int returnDataSize = 0;
+ List<Entry> unseenArbitrations = new ArrayList<Entry>();
+
+ // Get the aborts to send back
+ List<Long> abortLocalSequenceNumbers = new ArrayList<Long >(liveAbortsGeneratedByLocal.keySet());
+ Collections.sort(abortLocalSequenceNumbers);
+ for (Long localSequenceNumber : abortLocalSequenceNumbers) {
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber);
+ unseenArbitrations.add(abort);
+ returnDataSize += abort.getSize();
+ }
+
+ // Get the commits to send back
+ Map<Long, Commit> commitForClientTable = liveCommitsTable.get(localMachineId);
+ if (commitForClientTable != null) {
+ List<Long> commitLocalSequenceNumbers = new ArrayList<Long>(commitForClientTable.keySet());
+ Collections.sort(commitLocalSequenceNumbers);
+
+ for (Long localSequenceNumber : commitLocalSequenceNumbers) {
+ Commit commit = commitForClientTable.get(localSequenceNumber);
+
+ if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) {
+ continue;
+ }
+
+ System.out.println("---");
+ for (KeyValue kv : commit.getKeyValueUpdateSet()) {
+ System.out.println("Sending Commit Locally: " + kv);
+ }
+ System.out.println("---");
+
+ unseenArbitrations.addAll(commit.getParts().values());
+
+ for (CommitPart commitPart : commit.getParts().values()) {
+ returnDataSize += commitPart.getSize();
+ }
+ }
+ }
+
+ // Number of arbitration entries to decode
+ returnDataSize += 2 * Integer.BYTES;
+
+ // Boolean of did commit or not
+ if (numberOfParts != 0) {
+ returnDataSize += Byte.BYTES;
+ }
+
+ // Data to send Back
+ byte[] returnData = new byte[returnDataSize];
+ ByteBuffer bbEncode = ByteBuffer.wrap(returnData);
+
+ if (numberOfParts != 0) {
+ if (didCommit) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ if (couldArbitrate) {
+ bbEncode.put((byte)1);
+ } else {
+ bbEncode.put((byte)0);
+ }
+ }
+
+ bbEncode.putInt(unseenArbitrations.size());
+ for (Entry entry : unseenArbitrations) {
+ entry.encode(bbEncode);
+ }
+
+ return returnData;
+ }
+
+ private ThreeTuple<Boolean, Boolean, Slot[]> sendSlotsToServer(Slot slot, int newSize, boolean isNewKey) throws ServerException {
- boolean inserted = true;
+ boolean attemptedToSendToServerTmp = attemptedToSendToServer;
+ attemptedToSendToServer = true;
+
+ boolean inserted = false;
+ boolean lastTryInserted = false;
Slot[] array = cloud.putSlot(slot, newSize);
if (array == null) {
array = new Slot[] {slot};
rejectedSlotList.clear();
+ inserted = true;
} else {
if (array.length == 0) {
throw new Error("Server Error: Did not send any slots");
}
- rejectedSlotList.add(slot.getSequenceNumber());
- inserted = false;
+
+ // if (attemptedToSendToServerTmp) {
+ if (hadPartialSendToServer) {
+
+ boolean isInserted = false;
+ for (Slot s : array) {
+ if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) {
+ isInserted = true;
+ break;
+ }
+ }
+
+ for (Slot s : array) {
+ if (isInserted) {
+ break;
+ }
+
+ // Process each entry in the slot
+ for (Entry entry : s.getEntries()) {
+
+ if (entry.getType() == Entry.TypeLastMessage) {
+ LastMessage lastMessage = (LastMessage)entry;
+
+ if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) {
+ isInserted = true;
+ break;
+ }
+ }
+ }
+ }
+
+ if (!isInserted) {
+ rejectedSlotList.add(slot.getSequenceNumber());
+ lastTryInserted = false;
+ } else {
+ lastTryInserted = true;
+ }
+ } else {
+ rejectedSlotList.add(slot.getSequenceNumber());
+ lastTryInserted = false;
+ }
}
- return new Pair<Boolean, Slot[]>(inserted, array);
+ return new ThreeTuple<Boolean, Boolean, Slot[]>(inserted, lastTryInserted, array);
}
/**
transactionPartsSent.clear();
pendingSendArbitrationEntriesToDelete.clear();
- // Insert pending arbitration data
- for (Entry arbitrationData : pendingSendArbitrationEntries) {
+ for (ArbitrationRound round : pendingSendArbitrationRounds) {
+ boolean isFull = false;
+ round.generateParts();
+ List<Entry> parts = round.getParts();
+
+ // Insert pending arbitration data
+ for (Entry arbitrationData : parts) {
+
+ // If it is an abort then we need to set some information
+ if (arbitrationData instanceof Abort) {
+ ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
+ }
+
+ if (!slot.hasSpace(arbitrationData)) {
+ // No space so cant do anything else with these data entries
+ isFull = true;
+ break;
+ }
- // If it is an abort then we need to set some information
- if (arbitrationData instanceof Abort) {
- ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber());
+ // Add to this current slot and add it to entries to delete
+ slot.addEntry(arbitrationData);
+ pendingSendArbitrationEntriesToDelete.add(arbitrationData);
}
- if (!slot.hasSpace(arbitrationData)) {
- // No space so cant do anything else with these data entries
+ if (isFull) {
break;
}
-
- // Add to this current slot and add it to entries to delete
- slot.addEntry(arbitrationData);
- pendingSendArbitrationEntriesToDelete.add(arbitrationData);
}
// Insert as many transactions as possible while keeping order
sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber();
updateLiveStateFromServer();
+
+ // No Need to remember after we pulled from the server
+ offlineTransactionsCommittedAndAtServer.clear();
+
+ // This is invalidated now
+ hadPartialSendToServer = false;
}
private void updateLiveStateFromServer() {
newTransactionParts.clear();
}
- public void arbitrateFromServer() {
+ private void arbitrateFromServer() {
if (liveTransactionBySequenceNumberTable.size() == 0) {
// Nothing to arbitrate on so move on
// The last transaction arbitrated on
long lastTransactionCommitted = -1;
+ Set<Abort> generatedAborts = new HashSet<Abort>();
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
continue;
}
+ if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) {
+ // We have seen this already locally so dont commit again
+ continue;
+ }
+
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
break;
}
+ // update the largest transaction seen by arbitrator from server
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) {
+ lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
+ } else {
+ Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId());
+ if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) {
+ lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber());
+ }
+ }
if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) {
// Guard evaluated as true
transaction.getClientLocalSequenceNumber(),
transaction.getSequenceNumber(),
transaction.getMachineId(),
- transaction.getArbitrator());
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
- // Add the abort to the queue of aborts to send out
- pendingSendArbitrationEntries.add(newAbort);
+ generatedAborts.add(newAbort);
// Insert the abort so we can process
processEntry(newAbort);
}
}
+ Commit newCommit = null;
+
// If there is something to commit
if (speculativeTableTmp.size() != 0) {
// Create the commit and increment the commit sequence number
- Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted);
- localCommitSequenceNumber++;
+ newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted);
+ localArbitrationSequenceNumber++;
// Add all the new keys to the commit
for (KeyValue kv : speculativeTableTmp.values()) {
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
// Insert the commit so we can process it
for (CommitPart commitPart : newCommit.getParts().values()) {
processEntry(commitPart);
}
}
+
+
+ if ((newCommit != null) || (generatedAborts.size() > 0)) {
+ ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts);
+ pendingSendArbitrationRounds.add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ }
+ }
}
- public void arbitrateOnLocalTransaction(Transaction transaction) {
+ private Pair<Boolean, Boolean> arbitrateOnLocalTransaction(Transaction transaction) {
// Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction
if (transaction.getArbitrator() != localMachineId) {
- return;
+ return new Pair<Boolean, Boolean>(false, false);
}
if (!transaction.isComplete()) {
// Will arbitrate in incorrect order if we continue so just break
// Most likely this
- return;
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+
+ if (transaction.getMachineId() != localMachineId) {
+ // dont do this check for local transactions
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) {
+ if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) {
+ // We've have already seen this from the server
+
+ System.out.println("Local Arbitrate Seen Already from server, rejected");
+ return new Pair<Boolean, Boolean>(false, false);
+ }
+ }
}
if (transaction.evaluateGuard(committedKeyValueTable, null, null)) {
// Guard evaluated as true
// Create the commit and increment the commit sequence number
- Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1);
- localCommitSequenceNumber++;
+ Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1);
+ localArbitrationSequenceNumber++;
// Update the local changes so we can make the commit
for (KeyValue kv : transaction.getKeyValueUpdateSet()) {
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- pendingSendArbitrationEntries.addAll(newCommit.getParts().values());
+ ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+ pendingSendArbitrationRounds.add(arbitrationRound);
- // Insert the commit so we can process it
- for (CommitPart commitPart : newCommit.getParts().values()) {
- processEntry(commitPart);
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ } else {
+ // Insert the commit so we can process it
+ for (CommitPart commitPart : newCommit.getParts().values()) {
+ processEntry(commitPart);
+ }
}
- TransactionStatus status = transaction.getTransactionStatus();
- status.setStatus(TransactionStatus.StatusCommitted);
+ if (transaction.getMachineId() == localMachineId) {
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusCommitted);
+ }
+ }
+ updateLiveStateFromLocal();
+ return new Pair<Boolean, Boolean>(true, true);
} else {
- // Guard evaluated was false so create abort
- TransactionStatus status = transaction.getTransactionStatus();
- status.setStatus(TransactionStatus.StatusAborted);
+
+ if (transaction.getMachineId() == localMachineId) {
+ // For locally created messages update the status
+
+ // Guard evaluated was false so create abort
+ TransactionStatus status = transaction.getTransactionStatus();
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ } else {
+
+ Set addAbortSet = new HashSet<Abort>();
+
+
+ // Create the abort
+ Abort newAbort = new Abort(null,
+ transaction.getClientLocalSequenceNumber(),
+ -1,
+ transaction.getMachineId(),
+ transaction.getArbitrator(),
+ localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+
+ addAbortSet.add(newAbort);
+
+
+ // Append all the commit parts to the end of the pending queue waiting for sending to the server
+ ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet);
+ pendingSendArbitrationRounds.add(arbitrationRound);
+
+ if (compactArbitrationData()) {
+ ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) {
+ processEntry(commitPart);
+ }
+ }
+ }
+
+ updateLiveStateFromLocal();
+ return new Pair<Boolean, Boolean>(true, false);
+ }
+ }
+
+ /**
+ * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates
+ */
+ private boolean compactArbitrationData() {
+
+
+ if (pendingSendArbitrationRounds.size() < 2) {
+ // Nothing to compact so do nothing
+ return false;
+ }
+
+ ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1);
+ if (lastRound.didSendPart()) {
+ return false;
+ }
+
+ boolean hadCommit = (lastRound.getCommit() == null);
+ boolean gotNewCommit = false;
+
+ int numberToDelete = 1;
+ while (numberToDelete < pendingSendArbitrationRounds.size()) {
+ ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1);
+
+ if (round.isFull() || round.didSendPart()) {
+ // Stop since there is a part that cannot be compacted and we need to compact in order
+ break;
+ }
+
+ if (round.getCommit() == null) {
+
+ // Try compacting aborts only
+ int newSize = round.getCurrentSize() + lastRound.getAbortsCount();
+ if (newSize > ArbitrationRound.MAX_PARTS) {
+ // Cant compact since it would be too large
+ break;
+ }
+ lastRound.addAborts(round.getAborts());
+ } else {
+
+ // Create a new larger commit
+ Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber);
+ localArbitrationSequenceNumber++;
+
+ // Create the commit parts so that we can count them
+ newCommit.createCommitParts();
+
+ // Calculate the new size of the parts
+ int newSize = newCommit.getNumberOfParts();
+ newSize += lastRound.getAbortsCount();
+ newSize += round.getAbortsCount();
+
+ if (newSize > ArbitrationRound.MAX_PARTS) {
+ // Cant compact since it would be too large
+ break;
+ }
+
+ // Set the new compacted part
+ lastRound.setCommit(newCommit);
+ lastRound.addAborts(round.getAborts());
+ gotNewCommit = true;
+ }
+
+ numberToDelete++;
+ }
+
+ if (numberToDelete != 1) {
+ // If there is a compaction
+
+ // Delete the previous pieces that are now in the new compacted piece
+ if (numberToDelete == pendingSendArbitrationRounds.size()) {
+ pendingSendArbitrationRounds.clear();
+ } else {
+ for (int i = 0; i < numberToDelete; i++) {
+ pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1);
+ }
+ }
+
+ // Add the new compacted into the pending to send list
+ pendingSendArbitrationRounds.add(lastRound);
+
+ // Should reinsert into the commit processor
+ if (hadCommit && gotNewCommit) {
+ return true;
+ }
}
+
+ return false;
}
/**
lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId());
}
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) {
-
-
-
+ long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId());
+ if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
+ }
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber());
+ }
// We have already seen this commit before so need to do the full processing on this commit
if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) {
continue;
}
-
// If we got here then this is a brand new commit and needs full processing
// Get what commits should be edited, these are the commits that have live values for their keys
}
// Update the last seen sequence number from this arbitrator
- lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
+ if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) {
+ if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) {
+ lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber());
+ }
+ }
+
+
// Update the last transaction that was updated if we can
if (commit.getTransactionSequenceNumber() != -1) {
// We processed a new commit that we havent seen before
didProcessANewCommit = true;
+
+
+ System.out.println("============");
// Update the committed table of keys and which commit is using which key
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
+ System.out.println("Committing: " + kv);
committedKeyValueTable.put(kv.getKey(), kv);
liveCommitsByKeyTable.put(kv.getKey(), commit);
}
+ System.out.println("--------------");
+ System.out.println();
}
}
}
}
-
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version
}
+ if (entry.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry);
+ }
+
if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) {
// The machine already saw this so it is dead
entry.setDead();
- liveAbortTable.remove(entry);
+ liveAbortTable.remove(entry.getAbortId());
+
+ if (entry.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber());
+ }
+
return;
}
- // update the transaction status
- TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
- if (status != null) {
- status.setStatus(TransactionStatus.StatusAborted);
+ if (entry.getTransactionSequenceNumber() != -1) {
+ // update the transaction status if it was sent to the server
+ TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber());
+ if (status != null) {
+ status.setStatus(TransactionStatus.StatusAborted);
+ }
+ }
+
+ // Update the last arbitration data that we have seen so far
+ if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) {
+
+ long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator());
+ if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) {
+ // Is larger
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
+ }
+
+ } else {
+ // Never seen any data from this arbitrator so record the first one
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber());
}
Map<Pair<Long, Integer>, CommitPart> commitPart = newCommitParts.get(entry.getMachineId());
if (commitPart == null) {
- // Dont have a table for this machine Id yet so make one
+ // Don't have a table for this machine Id yet so make one
commitPart = new HashMap<Pair<Long, Integer>, CommitPart>();
newCommitParts.put(entry.getMachineId(), commitPart);
}
if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) {
abort.setDead();
i.remove();
+
+ if (abort.getTransactionArbitrator() == localMachineId) {
+ liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber());
+ }
}
}
// Make sure the server is not playing any games
if (machineId == localMachineId) {
- // We were not making any updates and we had a machine mismatch
- if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
- throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqNum + " got: " + lastMessageSeqNum);
+ if (hadPartialSendToServer) {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum);
+ }
+
+ } else {
+ // We were not making any updates and we had a machine mismatch
+ if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) {
+ throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum);
+ }
}
} else {
if (lastMessageSeqNum > seqNum) {
test6();
} else if (args[0].equals("7")) {
test7();
+ } else if (args[0].equals("8")) {
+ test8();
}
}
+ static void test8() {
+
+ boolean foundError = false;
+ List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
+
+ // Setup the 2 clients
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000);
+
+ System.out.println("Init Table t1s");
+
+ while (true) {
+ try {
+ System.out.println("-==-=-=-=-=-=-=-==-=-");
+ t1.initTable();
+ break;
+ } catch (Exception e) {}
+ }
+
+
+ System.out.println("Update Table t2");
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001);
+ while (t2.update() == false) {}
+
+ t1.addLocalCommunication(351, "127.0.0.1", 6001);
+ t2.addLocalCommunication(321, "127.0.0.1", 6000);
+
+ // Make the Keys
+ System.out.println("Setting up keys");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+
+ String a = "a" + i;
+ String b = "b" + i;
+ String c = "c" + i;
+ String d = "d" + i;
+ IoTString ia = new IoTString(a);
+ IoTString ib = new IoTString(b);
+ IoTString ic = new IoTString(c);
+ IoTString id = new IoTString(d);
+
+ while (true) {
+ try {
+ t1.createNewKey(ia, 321);
+ break;
+ } catch (Exception e) { }
+ }
+
+ while (true) {
+ try {
+ t1.createNewKey(ib, 351);
+ break;
+ } catch (Exception e) { }
+ }
+
+ while (true) {
+ try {
+ t2.createNewKey(ic, 321);
+ break;
+ } catch (Exception e) { }
+ }
+
+ while (true) {
+ try {
+ t2.createNewKey(id, 351);
+ break;
+ } catch (Exception e) { }
+ }
+ }
+
+ // Do Updates for the keys
+ System.out.println("Setting Key-Values...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
+ String keyA = "a" + i;
+ String keyB = "b" + i;
+ String keyC = "c" + i;
+ String keyD = "d" + i;
+ String valueA = "a" + i;
+ String valueB = "b" + i;
+ String valueC = "c" + i;
+ String valueD = "d" + i;
+
+ IoTString iKeyA = new IoTString(keyA);
+ IoTString iKeyB = new IoTString(keyB);
+ IoTString iKeyC = new IoTString(keyC);
+ IoTString iKeyD = new IoTString(keyD);
+ IoTString iValueA = new IoTString(valueA);
+ IoTString iValueB = new IoTString(valueB);
+ IoTString iValueC = new IoTString(valueC);
+ IoTString iValueD = new IoTString(valueD);
+
+
+ String keyAPrev = "a" + (i - 1);
+ String keyBPrev = "b" + (i - 1);
+ String keyCPrev = "c" + (i - 1);
+ String keyDPrev = "d" + (i - 1);
+ String valueAPrev = "a" + (i - 1);
+ String valueBPrev = "b" + (i - 1);
+ String valueCPrev = "c" + (i - 1);
+ String valueDPrev = "d" + (i - 1);
+
+ IoTString iKeyAPrev = new IoTString(keyAPrev);
+ IoTString iKeyBPrev = new IoTString(keyBPrev);
+ IoTString iKeyCPrev = new IoTString(keyCPrev);
+ IoTString iKeyDPrev = new IoTString(keyDPrev);
+ IoTString iValueAPrev = new IoTString(valueAPrev);
+ IoTString iValueBPrev = new IoTString(valueBPrev);
+ IoTString iValueCPrev = new IoTString(valueCPrev);
+ IoTString iValueDPrev = new IoTString(valueDPrev);
+
+ t1.startTransaction();
+ t1.addKV(iKeyA, iValueA);
+ transStatusList.add(t1.commitTransaction());
+
+ t1.startTransaction();
+ t1.addKV(iKeyB, iValueB);
+ transStatusList.add(t1.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyC, iValueC);
+ transStatusList.add(t2.commitTransaction());
+
+ t2.startTransaction();
+ t2.addKV(iKeyD, iValueD);
+ transStatusList.add(t2.commitTransaction());
+ }
+
+ System.out.println("Updating...");
+ while (t1.update() == false) {}
+ while (t2.update() == false) {}
+ while (t1.update() == false) {}
+
+ System.out.println("Checking Key-Values...");
+ for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+
+ String keyA = "a" + i;
+ String keyB = "b" + i;
+ String keyC = "c" + i;
+ String keyD = "d" + i;
+ String valueA = "a" + i;
+ String valueB = "b" + i;
+ String valueC = "c" + i;
+ String valueD = "d" + i;
+
+ IoTString iKeyA = new IoTString(keyA);
+ IoTString iKeyB = new IoTString(keyB);
+ IoTString iKeyC = new IoTString(keyC);
+ IoTString iKeyD = new IoTString(keyD);
+ IoTString iValueA = new IoTString(valueA);
+ IoTString iValueB = new IoTString(valueB);
+ IoTString iValueC = new IoTString(valueC);
+ IoTString iValueD = new IoTString(valueD);
+
+
+ IoTString testValA1 = t1.getCommitted(iKeyA);
+ IoTString testValB1 = t1.getCommitted(iKeyB);
+ IoTString testValC1 = t1.getCommitted(iKeyC);
+ IoTString testValD1 = t1.getCommitted(iKeyD);
+
+ IoTString testValA2 = t2.getCommitted(iKeyA);
+ IoTString testValB2 = t2.getCommitted(iKeyB);
+ IoTString testValC2 = t2.getCommitted(iKeyC);
+ IoTString testValD2 = t2.getCommitted(iKeyD);
+
+ if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) {
+ System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1);
+ foundError = true;
+ }
+
+ if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) {
+ System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1);
+ foundError = true;
+ }
+
+ if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) {
+ System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1);
+ foundError = true;
+ }
+
+ if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) {
+ System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1);
+ foundError = true;
+ }
+
+
+ if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) {
+ System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2);
+ foundError = true;
+ }
+
+ if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) {
+ System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2);
+ foundError = true;
+ }
+
+ if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) {
+ System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2);
+ foundError = true;
+ }
+
+ if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) {
+ System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2);
+ foundError = true;
+ }
+ }
+
+ for (TransactionStatus status : transStatusList) {
+ if (status.getStatus() != TransactionStatus.StatusCommitted) {
+ foundError = true;
+ }
+ }
+
+ if (foundError) {
+ System.out.println("Found Errors...");
+ } else {
+ System.out.println("No Errors Found...");
+ }
+
+ t1.close();
+ t2.close();
+ }
+
static void test7() throws ServerException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
// Make the Keys
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
// Make the Keys
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
// Make the Keys
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
// Setup the 2 clients
- Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1);
t1.initTable();
System.out.println("T1 Ready");
- Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1);
t2.update();
System.out.println("T2 Ready");
System.out.println("Setting up keys");
startTime = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
String a = "a" + i;
String b = "b" + i;
String c = "c" + i;
System.out.println("Setting Key-Values...");
startTime = System.currentTimeMillis();
for (int i = 0; i < NUMBER_OF_TESTS; i++) {
+ System.out.println(i);
String keyA = "a" + i;
String keyB = "b" + i;
String keyC = "c" + i;
IoTString iValueC = new IoTString(valueC);
IoTString iValueD = new IoTString(valueD);
+
+ System.out.println("===============================================================================");
+ System.out.println("AAAAAAAA");
+ System.out.println("===============================================================================");
t1.startTransaction();
t1.addKV(iKeyA, iValueA);
transStatusList.add(t1.commitTransaction());
+ System.out.println();
+ System.out.println("===============================================================================");
+ System.out.println("BBBBBBB");
+ System.out.println("===============================================================================");
t1.startTransaction();
t1.addKV(iKeyB, iValueB);
transStatusList.add(t1.commitTransaction());
+ System.out.println();
+ System.out.println("===============================================================================");
+ System.out.println("CCCCCCC");
+ System.out.println("===============================================================================");
t2.startTransaction();
t2.addKV(iKeyC, iValueC);
transStatusList.add(t2.commitTransaction());
+ System.out.println();
+ System.out.println("===============================================================================");
+ System.out.println("DDDDDDDDDD");
+ System.out.println("===============================================================================");
t2.startTransaction();
t2.addKV(iKeyD, iValueD);
transStatusList.add(t2.commitTransaction());
+ System.out.println();
+
}
endTime = System.currentTimeMillis();
System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) );
} else {
System.out.println("No Errors Found...");
}
+
+ System.out.println();
+ System.out.println();
+ t1.printSlots();
+
+ System.out.println();
+ System.out.println();
+ t2.printSlots();
}
}
return parts;
}
-
public boolean didSendAPartToServer() {
return didSendAPartToServer;
}
return partsPendingSend.isEmpty();
}
-
public Set<KeyValue> getKeyValueUpdateSet() {
return keyValueUpdateSet;
}
using namespace std;
-const char * query_str="QUERY_STRING";
-const char * uri_str="REQUEST_URI";
-const char * method_str="REQUEST_METHOD";
-const char * iotcloudroot_str="IOTCLOUD_ROOT";
-const char * length_str="CONTENT_LENGTH";
+const char * query_str = "QUERY_STRING";
+const char * uri_str = "REQUEST_URI";
+const char * method_str = "REQUEST_METHOD";
+const char * iotcloudroot_str = "IOTCLOUD_ROOT";
+const char * length_str = "CONTENT_LENGTH";
IoTQuery::IoTQuery(FCGX_Request *request) :
request(request),
bool IoTQuery::checkDirectory() {
struct stat s;
- int err=stat(directory, &s);
+ int err = stat(directory, &s);
if (-1 == err)
return false;
return S_ISDIR(s.st_mode);
*/
void IoTQuery::decodeQuery() {
- int len=strlen(query);
- char * str=new char[len+1];
- memcpy(str, query, len+1);
- char *tok_ptr=str;
-
+ int len = strlen(query);
+ char * str = new char[len + 1];
+ memcpy(str, query, len + 1);
+ char *tok_ptr = str;
+
/* Parse commands */
- char *command=strsep(&tok_ptr, "&");
+ char *command = strsep(&tok_ptr, "&");
if (strncmp(command, "req=putslot", 11) == 0)
reqPutSlot = true;
else if (strncmp(command, "req=getslot", 11) == 0)
/* Load Sequence Number for request */
char *sequencenumber_str = strsep(&tok_ptr, "&");
if (sequencenumber_str != NULL &&
- strncmp(sequencenumber_str, "seq=", 4) == 0) {
+ strncmp(sequencenumber_str, "seq=", 4) == 0) {
sequencenumber_str = strchr(sequencenumber_str, '=');
if (sequencenumber_str != NULL) {
requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10);
/* Update size if we get request */
char * numqueueentries_str = tok_ptr;
if (numqueueentries_str != NULL &&
- strncmp(numqueueentries_str, "max=", 4) == 0) {
+ strncmp(numqueueentries_str, "max=", 4) == 0) {
numqueueentries_str = strchr(numqueueentries_str, '=') + 1;
numqueueentries = strtoll(numqueueentries_str, NULL, 10);
}
*/
void doWrite(int fd, char *data, long long length) {
- long long offset=0;
+ long long offset = 0;
do {
- long long byteswritten=write(fd, &data[offset], length);
+ long long byteswritten = write(fd, &data[offset], length);
if (byteswritten > 0) {
length -= byteswritten;
offset += byteswritten;
}
return;
}
- } while(length != 0);
+ } while (length != 0);
}
/** Helper function to read data from file. */
bool doRead(int fd, void *buf, int numbytes) {
- int offset=0;
- char *ptr=(char *)buf;
+ int offset = 0;
+ char *ptr = (char *)buf;
do {
- int bytesread=read(fd, ptr+offset, numbytes);
+ int bytesread = read(fd, ptr + offset, numbytes);
if (bytesread > 0) {
offset += bytesread;
numbytes -= bytesread;
} else
return false;
- } while (numbytes!=0);
+ } while (numbytes != 0);
return true;
}
*/
void IoTQuery::getSlot() {
- int numrequeststosend = (int)((newestentry-requestsequencenumber)+1);
+ int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1);
if (numrequeststosend < 0)
numrequeststosend = 0;
long long numbytes = 0;
int filesizes[numrequeststosend];
int fdarray[numrequeststosend];
- int index=0;
- for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
+ int index = 0;
+ for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) {
struct stat st;
- char *filename=getSlotFileName(seqn);
+ char *filename = getSlotFileName(seqn);
if (stat(filename, &st) == 0) {
- fdarray[index]=open(filename, O_RDONLY);
- filesizes[index]=st.st_size;
- numbytes+=filesizes[index];
+ fdarray[index] = open(filename, O_RDONLY);
+ filesizes[index] = st.st_size;
+ numbytes += filesizes[index];
} else {
- fdarray[index]=-1;
- filesizes[index]=0;
+ fdarray[index] = -1;
+ filesizes[index] = 0;
}
delete filename;
}
- const char header[]="getslot";
+ const char header[] = "getslot";
/* Size is the header + the payload + space for number of requests
plus sizes of each slot */
- long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes;
+ long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes;
char * response = new char[size];
- long long offset=0;
- memcpy(response, header, sizeof(header)-1);
- offset+=sizeof(header)-1;
- int numreq=htonl(numrequeststosend);
+ long long offset = 0;
+ memcpy(response, header, sizeof(header) - 1);
+ offset += sizeof(header) - 1;
+ int numreq = htonl(numrequeststosend);
memcpy(response + offset, &numreq, sizeof(numreq));
- offset+=sizeof(numrequeststosend);
- for(int i=0; i<numrequeststosend; i++) {
- int filesize=htonl(filesizes[i]);
+ offset += sizeof(numrequeststosend);
+ for (int i = 0; i < numrequeststosend; i++) {
+ int filesize = htonl(filesizes[i]);
memcpy(response + offset, &filesize, sizeof(filesize));
- offset+=sizeof(int);
+ offset += sizeof(int);
}
/* Read the file data into the buffer */
- for(int i=0; i<numrequeststosend; i++) {
- if (fdarray[i]>=0) {
- doRead(fdarray[i], response+offset, filesizes[i]);
- offset+=filesizes[i];
+ for (int i = 0; i < numrequeststosend; i++) {
+ if (fdarray[i] >= 0) {
+ doRead(fdarray[i], response + offset, filesizes[i]);
+ offset += filesizes[i];
}
}
/* Delete the response buffer and close the files. */
delete response;
- for(int i=0; i<numrequeststosend; i++) {
+ for (int i = 0; i < numrequeststosend; i++) {
if (fdarray[i] >= 0)
close(fdarray[i]);
}
void IoTQuery::setSalt() {
/* Write the slot data we received to a SLOT file */
char *filename = getSaltFileName();
- int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
- doWrite(saltfd, data, length);
- char response[0];
- sendResponse(response, 0);
- close(saltfd);
+ char * response = new char[1];
+
+ if (access(filename, F_OK) == 0)
+ {
+ /* Already Exists */
+ response[0] = 1;
+ }
+ else
+ {
+ /* Does not exist so create it */
+ int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
+ doWrite(saltfd, data, length);
+ close(saltfd);
+ response[0] = 0;
+ }
+
+
+ sendResponse(response, 1);
+
delete filename;
+ delete response;
}
/**
int filesize = 0;
struct stat st;
if (stat(filename, &st) == 0) {
- filesize=st.st_size;
+ filesize = st.st_size;
} else {
delete filename;
return;
int saltfd = open(filename, O_RDONLY);
int responsesize = filesize + sizeof(int);
char * response = new char[responsesize];
- doRead(saltfd, response+ sizeof(int), filesize);
- int n_filesize=htonl(filesize);
+ doRead(saltfd, response + sizeof(int), filesize);
+ int n_filesize = htonl(filesize);
*((int*) response) = n_filesize;
sendResponse(response, responsesize);
close(saltfd);
void IoTQuery::putSlot() {
/* Check if the request is stale and send update in that case. This
servers as an implicit failure of the request. */
- if (requestsequencenumber!=(newestentry+1)) {
+ if (requestsequencenumber != (newestentry + 1)) {
getSlot();
return;
}
/* See if we have too many slots and if so, delete the old one */
- int numberofliveslots=(int) ((newestentry-oldestentry)+1);
+ int numberofliveslots = (int) ((newestentry - oldestentry) + 1);
if (numberofliveslots >= numqueueentries) {
removeOldestSlot();
}
/* Write the slot data we received to a SLOT file */
char *filename = getSlotFileName(requestsequencenumber);
- int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR);
+ int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR);
doWrite(slotfd, data, length);
close(slotfd);
delete filename;
updateStatusFile();
/* Send response acknowledging success */
- char command[]="putslot";
- sendResponse(command, sizeof(command)-1);
+ char command[] = "putslot";
+ sendResponse(command, sizeof(command) - 1);
}
/**
void IoTQuery::sendResponse(char * bytes, int len) {
cout << "Accept-Ranges: bytes\r\n"
- << "Content-Length: " << len << "\r\n"
- << "\r\n";
+ << "Content-Length: " << len << "\r\n"
+ << "\r\n";
cout.write(bytes, len);
}
*/
char * IoTQuery::getSlotFileName(long long seqnum) {
- int directorylen=strlen(directory);
+ int directorylen = strlen(directory);
/* Size is 19 digits for ASCII representation of a long + 4
characters for SLOT string + 1 character for null termination +
directory size*/
- char * filename=new char[25+directorylen];
- snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum);
+ char * filename = new char[25 + directorylen];
+ snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum);
return filename;
}
*/
char * IoTQuery::getSaltFileName() {
- int directorylen=strlen(directory);
+ int directorylen = strlen(directory);
/* Size is 4 characters for SALT string + 1 character for null
termination + directory size*/
- char * filename=new char[6+directorylen];
- snprintf(filename, 6+directorylen, "%s/SALT", directory);
+ char * filename = new char[6 + directorylen];
+ snprintf(filename, 6 + directorylen, "%s/SALT", directory);
return filename;
}
*/
void IoTQuery::removeOldestSlot() {
- if (oldestentry!=0) {
- char * filename=getSlotFileName(oldestentry);
+ if (oldestentry != 0) {
+ char * filename = getSlotFileName(oldestentry);
unlink(filename);
delete filename;
}
void IoTQuery::processQuery() {
getQuery();
getDirectory();
- readData();
+ if (!readData())
+ {
+ return;
+ }
+
/* Verify that we receive a post request. */
if (strncmp(method, "POST", 4) != 0) {
/* Make sure the directory is okay. */
if (directory == NULL ||
- !checkDirectory()) {
+ !checkDirectory()) {
cerr << "Directory " << directory << " does not exist" << endl;
return;
}
/* Decode query. */
decodeQuery();
-
+
/* Handle request. */
if (reqGetSlot)
getSlot();
* inserted.
*/
-void IoTQuery::readData() {
+bool IoTQuery::readData() {
if (length) {
- data = new char[length+1];
- memset(data, 0, length+1);
+ data = new char[length + 1];
+ memset(data, 0, length + 1);
cin.read(data, length);
}
do {
char dummy;
cin >> dummy;
} while (!cin.eof());
+
+ if (length)
+ {
+ if (cin.fail())
+ {
+ return false;
+ }
+ }
+
+ return true;
}
/** We require the content-length header to be sent. */
char * reqlength = FCGX_GetParam(length_str, request->envp);
if (reqlength) {
- length=strtoll(reqlength, NULL, 10);
+ length = strtoll(reqlength, NULL, 10);
} else {
- length=0;
+ length = 0;
}
}
char * split = strchr((char *)uri, '?');
if (split == NULL)
return;
- int split_len = (int) (split-uri);
+ int split_len = (int) (split - uri);
int rootdir_len = strlen(iotcloudroot);
int directory_len = split_len + rootdir_len + 1;
directory = new char[directory_len];
memcpy(directory, iotcloudroot, rootdir_len);
memcpy(directory + rootdir_len, uri, split_len);
- directory[directory_len-1]=0;
+ directory[directory_len - 1] = 0;
}
/**
int doread(int fd, void *ptr, size_t count, off_t offset) {
do {
- size_t bytesread=pread(fd, ptr, count, offset);
- if (bytesread==count) {
+ size_t bytesread = pread(fd, ptr, count, offset);
+ if (bytesread == count) {
return 1;
- } else if (bytesread==0) {
+ } else if (bytesread == 0) {
return 0;
}
- } while(1);
+ } while (1);
}
*/
bool IoTQuery::openStatusFile() {
- char statusfile[]="queuestatus";
- int len=strlen(directory);
+ char statusfile[] = "queuestatus";
+ int len = strlen(directory);
- char * filename=new char[len+sizeof(statusfile)+2];
+ char * filename = new char[len + sizeof(statusfile) + 2];
memcpy(filename, directory, len);
- filename[len]='/';
- memcpy(filename+len+1, statusfile, sizeof(statusfile));
- filename[len+sizeof(statusfile)+1]=0;
- fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR);
+ filename[len] = '/';
+ memcpy(filename + len + 1, statusfile, sizeof(statusfile));
+ filename[len + sizeof(statusfile) + 1] = 0;
+ fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
delete filename;
if (fd < 0) {
/* Read in queue size, oldest sequence number, and newest sequence number. */
int size;
- int needwrite=0;
+ int needwrite = 0;
if (doread(fd, &size, sizeof(size), OFFSET_MAX))
- numqueueentries=size;
+ numqueueentries = size;
else
- needwrite=1;
+ needwrite = 1;
long long entry;
if (doread(fd, &entry, sizeof(entry), OFFSET_OLD))
- oldestentry=entry;
+ oldestentry = entry;
else
- needwrite=1;
+ needwrite = 1;
if (doread(fd, &entry, sizeof(entry), OFFSET_NEW))
- newestentry=entry;
+ newestentry = entry;
else
- needwrite=1;
+ needwrite = 1;
if (needwrite)
updateStatusFile();
void sendResponse(char *data, int length);
void getQuery();
void getDirectory();
- void readData();
+ bool readData();
bool checkDirectory();
bool openStatusFile();
void updateStatusFile();