import javax.crypto.spec.*;
import javax.crypto.*;
-public class Table {
- int numslots;
- HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
- HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
- SlotBuffer buffer;
- CloudComm cloud;
+final public class Table {
+ private int numslots;
+ private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
+ private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
+ private SlotBuffer buffer;
+ private CloudComm cloud;
private Mac hmac;
- long sequencenumber;
- long localmachineid;
-
+ private long sequencenumber;
+ private long localmachineid;
+ private TableStatus lastTableStatus;
+ static final int FREE_SLOTS = 10;
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
}
public IoTString put(IoTString key, IoTString value) {
- return null;
+ Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+
+ if ((numslots - buffer.size()) < FREE_SLOTS) {
+ //have to check whether we have enough free slots
+ long seqn = buffer.getNewestSeqNum() + 1 - numslots;
+ for(int i=0; i < FREE_SLOTS; i++, seqn--) {
+ Slot prevslot=buffer.getSlot(seqn);
+ if (!prevslot.isLive())
+ continue;
+
+ }
+ }
+
+
+ return null;
}
- void validateandupdate(Slot[] newslots) {
+ private void validateandupdate(Slot[] newslots) {
//The cloud communication layer has checked slot HMACs already
//before decoding
if (newslots.length==0)
SlotIndexer indexer = new SlotIndexer(newslots, buffer);
checkHMACChain(indexer, newslots);
- for(Slot slot: newslots) {
+
+ initExpectedSize();
+ for(Slot slot: newslots) {
+ updateExpectedSize();
processSlot(indexer, slot);
}
+ checkNumSlots(newslots.length);
+ commitNewMaxSize();
+ //commit new to slots
+ for(Slot slot:newslots) {
+ buffer.putSlot(slot);
+ }
}
- void processEntry(KeyValue entry, SlotIndexer indexer) {
+ private int expectedsize, currmaxsize;
+
+ private void checkNumSlots(int numslots) {
+ if (numslots != expectedsize)
+ throw new Error("Server Error: Server did not send all slots");
+ }
+
+ private void initExpectedSize() {
+ expectedsize = (sequencenumber < ((long) numslots)) ? (int) sequencenumber : numslots;
+ currmaxsize = numslots;
+ }
+
+ private void updateExpectedSize() {
+ expectedsize++;
+ if (expectedsize > currmaxsize)
+ expectedsize = currmaxsize;
+ }
+
+ private void updateCurrMaxSize(int newmaxsize) {
+ currmaxsize=newmaxsize;
+ }
+
+ private void commitNewMaxSize() {
+ if (numslots != currmaxsize)
+ buffer.resize(currmaxsize);
+
+ numslots=currmaxsize;
+ }
+
+ private void processEntry(KeyValue entry, SlotIndexer indexer) {
IoTString key=entry.getKey();
KeyValue oldvalue=table.get(key);
if (oldvalue != null) {
- oldvalue.decrementLiveCount();
+ oldvalue.setDead();
}
table.put(key, entry);
}
- void processEntry(LastMessage entry, SlotIndexer indexer) {
+ private void processEntry(LastMessage entry, SlotIndexer indexer) {
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
}
- void processEntry(RejectedMessage entry, SlotIndexer indexer) {
+ private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
long oldseqnum=entry.getOldSeqNum();
long newseqnum=entry.getNewSeqNum();
boolean isequal=entry.getEqual();
}
}
- void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
-
+ private void processEntry(TableStatus entry, SlotIndexer indexer) {
+ int newnumslots=entry.getMaxSlots();
+ updateCurrMaxSize(newnumslots);
+ if (lastTableStatus != null)
+ lastTableStatus.setDead();
+ lastTableStatus = entry;
}
- void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
+ private void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
long lastmsgseqnum = lastmsgentry.getFirst();
Liveness lastentry = lastmsgentry.getSecond();
if (lastentry instanceof LastMessage) {
- ((LastMessage)lastentry).decrementLiveCount();
+ ((LastMessage)lastentry).setDead();
} else if (lastentry instanceof Slot) {
- ((Slot)lastentry).decrementLiveCount();
+ ((Slot)lastentry).setDead();
} else {
throw new Error("Unrecognized type");
}
throw new Error("Server Error: Mismatch on local machine sequence number");
} else {
if (lastmsgseqnum > seqnum)
- throw new Error("Server Error: Rolback on remote machine sequence number");
+ throw new Error("Server Error: Rollback on remote machine sequence number");
}
}
- void processSlot(SlotIndexer indexer, Slot slot) {
+ private void processSlot(SlotIndexer indexer, Slot slot) {
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
-
+
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue:
break;
case Entry.TypeTableStatus:
- processEntry((TableStatus)entry, indexer, slot);
+ processEntry((TableStatus)entry, indexer);
break;
default:
}
}
- void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
+ private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
for(int i=0; i < newslots.length; i++) {
Slot currslot=newslots[i];
Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);