*/
abstract byte getType();
+
+ /**
+ * Returns a copy of the Entry that can be added to a different slot.
+ */
+ abstract Entry getCopy(Slot s);
+
}
byte getType() {
return Entry.TypeKeyValue;
}
+
+ public String toString() {
+ return value.toString();
+ }
+
+ Entry getCopy(Slot s) {
+ return new KeyValue(s, key, value);
+ }
}
byte getType() {
return Entry.TypeLastMessage;
}
+
+ Entry getCopy(Slot s) {
+ return new LastMessage(s, machineid, seqnum);
+ }
}
byte getType() {
return Entry.TypeRejectedMessage;
}
+
+ Entry getCopy(Slot s) {
+ return new RejectedMessage(s, machineid, oldseqnum, newseqnum, equalto);
+ }
}
}
void addEntry(Entry e) {
+ e=e.getCopy(this);
+ entries.add(e);
+ livecount++;
+ freespace -= e.getSize();
+ }
+
+ private void addShallowEntry(Entry e) {
entries.add(e);
livecount++;
freespace -= e.getSize();
Slot slot=new Slot(seqnum, machineid, prevhmac, hmac);
for(int i=0; i<numentries; i++) {
- slot.addEntry(Entry.decode(slot, bb));
+ slot.addShallowEntry(Entry.decode(slot, bb));
}
return slot;
Vector<Entry> getLiveEntries() {
Vector<Entry> liveEntries=new Vector<Entry>();
- for(Entry entry: entries)
+ for(Entry entry: entries) {
if (entry.isLive())
liveEntries.add(entry);
-
+ }
+
if (seqnumlive)
liveEntries.add(new LastMessage(this, machineid, seqnum));
*/
void setDead() {
- decrementLiveCount();
seqnumlive=false;
+ decrementLiveCount();
}
/**
void decrementLiveCount() {
livecount--;
+ Vector<Entry> e=getLiveEntries();
}
/**
}
}
+ public String toString() {
+ return table.toString();
+ }
+
public IoTString put(IoTString key, IoTString value) {
while(true) {
KeyValue oldvalue=table.get(key);
long seqn = buffer.getOldestSeqNum();
if (forcedresize) {
- System.out.println("A");
TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
s.addEntry(status);
}
if ((numslots - buffer.size()) < FREE_SLOTS) {
/* have to check whether we have enough free slots */
- System.out.println("B");
long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
seqn = fullfirstseqn < 1?1:fullfirstseqn;
for(int i=0; i < FREE_SLOTS; i++, seqn++) {
Slot prevslot=buffer.getSlot(seqn);
- System.out.println(i);
if (!prevslot.isLive())
continue;
- System.out.println("islive");
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
- if (redundant(liveentry))
- continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else if (i==0) {
continue;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
- if (redundant(liveentry))
- continue;
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
else
return insertedkv;
}
- boolean redundant(Entry liveentry) {
- if (liveentry.getType()==Entry.TypeLastMessage) {
- LastMessage lastmsg=(LastMessage) liveentry;
- return lastmsg.getMachineID() == localmachineid;
- }
- return false;
- }
-
-
private void validateandupdate(Slot[] newslots, boolean acceptupdatestolocal) {
/* The cloud communication layer has checked slot HMACs already
before decoding */
}
}
}
-
+
+ if (machineid == localmachineid) {
+ /* Our own messages are immediately dead. */
+ if (liveness instanceof LastMessage) {
+ ((LastMessage)liveness).setDead();
+ } else if (liveness instanceof Slot) {
+ ((Slot)liveness).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
+ }
+
+
Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
if (lastmsgentry == null)
return;
long lastmsgseqnum = lastmsgentry.getFirst();
Liveness lastentry = lastmsgentry.getSecond();
- if (lastentry instanceof LastMessage) {
- ((LastMessage)lastentry).setDead();
- } else if (lastentry instanceof Slot) {
- ((Slot)lastentry).setDead();
- } else {
- throw new Error("Unrecognized type");
+ if (machineid != localmachineid) {
+ if (lastentry instanceof LastMessage) {
+ ((LastMessage)lastentry).setDead();
+ } else if (lastentry instanceof Slot) {
+ ((Slot)lastentry).setDead();
+ } else {
+ throw new Error("Unrecognized type");
+ }
}
-
+
if (machineid == localmachineid) {
if (lastmsgseqnum != seqnum && !acceptupdatestolocal)
throw new Error("Server Error: Mismatch on local machine sequence number");
private void processSlot(SlotIndexer indexer, Slot slot, boolean acceptupdatestolocal, HashSet<Long> machineSet) {
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptupdatestolocal, machineSet);
-
for(Entry entry : slot.getEntries()) {
switch(entry.getType()) {
case Entry.TypeKeyValue:
byte getType() {
return Entry.TypeTableStatus;
}
+
+ Entry getCopy(Slot s) {
+ return new TableStatus(s, maxslots);
+ }
}
public class Test {
public static void main(String[] args) {
- if (args[0].equals("1"))
- test1();
- else if(args[0].equals("2"))
+ if(args[0].equals("2"))
test2();
else if(args[0].equals("3"))
test3();
else if(args[0].equals("4"))
test4();
+ else if(args[0].equals("5"))
+ test5();
}
+
+
static Thread buildThread(String prefix, Table t) {
return new Thread() {
- public void run() {
- for(int i=0; i<600; i++) {
- String a=prefix+i;
- IoTString ia=new IoTString(a);
- t.put(ia, ia);
- System.out.println(ia+"->"+t.get(ia));
- }
- }
+ public void run() {
+ for(int i=0; i<10000; i++) {
+ String a=prefix+i;
+ IoTString ia=new IoTString(a);
+ t.put(ia, ia);
+ System.out.println(ia+"->"+t.get(ia));
+ }
+ }
};
}
-
+
+ static void test5() {
+ Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ t1.rebuild();
+ System.out.println(t1);
+ }
+
static void test4() {
Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
System.out.println(ia+"->"+t2.get(ia));
}
}
-
- static void test1() {
- TestCloudComm cc=new TestCloudComm();
- Table t1=new Table(cc, 6513);
- t1.initTable();
- Table t2=new Table(cc, 6512);
- t2.update();
- for(int i=0; i<600; i++) {
- String a="STR"+i;
- String b="ABR"+i;
- IoTString ia=new IoTString(a);
- IoTString ib=new IoTString(b);
- t1.put(ia, ia);
- t2.put(ib, ib);
- t1.update();
- System.out.println(ib+"->"+t1.get(ib));
- System.out.println(ia+"->"+t2.get(ia));
- }
- for(int i=0; i<600; i++) {
- String a="STR"+i;
- String b="ABR"+i;
- IoTString ia=new IoTString(a);
- IoTString ib=new IoTString(b);
- System.out.println(ib+"->"+t1.get(ib));
- System.out.println(ia+"->"+t2.get(ia));
- System.out.println(ib+"->"+t2.get(ib));
- System.out.println(ia+"->"+t1.get(ia));
- }
-
- }
}
+++ /dev/null
-package iotcloud;
-import java.io.*;
-import java.net.*;
-
-/**
- * This class is a test driver to test the code w/o going through an
- * actual web server.
- * @author Brian Demsky <bdemsky@uci.edu>
- * @version 1.0
- */
-
-class TestCloudComm extends CloudComm {
- SlotBuffer buffer;
-
- TestCloudComm() {
- buffer = new SlotBuffer();
- }
-
- public synchronized Slot[] putSlot(Slot slot, int max) {
- if ((buffer.size()==0 && 1 == slot.getSequenceNumber()) ||
- buffer.getNewestSeqNum()+1 == slot.getSequenceNumber()) {
- if (max!=0)
- buffer.resize(max);
- buffer.putSlot(slot);
- return null;
- } else
- return getSlots(slot.getSequenceNumber());
- }
-
- public synchronized Slot[] getSlots(long sequencenumber) {
- long newestseqnum=buffer.getNewestSeqNum();
- long oldestseqnum=buffer.getOldestSeqNum();
- if (sequencenumber < oldestseqnum)
- sequencenumber=oldestseqnum;
- int numslots=(int)((newestseqnum - sequencenumber)+1);
- Slot[] slots=new Slot[numslots];
- for(int i=0; i<numslots; i++,sequencenumber++)
- slots[i]=buffer.getSlot(sequencenumber);
- return slots;
- }
-}