$SeqN(\tuple{s, sv})=s$ \\\r
$SlotVal(\tuple{s, sv})=sv$ \\\r
$CreateLastSL(s,sv,id)=\tuple{s,sv,id}=sl_{last}$ \\\r
-$CreateEntLV(kv_s,s_s)=\tuple{kv_s,s_s}$ \\\r
$Decrypt(SK_s,sv_s)=Dat_s=\tuple{s,id,hmac_p,DE,hmac_c}$ \\\r
$GetSeqN(Dat_s = \tuple{s,id,hmac_p,DE,hmac_c})=s$ \\\r
$GetMacId(Dat_s = \tuple{s,id,hmac_p,DE,hmac_c})=id$ \\\r
\begin{algorithmic}[1]\r
\Function{GetQueSta}{$DE_s$}\r
\State $de_{qs} \gets ss$ \textit{such that} $ss \in DE_s, \r
- de_s \in D \land type(de_s) = "qs"$\r
+ de_s \in D \land type(de_s) = ``qs"$\r
\If{$de_{qs} \neq \emptyset$}\r
\State $qs_{ret} \gets GetQS(de_{qs})$\r
\Else\r
\r
\begin{algorithmic}[1]\r
\Function{GetSlotSeq}{$DE_s$}\r
-\State $DE_{ss} \gets \{de | de \in DE_s \land type(de) = "ss"\}$\r
+\State $DE_{ss} \gets \{de | de \in DE_s \land type(de) = ``ss"\}$\r
\State \Return{$DE_{ss}$}\r
\EndFunction\r
\end{algorithmic}\r
\r
\begin{algorithmic}[1]\r
\Function{GetColRes}{$DE_s$}\r
-\State $DE_{cr} \gets \{de | de \in DE_s \land type(de) = "cr"\}$\r
+\State $DE_{cr} \gets \{de | de \in DE_s \land type(de) = ``cr"\}$\r
\State \Return{$DE_{cr}$}\r
\EndFunction\r
\end{algorithmic}\r
\r
\begin{algorithmic}[1]\r
\Function{UpdateDT}{$DT_s,DE_s,LV_s,s_s$}\r
-\State $DE_{s_{kv}} \gets \{de_s | de_s \in DE_s, type(de_s) = "kv"\}$\r
+\State $DE_{s_{kv}} \gets \{de_s | de_s \in DE_s, type(de_s) = ``kv"\}$\r
\ForAll{$de_s \in DE_s$}\r
\State $kv_s \gets GetKV(de_s)$\r
\State $LV_s \gets \Call{UpdateKVLivEnt}{LV_s,kv_s,s_s}$\r
\ForAll{$de_s \in DE_s$}\r
\State $live \gets \Call{CheckLiveness}{s_s,de_s}$\r
\If{$live$}\r
- \If{$de_s = kv$}\r
+ \If{$type(de_s) = ``kv"$}\r
\State $\tuple{k_s,v_s} \gets GetKV(de_s)$\r
\State $KV \gets \Call{PutKVPair}{KV,\tuple{k_s,v_s}}$\r
- \ElsIf{$de_s = ss$}\r
+ \ElsIf{$type(de_s) = ``ss"$}\r
\State $\tuple{id_s,s_{s_{last}}} \gets GetSS(de_s)$\r
\State $SS \gets \Call{PutSSPair}{SS,\tuple{id_s,s_{s_{last}}}}$\r
- \ElsIf{$de_s = cr$}\r
+ \ElsIf{$type(de_s) = ``cr"$}\r
\State $\tuple{s_s,id_s} \gets GetCR(de_s)$\r
\State $CR \gets \Call{PutCRPair}{CR,\tuple{s_s,id_s}}$\r
- \ElsIf{$de_s = qs$}\r
+ \ElsIf{$type(de_s) = ``qs"$}\r
\State $reinsert_{qs} \gets true$\r
\EndIf\r
\EndIf\r
*/
final public class Table {
- private int numslots;
+ private int numslots; //number of slots stored in buffer
+
+ //table of key-value pairs
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+
+ // machine id -> (sequence number, Slot or LastMessage); records last message by each client
private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ // machine id -> ...
private HashMap<Long, HashSet<RejectedMessage> > watchlist = new HashMap<Long, HashSet<RejectedMessage> >();
private Vector<Long> rejectedmessagelist=new Vector<Long>();
private SlotBuffer buffer;
private CloudComm cloud;
- private long sequencenumber;
+ private long sequencenumber; //Largest sequence number a client has received
private long localmachineid;
private TableStatus lastTableStatus;
- static final int FREE_SLOTS = 10;
+ static final int FREE_SLOTS = 10; //number of slots that should be kept free
static final int SKIP_THRESHOLD = 10;
private long liveslotcount=0;
private int chance;
static final double RESIZE_THRESHOLD = 0.75;
static final int REJECTED_THRESHOLD = 5;
private int resizethreshold;
- private long lastliveslotseqn;
+ private long lastliveslotseqn; //smallest sequence number with a live entry
private Random random=new Random();
public Table(String baseurl, String password, long _localmachineid) {
liveslotcount--;
}
+
private void setResizeThreshold() {
int resize_lower=(int) (RESIZE_THRESHOLD * numslots);
resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
private boolean tryput(IoTString key, IoTString value, boolean resize) {
Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
int newsize = 0;
- if (liveslotcount > resizethreshold) {
- resize=true;
- newsize = (int) (numslots * RESIZE_MULTIPLE);
+ if (liveslotcount > resizethreshold) {
+ resize=true; //Resize is forced
}
if (resize) {
s.addEntry(status);
}
- if (!rejectedmessagelist.isEmpty()) {
+ if (! rejectedmessagelist.isEmpty()) {
/* TODO: We should avoid generating a rejected message entry if
* there is already a sufficient entry in the queue (e.g.,
* equalsto value of true and same sequence number). */
RejectedMessage rm=new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
s.addEntry(rm);
} else {
- long prev_seqn=-1;
+ long prev_seqn = -1;
int i=0;
/* Go through list of missing messages */
- for(;i<rejectedmessagelist.size();i++) {
- long curr_seqn=rejectedmessagelist.get(i);
- Slot s_msg=buffer.getSlot(curr_seqn);
- if (s_msg!=null)
+ for(; i<rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
break;
prev_seqn=curr_seqn;
}
s.addEntry(rm);
}
/* Generate rejected message entries for present messages */
- for(;i<rejectedmessagelist.size();i++) {
+ for(; i<rejectedmessagelist.size(); i++) {
long curr_seqn=rejectedmessagelist.get(i);
Slot s_msg=buffer.getSlot(curr_seqn);
long machineid=s_msg.getMachineID();
long seqn = lastliveslotseqn;
boolean seenliveslot = false;
- long firstiffull = newestseqnum + 1 - numslots;
- long threshold = firstiffull + FREE_SLOTS;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
for(; seqn < threshold; seqn++) {
Slot prevslot=buffer.getSlot(seqn);
//Push slot number forward
- if (!seenliveslot)
+ if (! seenliveslot)
lastliveslotseqn = seqn;
- if (!prevslot.isLive())
+ if (! prevslot.isLive())
continue;
seenliveslot = true;
Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
for(Entry liveentry:liveentries) {
if (s.hasSpace(liveentry)) {
s.addEntry(liveentry);
- } else if (seqn==firstiffull) {
+ } else if (seqn==firstiffull) { //if there's no space but the entry is about to fall off the queue
if (!resize) {
- System.out.print("B");
+ System.out.print("B"); //?
return tryput(key, value, true);
}
}
insertedkv=true;
}
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
int skipcount=0;
search:
for(; seqn <= newestseqnum; seqn++) {
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
/* The cloud communication layer has checked slot HMACs already
before decoding */
- if (newslots.length==0)
- return;
+ if (newslots.length==0) return;
long firstseqnum=newslots[0].getSequenceNumber();
if (firstseqnum <= sequencenumber)
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
- HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet());
+ HashSet<Long> machineSet=new HashSet<Long>(lastmessagetable.keySet()); //
initExpectedSize(firstseqnum);
for(Slot slot: newslots) {
private void initExpectedSize(long firstsequencenumber) {
long prevslots = firstsequencenumber;
- expectedsize = (prevslots < ((long) numslots))?(int) prevslots:numslots;
+ expectedsize = (prevslots < ((long) numslots))? (int) prevslots : numslots;
currmaxsize = numslots;
}
long newseqnum=entry.getNewSeqNum();
boolean isequal=entry.getEqual();
long machineid=entry.getMachineID();
- for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
+ for(long seqnum=oldseqnum; seqnum <= newseqnum; seqnum++) {
Slot slot=indexer.getSlot(seqnum);
if (slot != null) {
long slotmachineid=slot.getMachineID();
- if (isequal!=(slotmachineid==machineid)) {
+ if (isequal != (slotmachineid==machineid)) {
throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
}
}
}
HashSet<Long> watchset=new HashSet<Long>();
- for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry:lastmessagetable.entrySet()) {
+ for(Map.Entry<Long, Pair<Long,Liveness> > lastmsg_entry : lastmessagetable.entrySet()) {
long entry_mid=lastmsg_entry.getKey();
/* We've seen it, don't need to continue to watch. Our next
* message will implicitly acknowledge it. */