import java.util.HashSet;
import java.util.Arrays;
import java.util.Vector;
+import java.util.Random;
/**
* IoTTable data structure. Provides client inferface.
* @version 1.0
*/
-
final public class Table {
private int numslots;
private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
private long localmachineid;
private TableStatus lastTableStatus;
static final int FREE_SLOTS = 10;
- static final int FORCED_RESIZE_INCREMENT = 20;
-
+ static final int SKIP_THRESHOLD = 10;
+ private long liveslotcount=0;
+ private int chance;
+ static final double RESIZE_MULTIPLE = 1.2;
+ static final double RESIZE_THRESHOLD = 0.75;
+ private int resizethreshold;
+ private long lastliveslotseqn;
+ private Random random;
+
public Table(String baseurl, String password, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
- cloud=new CloudComm(baseurl, password);
+ cloud=new CloudComm(this, baseurl, password);
+ lastliveslotseqn = 1;
}
public Table(CloudComm _cloud, long _localmachineid) {
localmachineid=_localmachineid;
buffer = new SlotBuffer();
numslots = buffer.capacity();
+ setResizeThreshold();
sequencenumber = 0;
cloud=_cloud;
}
public void initTable() {
cloud.setSalt();//Set the salt
- Slot s=new Slot(1, localmachineid);
+ Slot s=new Slot(this, 1, localmachineid);
TableStatus status=new TableStatus(s, numslots);
s.addEntry(status);
Slot[] array=cloud.putSlot(s, numslots);
}
}
- private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
- Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
- long seqn = buffer.getOldestSeqNum();
+ void decrementLiveCount() {
+ liveslotcount--;
+ }
+
+ private void setResizeThreshold() {
+ int resize_lower=(int) RESIZE_THRESHOLD * numslots;
+ resizethreshold=resize_lower-1+random.nextInt(numslots-resize_lower);
+ }
+
+ private boolean tryput(IoTString key, IoTString value, boolean resize) {
+ Slot s=new Slot(this, sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize=true;
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ }
- if (forcedresize) {
- TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status=new TableStatus(s, newsize);
s.addEntry(status);
}
- if ((numslots - buffer.size()) < FREE_SLOTS) {
- /* have to check whether we have enough free slots */
- long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
- seqn = fullfirstseqn < 1?1:fullfirstseqn;
- for(int i=0; i < FREE_SLOTS; i++, seqn++) {
- Slot prevslot=buffer.getSlot(seqn);
- if (!prevslot.isLive())
- continue;
- Vector<Entry> liveentries = prevslot.getLiveEntries();
- for(Entry liveentry:liveentries) {
- if (s.hasSpace(liveentry))
- s.addEntry(liveentry);
- else if (i==0) {
- if (s.canFit(liveentry))
- s.addEntry(liveentry);
- else if (!forcedresize) {
- return tryput(key, value, true);
- }
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum)
+ lastliveslotseqn = oldestseqnum;
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots;
+ long threshold = firstiffull + FREE_SLOTS;
+
+ for(; seqn < threshold; seqn++) {
+ Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries();
+ for(Entry liveentry:liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn==firstiffull) {
+ if (!resize) {
+ return tryput(key, value, true);
}
}
}
}
+
KeyValue kv=new KeyValue(s, key, value);
boolean insertedkv=false;
if (s.hasSpace(kv)) {
insertedkv=true;
}
- long newestseqnum=buffer.getNewestSeqNum();
-search:
- for(; seqn<=newestseqnum; seqn++) {
+ int skipcount=0;
+ search:
+ for(; seqn <= newestseqnum; seqn++) {
Slot prevslot=buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
if (!prevslot.isLive())
continue;
+ seenliveslot = true;
Vector<Entry> liveentries = prevslot.getLiveEntries();
for(Entry liveentry:liveentries) {
if (s.hasSpace(liveentry))
s.addEntry(liveentry);
- else
- break search;
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
}
}
int max=0;
- if (forcedresize)
- max = numslots + FORCED_RESIZE_INCREMENT;
+ if (resize)
+ max = newsize;
Slot[] array=cloud.putSlot(s, max);
if (array == null)
array = new Slot[] {s};
/* Commit new to slots. */
for(Slot slot:newslots) {
buffer.putSlot(slot);
+ liveslotcount++;
}
sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
}
buffer.resize(currmaxsize);
numslots=currmaxsize;
+ setResizeThreshold();
}
private void processEntry(KeyValue entry, SlotIndexer indexer) {