From b4be52a8821c74d7d9dae03a0e5d655c1bba9bda Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Thu, 2 Feb 2017 16:36:48 -0800 Subject: [PATCH] Fixed Rejected Messages, Calculating correct size --- .../src/java/iotcloud/RejectedMessage.java | 20 +++- version2/src/java/iotcloud/Table.java | 94 ++++++++++--------- version2/src/java/iotcloud/Test.java | 2 +- 3 files changed, 65 insertions(+), 51 deletions(-) diff --git a/version2/src/java/iotcloud/RejectedMessage.java b/version2/src/java/iotcloud/RejectedMessage.java index 9c84f18..741f92f 100644 --- a/version2/src/java/iotcloud/RejectedMessage.java +++ b/version2/src/java/iotcloud/RejectedMessage.java @@ -12,6 +12,10 @@ import java.util.HashSet; class RejectedMessage extends Entry { + /* Sequence number */ + private long sequencenum; + + /* Machine identifier */ private long machineid; /* Oldest sequence number in range */ @@ -24,8 +28,9 @@ class RejectedMessage extends Entry { /* Set of machines that have not received notification. */ private HashSet watchset; - RejectedMessage(Slot slot, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) { + RejectedMessage(Slot slot, long _sequencenum, long _machineid, long _oldseqnum, long _newseqnum, boolean _equalto) { super(slot); + sequencenum = _sequencenum; machineid=_machineid; oldseqnum=_oldseqnum; newseqnum=_newseqnum; @@ -48,12 +53,18 @@ class RejectedMessage extends Entry { return machineid; } + + long getSequenceNumber() { + return sequencenum; + } + static Entry decode(Slot slot, ByteBuffer bb) { + long sequencenum=bb.getLong(); long machineid=bb.getLong(); long oldseqnum=bb.getLong(); long newseqnum=bb.getLong(); byte equalto=bb.get(); - return new RejectedMessage(slot, machineid, oldseqnum, newseqnum, equalto==1); + return new RejectedMessage(slot,sequencenum, machineid, oldseqnum, newseqnum, equalto==1); } void setWatchSet(HashSet _watchset) { @@ -68,6 +79,7 @@ class RejectedMessage extends Entry { void encode(ByteBuffer bb) { bb.put(Entry.TypeRejectedMessage); + bb.putLong(sequencenum); bb.putLong(machineid); bb.putLong(oldseqnum); bb.putLong(newseqnum); @@ -75,7 +87,7 @@ class RejectedMessage extends Entry { } int getSize() { - return 3*Long.BYTES + 2*Byte.BYTES; + return 4*Long.BYTES + 2*Byte.BYTES; } byte getType() { @@ -83,6 +95,6 @@ class RejectedMessage extends Entry { } Entry getCopy(Slot s) { - return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto); + return new RejectedMessage(s,sequencenum, machineid, oldseqnum, newseqnum, equalto); } } diff --git a/version2/src/java/iotcloud/Table.java b/version2/src/java/iotcloud/Table.java index e0d26cd..f29b540 100644 --- a/version2/src/java/iotcloud/Table.java +++ b/version2/src/java/iotcloud/Table.java @@ -20,8 +20,7 @@ import java.nio.ByteBuffer; */ final public class Table { - - + /* Constants */ static final int FREE_SLOTS = 10; // Number of slots that should be kept free static final int SKIP_THRESHOLD = 10; @@ -45,14 +44,17 @@ final public class Table { private long oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry private long localMachineId = 0; // Machine ID of this client device private long sequenceNumber = 0; // Largest sequence number a client has received - private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server - private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server + // private int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server + // private int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server private long localTransactionSequenceNumber = 0; // Local sequence number counter for transactions private long lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on private long oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on private long localArbitrationSequenceNumber = 0; private boolean hadPartialSendToServer = false; private boolean attemptedToSendToServer = false; + private long expectedsize; + private boolean didFindTableStatus = false; + private long currMaxSize = 0; /* Data Structures */ private Map committedKeyValueTable = null; // Table of committed key value pairs @@ -1029,7 +1031,7 @@ final public class Table { long old_seqn = rejectedSlotList.firstElement(); if (rejectedSlotList.size() > REJECTED_THRESHOLD) { long new_seqn = rejectedSlotList.lastElement(); - RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, new_seqn, false); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, new_seqn, false); s.addEntry(rm); } else { long prev_seqn = -1; @@ -1044,7 +1046,7 @@ final public class Table { } /* Generate rejected message entry for missing messages */ if (prev_seqn != -1) { - RejectedMessage rm = new RejectedMessage(s, localMachineId, old_seqn, prev_seqn, false); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), localMachineId, old_seqn, prev_seqn, false); s.addEntry(rm); } /* Generate rejected message entries for present messages */ @@ -1052,7 +1054,7 @@ final public class Table { long curr_seqn = rejectedSlotList.get(i); Slot s_msg = buffer.getSlot(curr_seqn); long machineid = s_msg.getMachineID(); - RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); + RejectedMessage rm = new RejectedMessage(s, s.getSequenceNumber(), machineid, curr_seqn, curr_seqn, true); s.addEntry(rm); } } @@ -1148,11 +1150,6 @@ final public class Table { return; } - // Reset the table status declared sizes - smallestTableStatusSeen = -1; - largestTableStatusSeen = -1; - - // Make sure all slots are newer than the last largest slot this client has seen long firstSeqNum = newSlots[0].getSequenceNumber(); if (firstSeqNum <= sequenceNumber) { @@ -1172,6 +1169,7 @@ final public class Table { // Process each slots data for (Slot slot : newSlots) { processSlot(indexer, slot, acceptUpdatesToLocal, machineSet); + updateExpectedSize(); } // If there is a gap, check to see if the server sent us everything. @@ -1243,6 +1241,26 @@ final public class Table { updatePendingTransactionSpeculativeTable(didCommitOrSpeculate); } + + + private void initExpectedSize(long firstSequenceNumber, long numberOfSlots) { + if (didFindTableStatus) { + return; + } + long prevslots = firstSequenceNumber; + expectedsize = (prevslots < ((long) numberOfSlots)) ? (int) prevslots : numberOfSlots; + currMaxSize = numberOfSlots; + } + + private void updateExpectedSize() { + expectedsize++; + if (expectedsize > currMaxSize) + { + expectedsize = currMaxSize; + } + } + + /** * Check the size of the block chain to make sure there are enough slots sent back by the server. * This is only called when we have a gap between the slots that we have locally and the slots @@ -1250,41 +1268,29 @@ final public class Table { * status message */ private void checkNumSlots(int numberOfSlots) { - - // We only have 1 size so we must have this many slots - if (largestTableStatusSeen == smallestTableStatusSeen) { - if (numberOfSlots != smallestTableStatusSeen) { - throw new Error("Server Error: Server did not send all slots. Expected: " + smallestTableStatusSeen + " Received:" + numberOfSlots); - } - } else { - // We have more than 1 - if (numberOfSlots < smallestTableStatusSeen) { - throw new Error("Server Error: Server did not send all slots. Expected at least: " + smallestTableStatusSeen + " Received:" + numberOfSlots); - } + if (numberOfSlots != expectedsize) { + throw new Error("Server Error: Server did not send all slots. Expected: " + expectedsize + " Received:" + numberOfSlots); } } + private void updateCurrMaxSize(int newmaxsize) { + currMaxSize=newmaxsize; + } + + /** * Update the size of of the local buffer if it is needed. */ private void commitNewMaxSize() { - - int currMaxSize = 0; - - if (largestTableStatusSeen == -1) { - // No table status seen so the current max size does not change - currMaxSize = numberOfSlots; - } else { - currMaxSize = largestTableStatusSeen; - } + didFindTableStatus = false; // Resize the local slot buffer if (numberOfSlots != currMaxSize) { - buffer.resize(currMaxSize); + buffer.resize((int)currMaxSize); } // Change the number of local slots to the new size - numberOfSlots = currMaxSize; + numberOfSlots = (int)currMaxSize; // Recalculate the resize threshold since the size of the local buffer has changed setResizeThreshold(); @@ -2014,7 +2020,7 @@ final public class Table { break; case Entry.TypeTableStatus: - processEntry((TableStatus)entry); + processEntry((TableStatus)entry, slot.getSequenceNumber()); break; default: @@ -2052,8 +2058,11 @@ final public class Table { * keeps track of the largest and smallest table status seen in this current round * of updating the local copy of the block chain */ - private void processEntry(TableStatus entry) { + private void processEntry(TableStatus entry, long seq) { int newNumSlots = entry.getMaxSlots(); + updateCurrMaxSize(newNumSlots); + + initExpectedSize(seq, newNumSlots); if (liveTableStatus != null) { // We have a larger table status so the old table status is no longer alive @@ -2062,14 +2071,6 @@ final public class Table { // Make this new table status the latest alive table status liveTableStatus = entry; - - if ((smallestTableStatusSeen == -1) || (newNumSlots < smallestTableStatusSeen)) { - smallestTableStatusSeen = newNumSlots; - } - - if ((largestTableStatusSeen == -1) || (newNumSlots > largestTableStatusSeen)) { - largestTableStatusSeen = newNumSlots; - } } /** @@ -2080,6 +2081,7 @@ final public class Table { long newSeqNum = entry.getNewSeqNum(); boolean isequal = entry.getEqual(); long machineId = entry.getMachineID(); + long seq = entry.getSequenceNumber(); // Check if we have messages that were supposed to be rejected in our local block chain @@ -2115,7 +2117,7 @@ final public class Table { Pair lastMessageValue = lastMessageEntry.getValue(); long entrySequenceNumber = lastMessageValue.getFirst(); - if (entrySequenceNumber < newSeqNum) { + if (entrySequenceNumber < seq) { // Add this rejected message to the set of messages that this machine ID did not see yet addWatchList(lastMessageEntryMachineId, entry); @@ -2276,7 +2278,7 @@ final public class Table { RejectedMessage rm = rmit.next(); // If this machine Id has seen this rejected message... - if (rm.getNewSeqNum() <= seqNum) { + if (rm.getSequenceNumber() <= seqNum) { // Remove it from our watchlist rmit.remove(); diff --git a/version2/src/java/iotcloud/Test.java b/version2/src/java/iotcloud/Test.java index d659862..bfe0f87 100644 --- a/version2/src/java/iotcloud/Test.java +++ b/version2/src/java/iotcloud/Test.java @@ -11,7 +11,7 @@ import java.util.ArrayList; public class Test { - public static final int NUMBER_OF_TESTS = 10; + public static final int NUMBER_OF_TESTS = 1000; public static void main(String[] args) throws ServerException { if (args[0].equals("2")) { -- 2.34.1