class Slot implements Liveness {
/** Sets the slot size. */
static final int SLOT_SIZE=2048;
- /** Sets how many bytes we reserve. */
- static final int RESERVED_SPACE=64;
/** Sets the size for the HMAC. */
static final int HMAC_SIZE=32;
private boolean seqnumlive;
/** Number of bytes of free space. */
private int freespace;
+ /** Reference to Table */
+ private Table table;
- Slot(long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
+ Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac, byte[] _hmac) {
seqnum=_seqnum;
machineid=_machineid;
prevhmac=_prevhmac;
livecount=1;
seqnumlive=true;
freespace = SLOT_SIZE - getBaseSize();
+ table=_table;
}
- Slot(long _seqnum, long _machineid, byte[] _prevhmac) {
- this(_seqnum, _machineid, _prevhmac, null);
+ Slot(Table _table, long _seqnum, long _machineid, byte[] _prevhmac) {
+ this(_table, _seqnum, _machineid, _prevhmac, null);
}
- Slot(long _seqnum, long _machineid) {
- this(_seqnum, _machineid, new byte[HMAC_SIZE], null);
+ Slot(Table _table, long _seqnum, long _machineid) {
+ this(_table, _seqnum, _machineid, new byte[HMAC_SIZE], null);
}
byte[] getHMAC() {
* using its reserved space. */
boolean hasSpace(Entry e) {
- int newfreespace = freespace - e.getSize();
- return newfreespace > RESERVED_SPACE;
- }
-
- /**
- * Returns true if the slot can fit the entry potentially using the
- * reserved space. */
-
- boolean canFit(Entry e) {
int newfreespace = freespace - e.getSize();
return newfreespace >= 0;
}
return entries;
}
- static Slot decode(byte[] array, Mac mac) {
+ static Slot decode(Table table, byte[] array, Mac mac) {
mac.update(array, HMAC_SIZE, array.length-HMAC_SIZE);
byte[] realmac=mac.doFinal();
long seqnum=bb.getLong();
long machineid=bb.getLong();
int numentries=bb.getInt();
- Slot slot=new Slot(seqnum, machineid, prevhmac, hmac);
+ Slot slot=new Slot(table, seqnum, machineid, prevhmac, hmac);
for(int i=0; i<numentries; i++) {
slot.addShallowEntry(Entry.decode(slot, bb));
void decrementLiveCount() {
livecount--;
- Vector<Entry> e=getLiveEntries();
+ if (livecount==0)
+ table.decrementLiveCount();
}
/**
import java.util.HashSet;
import java.util.Arrays;
import java.util.Vector;
+import java.util.Random;
/**
* IoTTable data structure. Provides client inferface.
* @version 1.0
*/
-
final public class Table {
private int numslots;
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
private long localmachineid;
private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10;
- static final int FORCED_RESIZE_INCREMENT = 20;
-
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount=0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ private int resizethreshold;
+ private long lastliveslotseqn;
+ private Random random;
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
- cloud=new CloudComm(baseurl, password);
+ cloud=new CloudComm(this, baseurl, password);
+ lastliveslotseqn = 1;
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
cloud=_cloud;
}
public void initTable() {
cloud.setSalt();//Set the salt
- Slot s=new Slot(1, localmachineid);
+ Slot s=new Slot(this, 1, localmachineid);
TableStatus status=new TableStatus(s, numslots);
s.addEntry(status);
Slot[] array=cloud.putSlot(s, numslots);
}
}
- private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
- Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- long seqn = buffer.getOldestSeqNum();
+ void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+ private void setResizeThreshold() {
+ int resize_lower=(int) RESIZE_THRESHOLD * numslots;
+ resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean resize) {
+ Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize=true;
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ }
- if (forcedresize) {
- TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status=new TableStatus(s, newsize);
s.addEntry(status);
}
- if ((numslots - buffer.size()) < FREE_SLOTS) {
- /* have to check whether we have enough free slots */
- long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
- seqn = fullfirstseqn < 1?1:fullfirstseqn;
- for(int i=0; i < FREE_SLOTS; i++, seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- if (!prevslot.isLive())
- continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
- for(Entry liveentry:liveentries) {
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else if (i==0) {
- if (s.canFit(liveentry))
- s.addEntry(liveentry);
- else if (!forcedresize) {
- return tryput(key, value, true);
- }
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots;
+ long threshold = firstiffull + FREE_SLOTS;
+
+ for(; seqn < threshold; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn==firstiffull) {
+ if (!resize) {
+ return tryput(key, value, true);
}
}
}
}
+
KeyValue kv=new KeyValue(s, key, value);
boolean insertedkv=false;
if (s.hasSpace(kv)) {
insertedkv=true;
}
- long newestseqnum=buffer.getNewestSeqNum();
-search:
- for(; seqn<=newestseqnum; seqn++) {
+ int skipcount=0;
+ search:
+ for(; seqn <= newestseqnum; seqn++) {
Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
if (!prevslot.isLive())
continue;
+ seenliveslot = true;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
- else
- break search;
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
}
}
int max=0;
- if (forcedresize)
- max = numslots + FORCED_RESIZE_INCREMENT;
+ if (resize)
+ max = newsize;
Slot[] array=cloud.putSlot(s, max);
if (array == null)
array = new Slot[] {s};
/* Commit new to slots. */
for(Slot slot:newslots) {
buffer.putSlot(slot);
+ liveslotcount++;
}
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
buffer.resize(currmaxsize);
numslots=currmaxsize;
+ setResizeThreshold();
}
private void processEntry(KeyValue entry, SlotIndexer indexer) {