From 687b0282b4f0810b78de247d3f99b2ace038bf25 Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Thu, 12 Jan 2017 00:13:10 -0800 Subject: [PATCH] Local communication support --- version2/src/java/iotcloud/Abort.java | 24 +- .../src/java/iotcloud/ArbitrationRound.java | 110 +++ version2/src/java/iotcloud/CloudComm.java | 245 ++++- version2/src/java/iotcloud/Commit.java | 41 +- version2/src/java/iotcloud/CommitPart.java | 4 - version2/src/java/iotcloud/Table.java | 854 ++++++++++++++++-- version2/src/java/iotcloud/Test.java | 276 +++++- version2/src/java/iotcloud/Transaction.java | 2 - version2/src/server/iotquery.cpp | 221 +++-- version2/src/server/iotquery.h | 2 +- 10 files changed, 1537 insertions(+), 242 deletions(-) create mode 100644 version2/src/java/iotcloud/ArbitrationRound.java diff --git a/version2/src/java/iotcloud/Abort.java b/version2/src/java/iotcloud/Abort.java index 6231292..0579d44 100644 --- a/version2/src/java/iotcloud/Abort.java +++ b/version2/src/java/iotcloud/Abort.java @@ -9,32 +9,36 @@ import java.nio.ByteBuffer; */ -class Abort extends Entry{ +class Abort extends Entry { private long transactionClientLocalSequenceNumber = -1; private long transactionSequenceNumber = -1; private long sequenceNumber = -1; private long transactionMachineId = -1; private long transactionArbitrator = -1; + private long arbitratorLocalSequenceNumber = -1; private Pair abortId = null; - public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber , long _transactionMachineId, long _transactionArbitrator) { + public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber , long _transactionMachineId, long _transactionArbitrator, long _arbitratorLocalSequenceNumber) { super(slot); transactionClientLocalSequenceNumber = _transactionClientLocalSequenceNumber; transactionSequenceNumber = _transactionSequenceNumber; transactionMachineId = _transactionMachineId; transactionArbitrator = _transactionArbitrator; + arbitratorLocalSequenceNumber = _arbitratorLocalSequenceNumber; abortId = new Pair(transactionMachineId, transactionClientLocalSequenceNumber); } - public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber, long _sequenceNumber , long _transactionMachineId, long _transactionArbitrator) { + public Abort(Slot slot, long _transactionClientLocalSequenceNumber, long _transactionSequenceNumber, long _sequenceNumber , long _transactionMachineId, long _transactionArbitrator, long _arbitratorLocalSequenceNumber) { super(slot); transactionClientLocalSequenceNumber = _transactionClientLocalSequenceNumber; transactionSequenceNumber = _transactionSequenceNumber; sequenceNumber = _sequenceNumber; transactionMachineId = _transactionMachineId; transactionArbitrator = _transactionArbitrator; + arbitratorLocalSequenceNumber = _arbitratorLocalSequenceNumber; + abortId = new Pair(transactionMachineId, transactionClientLocalSequenceNumber); } @@ -54,6 +58,11 @@ class Abort extends Entry{ return transactionClientLocalSequenceNumber; } + public long getArbitratorLocalSequenceNumber() { + return arbitratorLocalSequenceNumber; + } + + public void setSlot(Slot s) { parentslot = s; } @@ -77,7 +86,9 @@ class Abort extends Entry{ long sequenceNumber = bb.getLong(); long transactionMachineId = bb.getLong(); long transactionArbitrator = bb.getLong(); - return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator); + long arbitratorLocalSequenceNumber = bb.getLong(); + + return new Abort(slot, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); } public void encode(ByteBuffer bb) { @@ -87,10 +98,11 @@ class Abort extends Entry{ bb.putLong(sequenceNumber); bb.putLong(transactionMachineId); bb.putLong(transactionArbitrator); + bb.putLong(arbitratorLocalSequenceNumber); } public int getSize() { - return (4 * Long.BYTES) + Byte.BYTES; + return (6 * Long.BYTES) + Byte.BYTES; } public byte getType() { @@ -98,6 +110,6 @@ class Abort extends Entry{ } public Entry getCopy(Slot s) { - return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator); + return new Abort(s, transactionClientLocalSequenceNumber, transactionSequenceNumber, sequenceNumber, transactionMachineId, transactionArbitrator, arbitratorLocalSequenceNumber); } } \ No newline at end of file diff --git a/version2/src/java/iotcloud/ArbitrationRound.java b/version2/src/java/iotcloud/ArbitrationRound.java new file mode 100644 index 0000000..0f7d8c8 --- /dev/null +++ b/version2/src/java/iotcloud/ArbitrationRound.java @@ -0,0 +1,110 @@ +package iotcloud; + +import java.util.Set; +import java.util.HashSet; + +import java.util.List; +import java.util.ArrayList; + + +class ArbitrationRound { + + public static final int MAX_PARTS = 10; + + Set abortsBefore = null; + List parts = null; + Commit commit = null; + int currentSize = 0; + boolean didSendPart = false; + boolean didGenerateParts = false; + + public ArbitrationRound(Commit _commit, Set _abortsBefore) { + + parts = new ArrayList(); + + commit = _commit; + abortsBefore = _abortsBefore; + + + if (commit != null) { + commit.createCommitParts(); + currentSize += commit.getNumberOfParts(); + } + + currentSize += abortsBefore.size(); + } + + public void generateParts() { + if (didGenerateParts) { + return; + } + parts = new ArrayList(abortsBefore); + if (commit != null) { + parts.addAll(commit.getParts().values()); + } + } + + + public List getParts() { + return parts; + } + + public void removeParts(List removeParts) { + parts.removeAll(removeParts); + didSendPart = true; + } + + public boolean isDoneSending() { + if ((commit == null) && abortsBefore.isEmpty()) { + return true; + } + + return parts.isEmpty(); + } + + public Commit getCommit() { + return commit; + } + + public void setCommit(Commit _commit) { + if (commit != null) { + currentSize -= commit.getNumberOfParts(); + } + commit = _commit; + + if (commit != null) { + currentSize += commit.getNumberOfParts(); + } + } + + public void addAbort(Abort abort) { + abortsBefore.add(abort); + currentSize++; + } + + public void addAborts(Set aborts) { + abortsBefore.addAll(aborts); + currentSize += aborts.size(); + } + + + public Set getAborts() { + return abortsBefore; + } + + public int getAbortsCount() { + return abortsBefore.size(); + } + + public int getCurrentSize() { + return currentSize; + } + + public boolean isFull() { + return currentSize >= MAX_PARTS; + } + + public boolean didSendPart() { + return didSendPart; + } +} \ No newline at end of file diff --git a/version2/src/java/iotcloud/CloudComm.java b/version2/src/java/iotcloud/CloudComm.java index ac499e5..5741019 100644 --- a/version2/src/java/iotcloud/CloudComm.java +++ b/version2/src/java/iotcloud/CloudComm.java @@ -16,16 +16,20 @@ import java.security.SecureRandom; class CloudComm { - String baseurl; - Cipher encryptCipher; - Cipher decryptCipher; - Mac mac; - String password; - SecureRandom random; - static final int SALT_SIZE = 8; - static final int TIMEOUT_MILLIS = 100; - byte salt[]; - Table table; + private static final int SALT_SIZE = 8; + private static final int TIMEOUT_MILLIS = 100; + + private String baseurl; + private Cipher encryptCipher; + private Cipher decryptCipher; + private Mac mac; + private String password; + private SecureRandom random; + private byte salt[]; + private Table table; + private int listeningPort = -1; + private Thread localServerThread = null; + private boolean doEnd = false; /** * Empty Constructor needed for child class. @@ -36,11 +40,21 @@ class CloudComm { /** * Constructor for actual use. Takes in the url and password. */ - CloudComm(Table _table, String _baseurl, String _password) { + CloudComm(Table _table, String _baseurl, String _password, int _listeningPort) { this.table = _table; this.baseurl = _baseurl; this.password = _password; this.random = new SecureRandom(); + this.listeningPort = _listeningPort; + + if (this.listeningPort > 0) { + localServerThread = new Thread(new Runnable() { + public void run() { + localServerWorkerFunction(); + } + }); + localServerThread.start(); + } } /** @@ -48,7 +62,10 @@ class CloudComm { */ private SecretKeySpec initKey() { try { - PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), salt, 65536, 128); + PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray(), + salt, + 65536, + 128); SecretKey tmpkey = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec); return new SecretKeySpec(tmpkey.getEncoded(), "AES"); } catch (Exception e) { @@ -61,6 +78,11 @@ class CloudComm { * Inits the HMAC generator. */ private void initCrypt() { + + if (password == null) { + return; + } + try { SecretKeySpec key = initKey(); password = null; // drop password @@ -88,12 +110,22 @@ class CloudComm { } public void setSalt() throws ServerException { + + if (salt != null) { + // Salt already sent to server so dont set it again + return; + } + byte[] saltTmp = new byte[SALT_SIZE]; + random.nextBytes(saltTmp); + + URL url = null; + URLConnection con = null; + HttpURLConnection http = null; + try { - byte[] saltTmp = new byte[SALT_SIZE]; - random.nextBytes(saltTmp); - URL url = new URL(baseurl + "?req=setsalt"); - URLConnection con = url.openConnection(); - HttpURLConnection http = (HttpURLConnection) con; + url = new URL(baseurl + "?req=setsalt"); + con = url.openConnection(); + http = (HttpURLConnection) con; http.setRequestMethod("POST"); http.setFixedLengthStreamingMode(saltTmp.length); http.setDoOutput(true); @@ -104,15 +136,34 @@ class CloudComm { int responsecode = http.getResponseCode(); if (responsecode != HttpURLConnection.HTTP_OK) { // TODO: Remove this print - // System.out.println(responsecode); + System.out.println(responsecode); throw new Error("Invalid response"); } - salt = saltTmp; } catch (Exception e) { throw new ServerException("Failed setting salt", ServerException.TypeConnectTimeout); } - initCrypt(); + + + try { + InputStream is = http.getInputStream(); + DataInputStream dis = new DataInputStream(is); + // byte [] tmp = new byte[1]; + byte tmp = dis.readByte(); + + if (tmp == 0) { + salt = saltTmp; + initCrypt(); + } else { + getSalt(); // there was already a salt so we need to get it + } + + } catch (SocketTimeoutException e) { + throw new ServerException("setSalt failed", ServerException.TypeInputTimeout); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("setSlot failed"); + } } private void getSalt() throws ServerException { @@ -154,7 +205,6 @@ class CloudComm { e.printStackTrace(); throw new Error("getSlot failed"); } - } /* @@ -191,6 +241,8 @@ class CloudComm { OutputStream os = http.getOutputStream(); os.write(bytes); os.flush(); + + // System.out.println("Bytes Sent: " + bytes.length); } catch (SocketTimeoutException e) { throw new ServerException("putSlot failed", ServerException.TypeConnectTimeout); } catch (Exception e) { @@ -221,7 +273,6 @@ class CloudComm { } } - /** * Request the server to send all slots with the given * sequencenumber or newer. @@ -243,11 +294,11 @@ class CloudComm { http.setRequestMethod("POST"); http.setConnectTimeout(TIMEOUT_MILLIS); http.setReadTimeout(TIMEOUT_MILLIS); - - http.connect(); - } catch (ServerException e) { + } catch (SocketTimeoutException e) { throw new ServerException("getSlots failed", ServerException.TypeConnectTimeout); + } catch (ServerException e) { + throw e; } catch (Exception e) { e.printStackTrace(); throw new Error("getSlots failed"); @@ -262,7 +313,7 @@ class CloudComm { throw new Error("Bad Response: " + new String(resptype)); else return processSlots(dis); - } catch (ServerException e) { + } catch (SocketTimeoutException e) { throw new ServerException("getSlots failed", ServerException.TypeInputTimeout); } catch (Exception e) { e.printStackTrace(); @@ -277,11 +328,20 @@ class CloudComm { private Slot[] processSlots(DataInputStream dis) throws Exception { int numberofslots = dis.readInt(); int[] sizesofslots = new int[numberofslots]; + + + // System.out.println("number of slots: " + numberofslots); + + + Slot[] slots = new Slot[numberofslots]; for (int i = 0; i < numberofslots; i++) sizesofslots[i] = dis.readInt(); for (int i = 0; i < numberofslots; i++) { + + // System.out.println("Size of slot: " + sizesofslots[i]); + byte[] data = new byte[sizesofslots[i]]; dis.readFully(data); @@ -292,4 +352,137 @@ class CloudComm { dis.close(); return slots; } + + public byte[] sendLocalData(byte[] sendData, String host, int port) { + + if (salt == null) { + return null; + } + try { + // Encrypt the data for sending + byte[] encryptedData = encryptCipher.doFinal(sendData); + + // Open a TCP socket connection to a local device + Socket socket = new Socket(host, port); + socket.setReuseAddress(true); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + DataInputStream input = new DataInputStream(socket.getInputStream()); + + // Send data to output (length of data, the data) + output.writeInt(encryptedData.length); + output.write(encryptedData, 0, encryptedData.length); + output.flush(); + + int lengthOfReturnData = input.readInt(); + byte[] returnData = new byte[lengthOfReturnData]; + input.readFully(returnData); + returnData = decryptCipher.doFinal(returnData); + + // We are dont with this socket + socket.close(); + + return returnData; + } catch (SocketTimeoutException e) { + + } catch (BadPaddingException e) { + + } catch (IllegalBlockSizeException e) { + + } catch (UnknownHostException e) { + + } catch (IOException e) { + + } + + return null; + } + + private void localServerWorkerFunction() { + + ServerSocket inputSocket = null; + + try { + // Local server socket + inputSocket = new ServerSocket(listeningPort); + inputSocket.setReuseAddress(true); + inputSocket.setSoTimeout(TIMEOUT_MILLIS); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local server setup failure..."); + } + + while (!doEnd) { + + try { + // Accept incoming socket + Socket socket = inputSocket.accept(); + + DataInputStream input = new DataInputStream(socket.getInputStream()); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + + // Get the encrypted data from the server + int dataSize = input.readInt(); + byte[] readData = new byte[dataSize]; + input.readFully(readData); + + // Decrypt the data + readData = decryptCipher.doFinal(readData); + + // Process the data + byte[] sendData = table.acceptDataFromLocal(readData); + + // Encrypt the data for sending + sendData = encryptCipher.doFinal(sendData); + + // Send data to output (length of data, the data) + output.writeInt(sendData.length); + output.write(sendData, 0, sendData.length); + output.flush(); + + // close the socket + socket.close(); + } catch (SocketTimeoutException e) { + + } catch (BadPaddingException e) { + + } catch (IllegalBlockSizeException e) { + + } catch (UnknownHostException e) { + + } catch (IOException e) { + + } + } + + if (inputSocket != null) { + try { + inputSocket.close(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local server close failure..."); + } + } + } + + public void close() { + doEnd = true; + + try { + localServerThread.join(); + } catch (Exception e) { + e.printStackTrace(); + throw new Error("Local Server thread join issue..."); + } + + System.out.println("Done Closing"); + } + + protected void finalize() throws Throwable { + try { + close(); // close open files + } finally { + super.finalize(); + } + } + } diff --git a/version2/src/java/iotcloud/Commit.java b/version2/src/java/iotcloud/Commit.java index c43fcfd..7084d0e 100644 --- a/version2/src/java/iotcloud/Commit.java +++ b/version2/src/java/iotcloud/Commit.java @@ -94,7 +94,6 @@ class Commit { return transactionSequenceNumber; } - public Map getParts() { return parts; } @@ -243,4 +242,44 @@ class Commit { return bbEncode.array(); } + + private void setKVsMap(Map newKVs) { + keyValueUpdateSet.clear(); + liveKeys.clear(); + + keyValueUpdateSet.addAll(newKVs.values()); + liveKeys.addAll(newKVs.keySet()); + + } + + + public static Commit merge(Commit newer, Commit older, long newSequenceNumber) { + + if (older == null) { + return newer; + } else if (newer == null) { + return older; + } + + Map kvSet = new HashMap(); + for (KeyValue kv : older.getKeyValueUpdateSet()) { + kvSet.put(kv.getKey(), kv); + } + + for (KeyValue kv : newer.getKeyValueUpdateSet()) { + kvSet.put(kv.getKey(), kv); + } + + long transactionSequenceNumber = newer.getTransactionSequenceNumber(); + + if (transactionSequenceNumber == -1) { + transactionSequenceNumber = older.getTransactionSequenceNumber(); + } + + Commit newCommit = new Commit(newSequenceNumber, newer.getMachineId(), transactionSequenceNumber); + + newCommit.setKVsMap(kvSet); + + return newCommit; + } } \ No newline at end of file diff --git a/version2/src/java/iotcloud/CommitPart.java b/version2/src/java/iotcloud/CommitPart.java index cea9fbf..53f164c 100644 --- a/version2/src/java/iotcloud/CommitPart.java +++ b/version2/src/java/iotcloud/CommitPart.java @@ -82,10 +82,6 @@ class CommitPart extends Entry{ return sequenceNumber; } - public void setSequenceNumber(long _sequenceNumber) { - sequenceNumber = _sequenceNumber; - } - static Entry decode(Slot s, ByteBuffer bb) { long machineId = bb.getLong(); long sequenceNumber = bb.getLong(); diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index 9764223..354747d 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -11,6 +11,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.ArrayList; import java.util.Collections; +import java.nio.ByteBuffer; /** * IoTTable data structure. Provides client interface. @@ -28,7 +29,6 @@ final public class Table { static final double RESIZE_THRESHOLD = 0.75; static final int REJECTED_THRESHOLD = 5; - /* Helper Objects */ private SlotBuffer buffer = null; private CloudComm cloud = null; @@ -50,13 +50,14 @@ final public class Table { private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on - private long localCommitSequenceNumber = 0; + private long localArbitrationSequenceNumber = 0; + private boolean hadPartialSendToServer = false; + private boolean attemptedToSendToServer = false; /* Data Structures */ private Map committedKeyValueTable = null; // Table of committed key value pairs private Map speculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value private Map pendingTransactionSpeculatedKeyValueTable = null; // Table of speculated key value pairs, if there is a speculative value from the pending transactions - private Map liveNewKeyTable = null; // Table of live new keys private HashMap> lastMessageTable = null; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage); private HashMap> rejectedMessageWatchListTable = null; // Table of machine Ids and the set of rejected messages they have not seen yet @@ -71,20 +72,24 @@ final public class Table { private Map liveCommitsByKeyTable = null; private Map lastCommitSeenSequenceNumberByArbitratorTable = null; private Vector rejectedSlotList = null; // List of rejected slots that have yet to be sent to the server - private List pendingTransactionQueue = null; - private List pendingSendArbitrationEntries = null; + private List pendingSendArbitrationRounds = null; private List pendingSendArbitrationEntriesToDelete = null; private Map> transactionPartsSent = null; private Map outstandingTransactionStatus = null; + private Map liveAbortsGeneratedByLocal = null; + private Set> offlineTransactionsCommittedAndAtServer = null; + private Map> localCommunicationTable = null; + private Map lastTransactionSeenFromMachineFromServer = null; + private Map lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = null; - public Table(String baseurl, String password, long _localMachineId) { + public Table(String baseurl, String password, long _localMachineId, int listeningPort) { localMachineId = _localMachineId; - cloud = new CloudComm(this, baseurl, password); + cloud = new CloudComm(this, baseurl, password, listeningPort); init(); } @@ -127,16 +132,97 @@ final public class Table { lastCommitSeenSequenceNumberByArbitratorTable = new HashMap(); rejectedSlotList = new Vector(); pendingTransactionQueue = new ArrayList(); - pendingSendArbitrationEntries = new ArrayList(); pendingSendArbitrationEntriesToDelete = new ArrayList(); transactionPartsSent = new HashMap>(); outstandingTransactionStatus = new HashMap(); + liveAbortsGeneratedByLocal = new HashMap(); + offlineTransactionsCommittedAndAtServer = new HashSet>(); + localCommunicationTable = new HashMap>(); + lastTransactionSeenFromMachineFromServer = new HashMap(); + pendingSendArbitrationRounds = new ArrayList(); + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = new HashMap(); + // Other init stuff numberOfSlots = buffer.capacity(); setResizeThreshold(); } + // TODO: delete method + public synchronized void printSlots() { + long o = buffer.getOldestSeqNum(); + long n = buffer.getNewestSeqNum(); + + int[] types = new int[10]; + + int num = 0; + + int livec = 0; + int deadc = 0; + for (long i = o; i < (n + 1); i++) { + Slot s = buffer.getSlot(i); + + Vector entries = s.getEntries(); + + for (Entry e : entries) { + if (e.isLive()) { + int type = e.getType(); + types[type] = types[type] + 1; + num++; + livec++; + } else { + deadc++; + } + } + } + + for (int i = 0; i < 10; i++) { + System.out.println(i + " " + types[i]); + } + System.out.println("Live count: " + livec); + System.out.println("Dead count: " + deadc); + System.out.println("Old: " + o); + System.out.println("New: " + n); + System.out.println("Size: " + buffer.size()); + + // List strList = new ArrayList(); + // for (int i = 0; i < 100; i++) { + // String keyA = "a" + i; + // String keyB = "b" + i; + // String keyC = "c" + i; + // String keyD = "d" + i; + + // IoTString iKeyA = new IoTString(keyA); + // IoTString iKeyB = new IoTString(keyB); + // IoTString iKeyC = new IoTString(keyC); + // IoTString iKeyD = new IoTString(keyD); + + // strList.add(iKeyA); + // strList.add(iKeyB); + // strList.add(iKeyC); + // strList.add(iKeyD); + // } + + + // for (Long l : commitMap.keySet()) { + // for (Long l2 : commitMap.get(l).keySet()) { + // for (KeyValue kv : commitMap.get(l).get(l2).getkeyValueUpdateSet()) { + // strList.remove(kv.getKey()); + // System.out.print(kv.getKey() + " "); + // } + // } + // } + + // System.out.println(); + // System.out.println(); + + // for (IoTString s : strList) { + // System.out.print(s + " "); + // } + // System.out.println(); + // System.out.println(strList.size()); + } + /** * Initialize the table by inserting a table status as the first entry into the table status * also initialize the crypto stuff. @@ -154,6 +240,9 @@ final public class Table { array = new Slot[] {s}; // update local block chain validateAndUpdate(array, true); + } else if (array.length == 1) { + // in case we did push the slot BUT we failed to init it + validateAndUpdate(array, true); } else { throw new Error("Error on initialization"); } @@ -168,24 +257,32 @@ final public class Table { validateAndUpdate(newslots, true); } - // public String toString() { - // String retString = " Committed Table: \n"; - // retString += "---------------------------\n"; - // retString += commitedTable.toString(); +// public String toString() { +// String retString = " Committed Table: \n"; +// retString += "---------------------------\n"; +// retString += commitedTable.toString(); - // retString += "\n\n"; +// retString += "\n\n"; - // retString += " Speculative Table: \n"; - // retString += "---------------------------\n"; - // retString += speculativeTable.toString(); +// retString += " Speculative Table: \n"; +// retString += "---------------------------\n"; +// retString += speculativeTable.toString(); - // return retString; - // } +// return retString; +// } + + public synchronized void addLocalCommunication(long arbitrator, String hostName, int portNumber) { + localCommunicationTable.put(arbitrator, new Pair(hostName, portNumber)); + } public synchronized Long getArbitrator(IoTString key) { return arbitratorTable.get(key); } + public synchronized void close() { + cloud.close(); + } + public synchronized IoTString getCommitted(IoTString key) { KeyValue kv = committedKeyValueTable.get(key); @@ -266,17 +363,21 @@ final public class Table { } } - public synchronized void update() { + public synchronized boolean update() { try { Slot[] newSlots = cloud.getSlots(sequenceNumber + 1); validateAndUpdate(newSlots, false); sendToServer(null); + + return true; } catch (Exception e) { - e.printStackTrace(); + // e.printStackTrace(); } + + return false; } - public synchronized boolean createNewKey(IoTString keyName, long machineId) { + public synchronized boolean createNewKey(IoTString keyName, long machineId) throws ServerException { while (true) { if (arbitratorTable.get(keyName) != null) { // There is already an arbitrator @@ -291,7 +392,7 @@ final public class Table { } } - public void startTransaction() { + public synchronized void startTransaction() { // Create a new transaction, invalidates any old pending transactions. pendingTransactionBuilder = new PendingTransaction(localMachineId); } @@ -342,7 +443,35 @@ final public class Table { pendingTransactionBuilder = new PendingTransaction(localMachineId); - sendToServer(null); + try { + sendToServer(null); + } catch (ServerException e) { + + Set arbitratorTriedAndFailed = new HashSet(); + for (Iterator iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) { + Transaction transaction = iter.next(); + + if (arbitratorTriedAndFailed.contains(transaction.getArbitrator())) { + // Already contacted this client so ignore all attempts to contact this client + // to preserve ordering for arbitrator + continue; + } + + Pair sendReturn = sendTransactionToLocal(transaction); + + if (sendReturn.getFirst()) { + // Failed to contact over local + arbitratorTriedAndFailed.add(transaction.getArbitrator()); + } else { + // Successful contact or should not contact + + if (sendReturn.getSecond()) { + // did arbitrate + iter.remove(); + } + } + } + } return transactionStatus; } @@ -369,17 +498,11 @@ final public class Table { bufferResizeThreshold = resizeLower - 1 + random.nextInt(numberOfSlots - resizeLower); } - private boolean sendToServer(NewKey newKey) { + private boolean sendToServer(NewKey newKey) throws ServerException { try { // While we have stuff that needs inserting into the block chain - while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationEntries.size() > 0) || (newKey != null)) { - - // try { - // Thread.sleep(300); - // } catch (Exception e) { - - // } + while ((pendingTransactionQueue.size() > 0) || (pendingSendArbitrationRounds.size() > 0) || (newKey != null)) { // Create the slot Slot slot = new Slot(this, sequenceNumber + 1, localMachineId, buffer.getSlot(sequenceNumber).getHMAC()); @@ -410,16 +533,34 @@ final public class Table { } // Try to send to the server - Pair sendSlotsReturn = sendSlotsToServer(slot, newSize); + ThreeTuple sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != null); + + // if (sendSlotsReturn.getSecond()) { + // System.out.println("Second was true"); + // } + - if (sendSlotsReturn.getFirst()) { + if (/*sendSlotsReturn.getSecond() || */sendSlotsReturn.getFirst()) { // Did insert into the block chain - // New Key was successfully inserted into the block chain so dont want to insert it again - newKey = null; + if (sendSlotsReturn.getFirst()) { + // This slot was what was inserted not a previous slot + + // New Key was successfully inserted into the block chain so dont want to insert it again + newKey = null; + } // Remove the aborts and commit parts that were sent from the pending to send queue - pendingSendArbitrationEntries.removeAll(pendingSendArbitrationEntriesToDelete); + for (Iterator iter = pendingSendArbitrationRounds.iterator(); iter.hasNext(); ) { + ArbitrationRound round = iter.next(); + round.removeParts(pendingSendArbitrationEntriesToDelete); + + if (round.isDoneSending()) { + // Sent all the parts + iter.remove(); + } + } + for (Transaction transaction : transactionPartsSent.keySet()) { @@ -454,15 +595,15 @@ final public class Table { pendingSendArbitrationEntriesToDelete.clear(); transactionPartsSent.clear(); - if (sendSlotsReturn.getSecond().length != 0) { + if (sendSlotsReturn.getThird().length != 0) { // insert into the local block chain - validateAndUpdate(sendSlotsReturn.getSecond(), true); + validateAndUpdate(sendSlotsReturn.getThird(), true); } } } catch (ServerException e) { if (e.getType() != ServerException.TypeInputTimeout) { - e.printStackTrace(); + // e.printStackTrace(); // Nothing was able to be sent to the server so just clear these data structures for (Transaction transaction : transactionPartsSent.keySet()) { @@ -471,34 +612,305 @@ final public class Table { transaction.setSequenceNumber(-1); } } - - pendingSendArbitrationEntriesToDelete.clear(); - transactionPartsSent.clear(); } else { // There was a partial send to the server + hadPartialSendToServer = true; } + + pendingSendArbitrationEntriesToDelete.clear(); + transactionPartsSent.clear(); + + throw e; } return newKey == null; } - private Pair sendSlotsToServer(Slot slot, int newSize) throws ServerException { + private Pair sendTransactionToLocal(Transaction transaction) { + + // Get the devices local communications + Pair localCommunicationInformation = localCommunicationTable.get(transaction.getArbitrator()); + + if (localCommunicationInformation == null) { + // Cant talk to that device locally so do nothing + return new Pair(false, false); + } + + // Get the size of the send data + int sendDataSize = Integer.BYTES + Long.BYTES; + for (TransactionPart part : transaction.getParts().values()) { + sendDataSize += part.getSize(); + } + + Long lastArbitrationDataLocalSequenceNumber = (long) - 1; + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()) != null) { + lastArbitrationDataLocalSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(transaction.getArbitrator()); + } + + // Make the send data size + byte[] sendData = new byte[sendDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(sendData); + + // Encode the data + bbEncode.putLong(lastArbitrationDataLocalSequenceNumber); + bbEncode.putInt(transaction.getParts().size()); + for (TransactionPart part : transaction.getParts().values()) { + part.encode(bbEncode); + } + + + + + + + + + + + + + System.out.println("================================"); + System.out.println("Sending Locally"); + for (KeyValue kv : transaction.getKeyValueUpdateSet()) { + System.out.println(kv); + } + + + + + + + + + + + + // Send by local + byte[] returnData = cloud.sendLocalData(sendData, localCommunicationInformation.getFirst(), localCommunicationInformation.getSecond()); + + + System.out.println("--------------------------------"); + System.out.println(); + + if (returnData == null) { + // Could not contact server + return new Pair(true, false); + } + + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(returnData); + boolean didCommit = bbDecode.get() == 1; + boolean couldArbitrate = bbDecode.get() == 1; + int numberOfEntries = bbDecode.getInt(); + boolean foundAbort = false; + + for (int i = 0; i < numberOfEntries; i++) { + byte type = bbDecode.get(); + if (type == Entry.TypeAbort) { + Abort abort = (Abort)Abort.decode(null, bbDecode); + + if ((abort.getTransactionMachineId() == localMachineId) && (abort.getTransactionClientLocalSequenceNumber() == transaction.getClientLocalSequenceNumber())) { + foundAbort = true; + } + + processEntry(abort); + } else if (type == Entry.TypeCommitPart) { + CommitPart commitPart = (CommitPart)CommitPart.decode(null, bbDecode); + processEntry(commitPart); + } + } + + updateLiveStateFromLocal(); + + if (couldArbitrate) { + TransactionStatus status = transaction.getTransactionStatus(); + if (didCommit) { + status.setStatus(TransactionStatus.StatusCommitted); + } else { + status.setStatus(TransactionStatus.StatusAborted); + } + } else { + if (foundAbort) { + TransactionStatus status = transaction.getTransactionStatus(); + status.setStatus(TransactionStatus.StatusAborted); + return new Pair(false, false); + } + } + + return new Pair(false, true); + } + + public synchronized byte[] acceptDataFromLocal(byte[] data) { + // Decode the data + ByteBuffer bbDecode = ByteBuffer.wrap(data); + long lastArbitratedSequenceNumberSeen = bbDecode.getLong(); + int numberOfParts = bbDecode.getInt(); + + // If we did commit a transaction or not + boolean didCommit = false; + boolean couldArbitrate = false; + + if (numberOfParts != 0) { + + // decode the transaction + Transaction transaction = new Transaction(); + for (int i = 0; i < numberOfParts; i++) { + bbDecode.get(); + TransactionPart newPart = (TransactionPart)TransactionPart.decode(null, bbDecode); + transaction.addPartDecode(newPart); + } + + // Arbitrate on transaction and pull relevant return data + Pair localArbitrateReturn = arbitrateOnLocalTransaction(transaction); + couldArbitrate = localArbitrateReturn.getFirst(); + didCommit = localArbitrateReturn.getSecond(); + + updateLiveStateFromLocal(); + + // Transaction was sent to the server so keep track of it to prevent double commit + if (transaction.getSequenceNumber() != -1) { + offlineTransactionsCommittedAndAtServer.add(transaction.getId()); + } + } + + // The data to send back + int returnDataSize = 0; + List unseenArbitrations = new ArrayList(); + + // Get the aborts to send back + List abortLocalSequenceNumbers = new ArrayList(liveAbortsGeneratedByLocal.keySet()); + Collections.sort(abortLocalSequenceNumbers); + for (Long localSequenceNumber : abortLocalSequenceNumbers) { + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + Abort abort = liveAbortsGeneratedByLocal.get(localSequenceNumber); + unseenArbitrations.add(abort); + returnDataSize += abort.getSize(); + } + + // Get the commits to send back + Map commitForClientTable = liveCommitsTable.get(localMachineId); + if (commitForClientTable != null) { + List commitLocalSequenceNumbers = new ArrayList(commitForClientTable.keySet()); + Collections.sort(commitLocalSequenceNumbers); + + for (Long localSequenceNumber : commitLocalSequenceNumbers) { + Commit commit = commitForClientTable.get(localSequenceNumber); + + if (localSequenceNumber <= lastArbitratedSequenceNumberSeen) { + continue; + } + + System.out.println("---"); + for (KeyValue kv : commit.getKeyValueUpdateSet()) { + System.out.println("Sending Commit Locally: " + kv); + } + System.out.println("---"); + + unseenArbitrations.addAll(commit.getParts().values()); + + for (CommitPart commitPart : commit.getParts().values()) { + returnDataSize += commitPart.getSize(); + } + } + } + + // Number of arbitration entries to decode + returnDataSize += 2 * Integer.BYTES; + + // Boolean of did commit or not + if (numberOfParts != 0) { + returnDataSize += Byte.BYTES; + } + + // Data to send Back + byte[] returnData = new byte[returnDataSize]; + ByteBuffer bbEncode = ByteBuffer.wrap(returnData); + + if (numberOfParts != 0) { + if (didCommit) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + if (couldArbitrate) { + bbEncode.put((byte)1); + } else { + bbEncode.put((byte)0); + } + } + + bbEncode.putInt(unseenArbitrations.size()); + for (Entry entry : unseenArbitrations) { + entry.encode(bbEncode); + } + + return returnData; + } + + private ThreeTuple sendSlotsToServer(Slot slot, int newSize, boolean isNewKey) throws ServerException { - boolean inserted = true; + boolean attemptedToSendToServerTmp = attemptedToSendToServer; + attemptedToSendToServer = true; + + boolean inserted = false; + boolean lastTryInserted = false; Slot[] array = cloud.putSlot(slot, newSize); if (array == null) { array = new Slot[] {slot}; rejectedSlotList.clear(); + inserted = true; } else { if (array.length == 0) { throw new Error("Server Error: Did not send any slots"); } - rejectedSlotList.add(slot.getSequenceNumber()); - inserted = false; + + // if (attemptedToSendToServerTmp) { + if (hadPartialSendToServer) { + + boolean isInserted = false; + for (Slot s : array) { + if ((s.getSequenceNumber() == slot.getSequenceNumber()) && (s.getMachineID() == localMachineId)) { + isInserted = true; + break; + } + } + + for (Slot s : array) { + if (isInserted) { + break; + } + + // Process each entry in the slot + for (Entry entry : s.getEntries()) { + + if (entry.getType() == Entry.TypeLastMessage) { + LastMessage lastMessage = (LastMessage)entry; + + if ((lastMessage.getMachineID() == localMachineId) && (lastMessage.getSequenceNumber() == slot.getSequenceNumber())) { + isInserted = true; + break; + } + } + } + } + + if (!isInserted) { + rejectedSlotList.add(slot.getSequenceNumber()); + lastTryInserted = false; + } else { + lastTryInserted = true; + } + } else { + rejectedSlotList.add(slot.getSequenceNumber()); + lastTryInserted = false; + } } - return new Pair(inserted, array); + return new ThreeTuple(inserted, lastTryInserted, array); } /** @@ -545,22 +957,33 @@ final public class Table { transactionPartsSent.clear(); pendingSendArbitrationEntriesToDelete.clear(); - // Insert pending arbitration data - for (Entry arbitrationData : pendingSendArbitrationEntries) { + for (ArbitrationRound round : pendingSendArbitrationRounds) { + boolean isFull = false; + round.generateParts(); + List parts = round.getParts(); + + // Insert pending arbitration data + for (Entry arbitrationData : parts) { + + // If it is an abort then we need to set some information + if (arbitrationData instanceof Abort) { + ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); + } + + if (!slot.hasSpace(arbitrationData)) { + // No space so cant do anything else with these data entries + isFull = true; + break; + } - // If it is an abort then we need to set some information - if (arbitrationData instanceof Abort) { - ((Abort)arbitrationData).setSequenceNumber(slot.getSequenceNumber()); + // Add to this current slot and add it to entries to delete + slot.addEntry(arbitrationData); + pendingSendArbitrationEntriesToDelete.add(arbitrationData); } - if (!slot.hasSpace(arbitrationData)) { - // No space so cant do anything else with these data entries + if (isFull) { break; } - - // Add to this current slot and add it to entries to delete - slot.addEntry(arbitrationData); - pendingSendArbitrationEntriesToDelete.add(arbitrationData); } // Insert as many transactions as possible while keeping order @@ -795,6 +1218,12 @@ final public class Table { sequenceNumber = newSlots[newSlots.length - 1].getSequenceNumber(); updateLiveStateFromServer(); + + // No Need to remember after we pulled from the server + offlineTransactionsCommittedAndAtServer.clear(); + + // This is invalidated now + hadPartialSendToServer = false; } private void updateLiveStateFromServer() { @@ -920,7 +1349,7 @@ final public class Table { newTransactionParts.clear(); } - public void arbitrateFromServer() { + private void arbitrateFromServer() { if (liveTransactionBySequenceNumberTable.size() == 0) { // Nothing to arbitrate on so move on @@ -936,6 +1365,7 @@ final public class Table { // The last transaction arbitrated on long lastTransactionCommitted = -1; + Set generatedAborts = new HashSet(); for (Long transactionSequenceNumber : transactionSequenceNumbers) { Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber); @@ -945,6 +1375,11 @@ final public class Table { continue; } + if (offlineTransactionsCommittedAndAtServer.contains(transaction.getId())) { + // We have seen this already locally so dont commit again + continue; + } + if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break @@ -952,6 +1387,15 @@ final public class Table { break; } + // update the largest transaction seen by arbitrator from server + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) == null) { + lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + } else { + Long lastTransactionSeenFromMachine = lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()); + if (transaction.getClientLocalSequenceNumber() > lastTransactionSeenFromMachine) { + lastTransactionSeenFromMachineFromServer.put(transaction.getMachineId(), transaction.getClientLocalSequenceNumber()); + } + } if (transaction.evaluateGuard(committedKeyValueTable, speculativeTableTmp, null)) { // Guard evaluated as true @@ -972,22 +1416,25 @@ final public class Table { transaction.getClientLocalSequenceNumber(), transaction.getSequenceNumber(), transaction.getMachineId(), - transaction.getArbitrator()); + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; - // Add the abort to the queue of aborts to send out - pendingSendArbitrationEntries.add(newAbort); + generatedAborts.add(newAbort); // Insert the abort so we can process processEntry(newAbort); } } + Commit newCommit = null; + // If there is something to commit if (speculativeTableTmp.size() != 0) { // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, lastTransactionCommitted); - localCommitSequenceNumber++; + newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, lastTransactionCommitted); + localArbitrationSequenceNumber++; // Add all the new keys to the commit for (KeyValue kv : speculativeTableTmp.values()) { @@ -998,34 +1445,58 @@ final public class Table { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - pendingSendArbitrationEntries.addAll(newCommit.getParts().values()); // Insert the commit so we can process it for (CommitPart commitPart : newCommit.getParts().values()) { processEntry(commitPart); } } + + + if ((newCommit != null) || (generatedAborts.size() > 0)) { + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, generatedAborts); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } + } } - public void arbitrateOnLocalTransaction(Transaction transaction) { + private Pair arbitrateOnLocalTransaction(Transaction transaction) { // Check if this machine arbitrates for this transaction if not then we cant arbitrate this transaction if (transaction.getArbitrator() != localMachineId) { - return; + return new Pair(false, false); } if (!transaction.isComplete()) { // Will arbitrate in incorrect order if we continue so just break // Most likely this - return; + return new Pair(false, false); + } + + if (transaction.getMachineId() != localMachineId) { + // dont do this check for local transactions + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) != null) { + if (lastTransactionSeenFromMachineFromServer.get(transaction.getMachineId()) > transaction.getClientLocalSequenceNumber()) { + // We've have already seen this from the server + + System.out.println("Local Arbitrate Seen Already from server, rejected"); + return new Pair(false, false); + } + } } if (transaction.evaluateGuard(committedKeyValueTable, null, null)) { // Guard evaluated as true // Create the commit and increment the commit sequence number - Commit newCommit = new Commit(localCommitSequenceNumber, localMachineId, -1); - localCommitSequenceNumber++; + Commit newCommit = new Commit(localArbitrationSequenceNumber, localMachineId, -1); + localArbitrationSequenceNumber++; // Update the local changes so we can make the commit for (KeyValue kv : transaction.getKeyValueUpdateSet()) { @@ -1036,21 +1507,161 @@ final public class Table { newCommit.createCommitParts(); // Append all the commit parts to the end of the pending queue waiting for sending to the server - pendingSendArbitrationEntries.addAll(newCommit.getParts().values()); + ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet()); + pendingSendArbitrationRounds.add(arbitrationRound); - // Insert the commit so we can process it - for (CommitPart commitPart : newCommit.getParts().values()) { - processEntry(commitPart); + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } else { + // Insert the commit so we can process it + for (CommitPart commitPart : newCommit.getParts().values()) { + processEntry(commitPart); + } } - TransactionStatus status = transaction.getTransactionStatus(); - status.setStatus(TransactionStatus.StatusCommitted); + if (transaction.getMachineId() == localMachineId) { + TransactionStatus status = transaction.getTransactionStatus(); + if (status != null) { + status.setStatus(TransactionStatus.StatusCommitted); + } + } + updateLiveStateFromLocal(); + return new Pair(true, true); } else { - // Guard evaluated was false so create abort - TransactionStatus status = transaction.getTransactionStatus(); - status.setStatus(TransactionStatus.StatusAborted); + + if (transaction.getMachineId() == localMachineId) { + // For locally created messages update the status + + // Guard evaluated was false so create abort + TransactionStatus status = transaction.getTransactionStatus(); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); + } + } else { + + Set addAbortSet = new HashSet(); + + + // Create the abort + Abort newAbort = new Abort(null, + transaction.getClientLocalSequenceNumber(), + -1, + transaction.getMachineId(), + transaction.getArbitrator(), + localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + addAbortSet.add(newAbort); + + + // Append all the commit parts to the end of the pending queue waiting for sending to the server + ArbitrationRound arbitrationRound = new ArbitrationRound(null, addAbortSet); + pendingSendArbitrationRounds.add(arbitrationRound); + + if (compactArbitrationData()) { + ArbitrationRound newArbitrationRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + for (CommitPart commitPart : newArbitrationRound.getCommit().getParts().values()) { + processEntry(commitPart); + } + } + } + + updateLiveStateFromLocal(); + return new Pair(true, false); + } + } + + /** + * Compacts the arbitration data my merging commits and aggregating aborts so that a single large push of commits can be done instead of many small updates + */ + private boolean compactArbitrationData() { + + + if (pendingSendArbitrationRounds.size() < 2) { + // Nothing to compact so do nothing + return false; + } + + ArbitrationRound lastRound = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - 1); + if (lastRound.didSendPart()) { + return false; + } + + boolean hadCommit = (lastRound.getCommit() == null); + boolean gotNewCommit = false; + + int numberToDelete = 1; + while (numberToDelete < pendingSendArbitrationRounds.size()) { + ArbitrationRound round = pendingSendArbitrationRounds.get(pendingSendArbitrationRounds.size() - numberToDelete - 1); + + if (round.isFull() || round.didSendPart()) { + // Stop since there is a part that cannot be compacted and we need to compact in order + break; + } + + if (round.getCommit() == null) { + + // Try compacting aborts only + int newSize = round.getCurrentSize() + lastRound.getAbortsCount(); + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; + } + lastRound.addAborts(round.getAborts()); + } else { + + // Create a new larger commit + Commit newCommit = Commit.merge(lastRound.getCommit(), round.getCommit(), localArbitrationSequenceNumber); + localArbitrationSequenceNumber++; + + // Create the commit parts so that we can count them + newCommit.createCommitParts(); + + // Calculate the new size of the parts + int newSize = newCommit.getNumberOfParts(); + newSize += lastRound.getAbortsCount(); + newSize += round.getAbortsCount(); + + if (newSize > ArbitrationRound.MAX_PARTS) { + // Cant compact since it would be too large + break; + } + + // Set the new compacted part + lastRound.setCommit(newCommit); + lastRound.addAborts(round.getAborts()); + gotNewCommit = true; + } + + numberToDelete++; + } + + if (numberToDelete != 1) { + // If there is a compaction + + // Delete the previous pieces that are now in the new compacted piece + if (numberToDelete == pendingSendArbitrationRounds.size()) { + pendingSendArbitrationRounds.clear(); + } else { + for (int i = 0; i < numberToDelete; i++) { + pendingSendArbitrationRounds.remove(pendingSendArbitrationRounds.size() - 1); + } + } + + // Add the new compacted into the pending to send list + pendingSendArbitrationRounds.add(lastRound); + + // Should reinsert into the commit processor + if (hadCommit && gotNewCommit) { + return true; + } } + + return false; } /** @@ -1136,10 +1747,18 @@ final public class Table { lastCommitSeenSequenceNumber = lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()); } + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()) != null) { - - - + long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(commit.getMachineId()); + if (commit.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + } + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(commit.getMachineId(), commit.getSequenceNumber()); + } // We have already seen this commit before so need to do the full processing on this commit if (commit.getSequenceNumber() <= lastCommitSeenSequenceNumber) { @@ -1157,7 +1776,6 @@ final public class Table { continue; } - // If we got here then this is a brand new commit and needs full processing // Get what commits should be edited, these are the commits that have live values for their keys @@ -1186,7 +1804,13 @@ final public class Table { } // Update the last seen sequence number from this arbitrator - lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); + if (lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId()) != null) { + if (commit.getSequenceNumber() > lastCommitSeenSequenceNumberByArbitratorTable.get(commit.getMachineId())) { + lastCommitSeenSequenceNumberByArbitratorTable.put(commit.getMachineId(), commit.getSequenceNumber()); + } + } + + // Update the last transaction that was updated if we can if (commit.getTransactionSequenceNumber() != -1) { @@ -1201,11 +1825,17 @@ final public class Table { // We processed a new commit that we havent seen before didProcessANewCommit = true; + + + System.out.println("============"); // Update the committed table of keys and which commit is using which key for (KeyValue kv : commit.getKeyValueUpdateSet()) { + System.out.println("Committing: " + kv); committedKeyValueTable.put(kv.getKey(), kv); liveCommitsByKeyTable.put(kv.getKey(), commit); } + System.out.println("--------------"); + System.out.println(); } } @@ -1367,7 +1997,6 @@ final public class Table { } } - /** * Process this slot, entry by entry. Also update the latest message sent by slot */ @@ -1537,19 +2166,44 @@ final public class Table { previouslySeenAbort.setDead(); // Delete old version of the abort since we got a rescued newer version } + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.put(entry.getArbitratorLocalSequenceNumber(), entry); + } + if ((entry.getSequenceNumber() != -1) && (lastMessageTable.get(entry.getTransactionMachineId()).getFirst() >= entry.getSequenceNumber())) { // The machine already saw this so it is dead entry.setDead(); - liveAbortTable.remove(entry); + liveAbortTable.remove(entry.getAbortId()); + + if (entry.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(entry.getArbitratorLocalSequenceNumber()); + } + return; } - // update the transaction status - TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); - if (status != null) { - status.setStatus(TransactionStatus.StatusAborted); + if (entry.getTransactionSequenceNumber() != -1) { + // update the transaction status if it was sent to the server + TransactionStatus status = outstandingTransactionStatus.remove(entry.getTransactionSequenceNumber()); + if (status != null) { + status.setStatus(TransactionStatus.StatusAborted); + } + } + + // Update the last arbitration data that we have seen so far + if (lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()) != null) { + + long lastArbitrationSequenceNumber = lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.get(entry.getTransactionArbitrator()); + if (entry.getSequenceNumber() > lastArbitrationSequenceNumber) { + // Is larger + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); + } + + } else { + // Never seen any data from this arbitrator so record the first one + lastArbitrationDataLocalSequenceNumberSeenFromArbitrator.put(entry.getTransactionArbitrator(), entry.getSequenceNumber()); } @@ -1605,7 +2259,7 @@ final public class Table { Map, CommitPart> commitPart = newCommitParts.get(entry.getMachineId()); if (commitPart == null) { - // Dont have a table for this machine Id yet so make one + // Don't have a table for this machine Id yet so make one commitPart = new HashMap, CommitPart>(); newCommitParts.put(entry.getMachineId(), commitPart); } @@ -1658,6 +2312,10 @@ final public class Table { if ((abort.getTransactionMachineId() == machineId) && (abort.getSequenceNumber() <= seqNum)) { abort.setDead(); i.remove(); + + if (abort.getTransactionArbitrator() == localMachineId) { + liveAbortsGeneratedByLocal.remove(abort.getArbitratorLocalSequenceNumber()); + } } } @@ -1698,9 +2356,17 @@ final public class Table { // Make sure the server is not playing any games if (machineId == localMachineId) { - // We were not making any updates and we had a machine mismatch - if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { - throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + seqNum + " got: " + lastMessageSeqNum); + if (hadPartialSendToServer) { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum > seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed at least: " + lastMessageSeqNum + " got: " + seqNum); + } + + } else { + // We were not making any updates and we had a machine mismatch + if (lastMessageSeqNum != seqNum && !acceptUpdatesToLocal) { + throw new Error("Server Error: Mismatch on local machine sequence number, needed: " + lastMessageSeqNum + " got: " + seqNum); + } } } else { if (lastMessageSeqNum > seqNum) { diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index 64b9a90..9e80def 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -26,18 +26,242 @@ public class Test { test6(); } else if (args[0].equals("7")) { test7(); + } else if (args[0].equals("8")) { + test8(); } } + static void test8() { + + boolean foundError = false; + List transStatusList = new ArrayList(); + + // Setup the 2 clients + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, 6000); + + System.out.println("Init Table t1s"); + + while (true) { + try { + System.out.println("-==-=-=-=-=-=-=-==-=-"); + t1.initTable(); + break; + } catch (Exception e) {} + } + + + System.out.println("Update Table t2"); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, 6001); + while (t2.update() == false) {} + + t1.addLocalCommunication(351, "127.0.0.1", 6001); + t2.addLocalCommunication(321, "127.0.0.1", 6000); + + // Make the Keys + System.out.println("Setting up keys"); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + + String a = "a" + i; + String b = "b" + i; + String c = "c" + i; + String d = "d" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + IoTString ic = new IoTString(c); + IoTString id = new IoTString(d); + + while (true) { + try { + t1.createNewKey(ia, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t1.createNewKey(ib, 351); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(ic, 321); + break; + } catch (Exception e) { } + } + + while (true) { + try { + t2.createNewKey(id, 351); + break; + } catch (Exception e) { } + } + } + + // Do Updates for the keys + System.out.println("Setting Key-Values..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); + String keyA = "a" + i; + String keyB = "b" + i; + String keyC = "c" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueB = "b" + i; + String valueC = "c" + i; + String valueD = "d" + i; + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyB = new IoTString(keyB); + IoTString iKeyC = new IoTString(keyC); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueB = new IoTString(valueB); + IoTString iValueC = new IoTString(valueC); + IoTString iValueD = new IoTString(valueD); + + + String keyAPrev = "a" + (i - 1); + String keyBPrev = "b" + (i - 1); + String keyCPrev = "c" + (i - 1); + String keyDPrev = "d" + (i - 1); + String valueAPrev = "a" + (i - 1); + String valueBPrev = "b" + (i - 1); + String valueCPrev = "c" + (i - 1); + String valueDPrev = "d" + (i - 1); + + IoTString iKeyAPrev = new IoTString(keyAPrev); + IoTString iKeyBPrev = new IoTString(keyBPrev); + IoTString iKeyCPrev = new IoTString(keyCPrev); + IoTString iKeyDPrev = new IoTString(keyDPrev); + IoTString iValueAPrev = new IoTString(valueAPrev); + IoTString iValueBPrev = new IoTString(valueBPrev); + IoTString iValueCPrev = new IoTString(valueCPrev); + IoTString iValueDPrev = new IoTString(valueDPrev); + + t1.startTransaction(); + t1.addKV(iKeyA, iValueA); + transStatusList.add(t1.commitTransaction()); + + t1.startTransaction(); + t1.addKV(iKeyB, iValueB); + transStatusList.add(t1.commitTransaction()); + + t2.startTransaction(); + t2.addKV(iKeyC, iValueC); + transStatusList.add(t2.commitTransaction()); + + t2.startTransaction(); + t2.addKV(iKeyD, iValueD); + transStatusList.add(t2.commitTransaction()); + } + + System.out.println("Updating..."); + while (t1.update() == false) {} + while (t2.update() == false) {} + while (t1.update() == false) {} + + System.out.println("Checking Key-Values..."); + for (int i = 0; i < NUMBER_OF_TESTS; i++) { + + String keyA = "a" + i; + String keyB = "b" + i; + String keyC = "c" + i; + String keyD = "d" + i; + String valueA = "a" + i; + String valueB = "b" + i; + String valueC = "c" + i; + String valueD = "d" + i; + + IoTString iKeyA = new IoTString(keyA); + IoTString iKeyB = new IoTString(keyB); + IoTString iKeyC = new IoTString(keyC); + IoTString iKeyD = new IoTString(keyD); + IoTString iValueA = new IoTString(valueA); + IoTString iValueB = new IoTString(valueB); + IoTString iValueC = new IoTString(valueC); + IoTString iValueD = new IoTString(valueD); + + + IoTString testValA1 = t1.getCommitted(iKeyA); + IoTString testValB1 = t1.getCommitted(iKeyB); + IoTString testValC1 = t1.getCommitted(iKeyC); + IoTString testValD1 = t1.getCommitted(iKeyD); + + IoTString testValA2 = t2.getCommitted(iKeyA); + IoTString testValB2 = t2.getCommitted(iKeyB); + IoTString testValC2 = t2.getCommitted(iKeyC); + IoTString testValD2 = t2.getCommitted(iKeyD); + + if ((testValA1 == null) || (testValA1.equals(iValueA) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyA + " " + testValA1); + foundError = true; + } + + if ((testValB1 == null) || (testValB1.equals(iValueB) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyB + " " + testValB1); + foundError = true; + } + + if ((testValC1 == null) || (testValC1.equals(iValueC) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyC + " " + testValC1); + foundError = true; + } + + if ((testValD1 == null) || (testValD1.equals(iValueD) == false)) { + System.out.println("Key-Value t1 incorrect: " + keyD + " " + testValD1); + foundError = true; + } + + + if ((testValA2 == null) || (testValA2.equals(iValueA) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyA + " " + testValA2); + foundError = true; + } + + if ((testValB2 == null) || (testValB2.equals(iValueB) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyB + " " + testValB2); + foundError = true; + } + + if ((testValC2 == null) || (testValC2.equals(iValueC) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyC + " " + testValC2); + foundError = true; + } + + if ((testValD2 == null) || (testValD2.equals(iValueD) == false)) { + System.out.println("Key-Value t2 incorrect: " + keyD + " " + testValD2); + foundError = true; + } + } + + for (TransactionStatus status : transStatusList) { + if (status.getStatus() != TransactionStatus.StatusCommitted) { + foundError = true; + } + } + + if (foundError) { + System.out.println("Found Errors..."); + } else { + System.out.println("No Errors Found..."); + } + + t1.close(); + t2.close(); + } + static void test7() throws ServerException { boolean foundError = false; List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); // Make the Keys @@ -251,9 +475,9 @@ public class Test { List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); // Make the Keys @@ -412,9 +636,9 @@ public class Test { List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); @@ -629,9 +853,9 @@ public class Test { List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); // Make the Keys @@ -804,9 +1028,9 @@ public class Test { List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); @@ -989,11 +1213,11 @@ public class Test { List transStatusList = new ArrayList(); // Setup the 2 clients - Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321, -1); t1.initTable(); System.out.println("T1 Ready"); - Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351, -1); t2.update(); System.out.println("T2 Ready"); @@ -1001,6 +1225,7 @@ public class Test { System.out.println("Setting up keys"); startTime = System.currentTimeMillis(); for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); String a = "a" + i; String b = "b" + i; String c = "c" + i; @@ -1023,6 +1248,7 @@ public class Test { System.out.println("Setting Key-Values..."); startTime = System.currentTimeMillis(); for (int i = 0; i < NUMBER_OF_TESTS; i++) { + System.out.println(i); String keyA = "a" + i; String keyB = "b" + i; String keyC = "c" + i; @@ -1041,24 +1267,42 @@ public class Test { IoTString iValueC = new IoTString(valueC); IoTString iValueD = new IoTString(valueD); + + System.out.println("==============================================================================="); + System.out.println("AAAAAAAA"); + System.out.println("==============================================================================="); t1.startTransaction(); t1.addKV(iKeyA, iValueA); transStatusList.add(t1.commitTransaction()); + System.out.println(); + System.out.println("==============================================================================="); + System.out.println("BBBBBBB"); + System.out.println("==============================================================================="); t1.startTransaction(); t1.addKV(iKeyB, iValueB); transStatusList.add(t1.commitTransaction()); + System.out.println(); + System.out.println("==============================================================================="); + System.out.println("CCCCCCC"); + System.out.println("==============================================================================="); t2.startTransaction(); t2.addKV(iKeyC, iValueC); transStatusList.add(t2.commitTransaction()); + System.out.println(); + System.out.println("==============================================================================="); + System.out.println("DDDDDDDDDD"); + System.out.println("==============================================================================="); t2.startTransaction(); t2.addKV(iKeyD, iValueD); transStatusList.add(t2.commitTransaction()); + System.out.println(); + } endTime = System.currentTimeMillis(); System.out.println("Time Taken: " + (double) ((endTime - startTime) / 1000.0) ); @@ -1157,5 +1401,13 @@ public class Test { } else { System.out.println("No Errors Found..."); } + + System.out.println(); + System.out.println(); + t1.printSlots(); + + System.out.println(); + System.out.println(); + t2.printSlots(); } } diff --git a/version2/src/java/iotcloud/Transaction.java b/version2/src/java/iotcloud/Transaction.java index 82280c0..b3a0490 100644 --- a/version2/src/java/iotcloud/Transaction.java +++ b/version2/src/java/iotcloud/Transaction.java @@ -125,7 +125,6 @@ class Transaction { return parts; } - public boolean didSendAPartToServer() { return didSendAPartToServer; } @@ -162,7 +161,6 @@ class Transaction { return partsPendingSend.isEmpty(); } - public Set getKeyValueUpdateSet() { return keyValueUpdateSet; } diff --git a/version2/src/server/iotquery.cpp b/version2/src/server/iotquery.cpp index 0b1c4b3..64639b3 100644 --- a/version2/src/server/iotquery.cpp +++ b/version2/src/server/iotquery.cpp @@ -11,11 +11,11 @@ using namespace std; -const char * query_str="QUERY_STRING"; -const char * uri_str="REQUEST_URI"; -const char * method_str="REQUEST_METHOD"; -const char * iotcloudroot_str="IOTCLOUD_ROOT"; -const char * length_str="CONTENT_LENGTH"; +const char * query_str = "QUERY_STRING"; +const char * uri_str = "REQUEST_URI"; +const char * method_str = "REQUEST_METHOD"; +const char * iotcloudroot_str = "IOTCLOUD_ROOT"; +const char * length_str = "CONTENT_LENGTH"; IoTQuery::IoTQuery(FCGX_Request *request) : request(request), @@ -52,7 +52,7 @@ IoTQuery::~IoTQuery() { bool IoTQuery::checkDirectory() { struct stat s; - int err=stat(directory, &s); + int err = stat(directory, &s); if (-1 == err) return false; return S_ISDIR(s.st_mode); @@ -65,13 +65,13 @@ bool IoTQuery::checkDirectory() { */ void IoTQuery::decodeQuery() { - int len=strlen(query); - char * str=new char[len+1]; - memcpy(str, query, len+1); - char *tok_ptr=str; - + int len = strlen(query); + char * str = new char[len + 1]; + memcpy(str, query, len + 1); + char *tok_ptr = str; + /* Parse commands */ - char *command=strsep(&tok_ptr, "&"); + char *command = strsep(&tok_ptr, "&"); if (strncmp(command, "req=putslot", 11) == 0) reqPutSlot = true; else if (strncmp(command, "req=getslot", 11) == 0) @@ -84,7 +84,7 @@ void IoTQuery::decodeQuery() { /* Load Sequence Number for request */ char *sequencenumber_str = strsep(&tok_ptr, "&"); if (sequencenumber_str != NULL && - strncmp(sequencenumber_str, "seq=", 4) == 0) { + strncmp(sequencenumber_str, "seq=", 4) == 0) { sequencenumber_str = strchr(sequencenumber_str, '='); if (sequencenumber_str != NULL) { requestsequencenumber = strtoll(sequencenumber_str + 1, NULL, 10); @@ -98,7 +98,7 @@ void IoTQuery::decodeQuery() { /* Update size if we get request */ char * numqueueentries_str = tok_ptr; if (numqueueentries_str != NULL && - strncmp(numqueueentries_str, "max=", 4) == 0) { + strncmp(numqueueentries_str, "max=", 4) == 0) { numqueueentries_str = strchr(numqueueentries_str, '=') + 1; numqueueentries = strtoll(numqueueentries_str, NULL, 10); } @@ -111,9 +111,9 @@ void IoTQuery::decodeQuery() { */ void doWrite(int fd, char *data, long long length) { - long long offset=0; + long long offset = 0; do { - long long byteswritten=write(fd, &data[offset], length); + long long byteswritten = write(fd, &data[offset], length); if (byteswritten > 0) { length -= byteswritten; offset += byteswritten; @@ -124,21 +124,21 @@ void doWrite(int fd, char *data, long long length) { } return; } - } while(length != 0); + } while (length != 0); } /** Helper function to read data from file. */ bool doRead(int fd, void *buf, int numbytes) { - int offset=0; - char *ptr=(char *)buf; + int offset = 0; + char *ptr = (char *)buf; do { - int bytesread=read(fd, ptr+offset, numbytes); + int bytesread = read(fd, ptr + offset, numbytes); if (bytesread > 0) { offset += bytesread; numbytes -= bytesread; } else return false; - } while (numbytes!=0); + } while (numbytes != 0); return true; } @@ -147,50 +147,50 @@ bool doRead(int fd, void *buf, int numbytes) { */ void IoTQuery::getSlot() { - int numrequeststosend = (int)((newestentry-requestsequencenumber)+1); + int numrequeststosend = (int)((newestentry - requestsequencenumber) + 1); if (numrequeststosend < 0) numrequeststosend = 0; long long numbytes = 0; int filesizes[numrequeststosend]; int fdarray[numrequeststosend]; - int index=0; - for(long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) { + int index = 0; + for (long long seqn = requestsequencenumber; seqn <= newestentry; seqn++, index++) { struct stat st; - char *filename=getSlotFileName(seqn); + char *filename = getSlotFileName(seqn); if (stat(filename, &st) == 0) { - fdarray[index]=open(filename, O_RDONLY); - filesizes[index]=st.st_size; - numbytes+=filesizes[index]; + fdarray[index] = open(filename, O_RDONLY); + filesizes[index] = st.st_size; + numbytes += filesizes[index]; } else { - fdarray[index]=-1; - filesizes[index]=0; + fdarray[index] = -1; + filesizes[index] = 0; } delete filename; } - const char header[]="getslot"; + const char header[] = "getslot"; /* Size is the header + the payload + space for number of requests plus sizes of each slot */ - long long size=sizeof(header)-1+sizeof(numrequeststosend)+4*numrequeststosend+numbytes; + long long size = sizeof(header) - 1 + sizeof(numrequeststosend) + 4 * numrequeststosend + numbytes; char * response = new char[size]; - long long offset=0; - memcpy(response, header, sizeof(header)-1); - offset+=sizeof(header)-1; - int numreq=htonl(numrequeststosend); + long long offset = 0; + memcpy(response, header, sizeof(header) - 1); + offset += sizeof(header) - 1; + int numreq = htonl(numrequeststosend); memcpy(response + offset, &numreq, sizeof(numreq)); - offset+=sizeof(numrequeststosend); - for(int i=0; i=0) { - doRead(fdarray[i], response+offset, filesizes[i]); - offset+=filesizes[i]; + for (int i = 0; i < numrequeststosend; i++) { + if (fdarray[i] >= 0) { + doRead(fdarray[i], response + offset, filesizes[i]); + offset += filesizes[i]; } } @@ -199,7 +199,7 @@ void IoTQuery::getSlot() { /* Delete the response buffer and close the files. */ delete response; - for(int i=0; i= 0) close(fdarray[i]); } @@ -212,12 +212,27 @@ void IoTQuery::getSlot() { void IoTQuery::setSalt() { /* Write the slot data we received to a SLOT file */ char *filename = getSaltFileName(); - int saltfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); - doWrite(saltfd, data, length); - char response[0]; - sendResponse(response, 0); - close(saltfd); + char * response = new char[1]; + + if (access(filename, F_OK) == 0) + { + /* Already Exists */ + response[0] = 1; + } + else + { + /* Does not exist so create it */ + int saltfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR); + doWrite(saltfd, data, length); + close(saltfd); + response[0] = 0; + } + + + sendResponse(response, 1); + delete filename; + delete response; } /** @@ -230,7 +245,7 @@ void IoTQuery::getSalt() { int filesize = 0; struct stat st; if (stat(filename, &st) == 0) { - filesize=st.st_size; + filesize = st.st_size; } else { delete filename; return; @@ -238,8 +253,8 @@ void IoTQuery::getSalt() { int saltfd = open(filename, O_RDONLY); int responsesize = filesize + sizeof(int); char * response = new char[responsesize]; - doRead(saltfd, response+ sizeof(int), filesize); - int n_filesize=htonl(filesize); + doRead(saltfd, response + sizeof(int), filesize); + int n_filesize = htonl(filesize); *((int*) response) = n_filesize; sendResponse(response, responsesize); close(saltfd); @@ -254,20 +269,20 @@ void IoTQuery::getSalt() { void IoTQuery::putSlot() { /* Check if the request is stale and send update in that case. This servers as an implicit failure of the request. */ - if (requestsequencenumber!=(newestentry+1)) { + if (requestsequencenumber != (newestentry + 1)) { getSlot(); return; } /* See if we have too many slots and if so, delete the old one */ - int numberofliveslots=(int) ((newestentry-oldestentry)+1); + int numberofliveslots = (int) ((newestentry - oldestentry) + 1); if (numberofliveslots >= numqueueentries) { removeOldestSlot(); } /* Write the slot data we received to a SLOT file */ char *filename = getSlotFileName(requestsequencenumber); - int slotfd = open(filename, O_CREAT|O_WRONLY, S_IRUSR| S_IWUSR); + int slotfd = open(filename, O_CREAT | O_WRONLY, S_IRUSR | S_IWUSR); doWrite(slotfd, data, length); close(slotfd); delete filename; @@ -277,8 +292,8 @@ void IoTQuery::putSlot() { updateStatusFile(); /* Send response acknowledging success */ - char command[]="putslot"; - sendResponse(command, sizeof(command)-1); + char command[] = "putslot"; + sendResponse(command, sizeof(command) - 1); } /** @@ -288,8 +303,8 @@ void IoTQuery::putSlot() { void IoTQuery::sendResponse(char * bytes, int len) { cout << "Accept-Ranges: bytes\r\n" - << "Content-Length: " << len << "\r\n" - << "\r\n"; + << "Content-Length: " << len << "\r\n" + << "\r\n"; cout.write(bytes, len); } @@ -298,14 +313,14 @@ void IoTQuery::sendResponse(char * bytes, int len) { */ char * IoTQuery::getSlotFileName(long long seqnum) { - int directorylen=strlen(directory); + int directorylen = strlen(directory); /* Size is 19 digits for ASCII representation of a long + 4 characters for SLOT string + 1 character for null termination + directory size*/ - char * filename=new char[25+directorylen]; - snprintf(filename, 25+directorylen, "%s/SLOT%lld", directory, seqnum); + char * filename = new char[25 + directorylen]; + snprintf(filename, 25 + directorylen, "%s/SLOT%lld", directory, seqnum); return filename; } @@ -314,13 +329,13 @@ char * IoTQuery::getSlotFileName(long long seqnum) { */ char * IoTQuery::getSaltFileName() { - int directorylen=strlen(directory); + int directorylen = strlen(directory); /* Size is 4 characters for SALT string + 1 character for null termination + directory size*/ - char * filename=new char[6+directorylen]; - snprintf(filename, 6+directorylen, "%s/SALT", directory); + char * filename = new char[6 + directorylen]; + snprintf(filename, 6 + directorylen, "%s/SALT", directory); return filename; } @@ -329,8 +344,8 @@ char * IoTQuery::getSaltFileName() { */ void IoTQuery::removeOldestSlot() { - if (oldestentry!=0) { - char * filename=getSlotFileName(oldestentry); + if (oldestentry != 0) { + char * filename = getSlotFileName(oldestentry); unlink(filename); delete filename; } @@ -344,7 +359,11 @@ void IoTQuery::removeOldestSlot() { void IoTQuery::processQuery() { getQuery(); getDirectory(); - readData(); + if (!readData()) + { + return; + } + /* Verify that we receive a post request. */ if (strncmp(method, "POST", 4) != 0) { @@ -354,7 +373,7 @@ void IoTQuery::processQuery() { /* Make sure the directory is okay. */ if (directory == NULL || - !checkDirectory()) { + !checkDirectory()) { cerr << "Directory " << directory << " does not exist" << endl; return; } @@ -371,7 +390,7 @@ void IoTQuery::processQuery() { /* Decode query. */ decodeQuery(); - + /* Handle request. */ if (reqGetSlot) getSlot(); @@ -392,16 +411,26 @@ void IoTQuery::processQuery() { * inserted. */ -void IoTQuery::readData() { +bool IoTQuery::readData() { if (length) { - data = new char[length+1]; - memset(data, 0, length+1); + data = new char[length + 1]; + memset(data, 0, length + 1); cin.read(data, length); } do { char dummy; cin >> dummy; } while (!cin.eof()); + + if (length) + { + if (cin.fail()) + { + return false; + } + } + + return true; } @@ -418,9 +447,9 @@ void IoTQuery::getQuery() { /** We require the content-length header to be sent. */ char * reqlength = FCGX_GetParam(length_str, request->envp); if (reqlength) { - length=strtoll(reqlength, NULL, 10); + length = strtoll(reqlength, NULL, 10); } else { - length=0; + length = 0; } } @@ -432,13 +461,13 @@ void IoTQuery::getDirectory() { char * split = strchr((char *)uri, '?'); if (split == NULL) return; - int split_len = (int) (split-uri); + int split_len = (int) (split - uri); int rootdir_len = strlen(iotcloudroot); int directory_len = split_len + rootdir_len + 1; directory = new char[directory_len]; memcpy(directory, iotcloudroot, rootdir_len); memcpy(directory + rootdir_len, uri, split_len); - directory[directory_len-1]=0; + directory[directory_len - 1] = 0; } /** @@ -447,13 +476,13 @@ void IoTQuery::getDirectory() { int doread(int fd, void *ptr, size_t count, off_t offset) { do { - size_t bytesread=pread(fd, ptr, count, offset); - if (bytesread==count) { + size_t bytesread = pread(fd, ptr, count, offset); + if (bytesread == count) { return 1; - } else if (bytesread==0) { + } else if (bytesread == 0) { return 0; } - } while(1); + } while (1); } @@ -473,15 +502,15 @@ void IoTQuery::updateStatusFile() { */ bool IoTQuery::openStatusFile() { - char statusfile[]="queuestatus"; - int len=strlen(directory); + char statusfile[] = "queuestatus"; + int len = strlen(directory); - char * filename=new char[len+sizeof(statusfile)+2]; + char * filename = new char[len + sizeof(statusfile) + 2]; memcpy(filename, directory, len); - filename[len]='/'; - memcpy(filename+len+1, statusfile, sizeof(statusfile)); - filename[len+sizeof(statusfile)+1]=0; - fd=open(filename, O_CREAT| O_RDWR, S_IRUSR| S_IWUSR); + filename[len] = '/'; + memcpy(filename + len + 1, statusfile, sizeof(statusfile)); + filename[len + sizeof(statusfile) + 1] = 0; + fd = open(filename, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); delete filename; if (fd < 0) { @@ -491,22 +520,22 @@ bool IoTQuery::openStatusFile() { /* Read in queue size, oldest sequence number, and newest sequence number. */ int size; - int needwrite=0; + int needwrite = 0; if (doread(fd, &size, sizeof(size), OFFSET_MAX)) - numqueueentries=size; + numqueueentries = size; else - needwrite=1; + needwrite = 1; long long entry; if (doread(fd, &entry, sizeof(entry), OFFSET_OLD)) - oldestentry=entry; + oldestentry = entry; else - needwrite=1; + needwrite = 1; if (doread(fd, &entry, sizeof(entry), OFFSET_NEW)) - newestentry=entry; + newestentry = entry; else - needwrite=1; + needwrite = 1; if (needwrite) updateStatusFile(); diff --git a/version2/src/server/iotquery.h b/version2/src/server/iotquery.h index ae39366..6ac5113 100644 --- a/version2/src/server/iotquery.h +++ b/version2/src/server/iotquery.h @@ -19,7 +19,7 @@ private: void sendResponse(char *data, int length); void getQuery(); void getDirectory(); - void readData(); + bool readData(); bool checkDirectory(); bool openStatusFile(); void updateStatusFile(); -- 2.34.1