public void addKV(IoTString key, IoTString value) {
// Make sure new key value pair matches the current arbitrator
- if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) {
+ if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) {
// TODO: Maybe not throw and error
throw new Error("Not all Key Values match");
}
pendingTransBuild.addKV(kv);
}
-
- // TODo: FIx Guard
- public void addGuard(IoTString key, IoTString value) {
- KeyValue kv = new KeyValue(key, value);
- pendingTransBuild.addKV(kv);
+ public void addGuard(Guard guard) {
+ pendingTransBuild.addGuard(guard);
}
public void update() {
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, false);
+
+ if (uncommittedTransactionsList.size() > 0) {
+ List<Transaction> uncommittedTransArb = new LinkedList<Transaction>();
+
+ for (Transaction ut : uncommittedTransactionsList) {
+ KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]);
+ long arb = arbitratorTable.get(kv.getKey());
+
+ if (arb == localmachineid) {
+ uncommittedTransArb.add(ut);
+ }
+ }
+
+
+ boolean needResize = false;
+ while (uncommittedTransArb.size() > 0) {
+ boolean resize = needResize;
+ needResize = false;
+
+ Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
+ int newsize = 0;
+ if (liveslotcount > resizethreshold) {
+ resize = true; //Resize is forced
+ }
+
+ if (resize) {
+ newsize = (int) (numslots * RESIZE_MULTIPLE);
+ TableStatus status = new TableStatus(s, newsize);
+ s.addEntry(status);
+ }
+
+ if (! rejectedmessagelist.isEmpty()) {
+ /* TODO: We should avoid generating a rejected message entry if
+ * there is already a sufficient entry in the queue (e.g.,
+ * equalsto value of true and same sequence number). */
+
+ long old_seqn = rejectedmessagelist.firstElement();
+ if (rejectedmessagelist.size() > REJECTED_THRESHOLD) {
+ long new_seqn = rejectedmessagelist.lastElement();
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false);
+ s.addEntry(rm);
+ } else {
+ long prev_seqn = -1;
+ int i = 0;
+ /* Go through list of missing messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ if (s_msg != null)
+ break;
+ prev_seqn = curr_seqn;
+ }
+ /* Generate rejected message entry for missing messages */
+ if (prev_seqn != -1) {
+ RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false);
+ s.addEntry(rm);
+ }
+ /* Generate rejected message entries for present messages */
+ for (; i < rejectedmessagelist.size(); i++) {
+ long curr_seqn = rejectedmessagelist.get(i);
+ Slot s_msg = buffer.getSlot(curr_seqn);
+ long machineid = s_msg.getMachineID();
+ RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true);
+ s.addEntry(rm);
+ }
+ }
+ }
+
+ long newestseqnum = buffer.getNewestSeqNum();
+ long oldestseqnum = buffer.getOldestSeqNum();
+ if (lastliveslotseqn < oldestseqnum) {
+ lastliveslotseqn = oldestseqnum;
+ }
+
+ long seqn = lastliveslotseqn;
+ boolean seenliveslot = false;
+ long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full
+ long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point
+
+ boolean tryAgain = false;
+
+ // Mandatory Rescue
+ for (; seqn < threshold; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (! seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (! prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry)) {
+ s.addEntry(liveentry);
+ } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue
+ if (!resize) {
+ System.out.print("B"); //?
+ tryAgain = true;
+ needResize = true;
+ }
+ }
+ }
+ }
+
+ if (tryAgain) {
+ continue;
+ }
+
+ // Arbitrate
+ Map speculativeTableTmp = new HashMap<IoTString, KeyValue>(commitedTable);
+ for (Iterator<Transaction> i = uncommittedTransArb.iterator(); i.hasNext();) {
+ Transaction ut = i.next();
+
+ KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0];
+ // Check if this machine arbitrates for this transaction
+ if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) {
+ continue;
+ }
+
+ Entry newEntry = null;
+
+ try {
+ if ( ut.getGuard().evaluate(new HashSet<KeyValue>(speculativeTableTmp.values()))) {
+ // Guard evaluated as true
+
+ // update the local tmp current key set
+ for (KeyValue kv : ut.getkeyValueUpdateSet()) {
+ speculativeTableTmp.put(kv.getKey(), kv);
+ }
+
+ // create the commit
+ newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet());
+ } else {
+ // Guard was false
+
+ // create the abort
+ newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID());
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if ((newEntry != null) && s.hasSpace(newEntry)) {
+ s.addEntry(newEntry);
+ i.remove();
+
+ } else {
+ break;
+ }
+ }
+
+ /* now go through live entries from least to greatest sequence number until
+ * either all live slots added, or the slot doesn't have enough room
+ * for SKIP_THRESHOLD consecutive entries*/
+ int skipcount = 0;
+ search:
+ for (; seqn <= newestseqnum; seqn++) {
+ Slot prevslot = buffer.getSlot(seqn);
+ //Push slot number forward
+ if (!seenliveslot)
+ lastliveslotseqn = seqn;
+
+ if (!prevslot.isLive())
+ continue;
+ seenliveslot = true;
+ Vector<Entry> liveentries = prevslot.getLiveEntries(resize);
+ for (Entry liveentry : liveentries) {
+ if (s.hasSpace(liveentry))
+ s.addEntry(liveentry);
+ else {
+ skipcount++;
+ if (skipcount > SKIP_THRESHOLD)
+ break search;
+ }
+ }
+ }
+
+ int max = 0;
+ if (resize)
+ max = newsize;
+ Slot[] array = cloud.putSlot(s, max);
+ if (array == null) {
+ array = new Slot[] {s};
+ rejectedmessagelist.clear();
+ } else {
+ if (array.length == 0)
+ throw new Error("Server Error: Did not send any slots");
+ rejectedmessagelist.add(s.getSequenceNumber());
+ }
+
+ /* update data structure */
+ validateandupdate(array, true);
+ }
+
+
+ }
}
public boolean createNewKey(IoTString keyName, long machineId) {
public class Test {
public static void main(String[] args) {
- // if(args[0].equals("2"))
- // test2();
- // else if(args[0].equals("3"))
- // test3();
- // else if(args[0].equals("4"))
- // test4();
- // else if(args[0].equals("5"))
- // test5();
-
+ if (args[0].equals("2")) {
+ test2();
+ }
}
-
-
- // static Thread buildThread(String prefix, Table t) {
- // return new Thread() {
- // public void run() {
- // for(int i=0; i<10000; i++) {
- // String a=prefix+i;
- // IoTString ia=new IoTString(a);
- // t.put(ia, ia);
- // System.out.println(ia+"->"+t.get(ia));
- // }
- // }
- // };
- // }
-
- // static void test5() {
- // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
- // t1.rebuild();
- // System.out.println(t1);
- // }
-
- // static void test4() {
- // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
- // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
- // t1.rebuild();
- // t2.rebuild();
- // Thread thr1=buildThread("p1", t1);
- // Thread thr2=buildThread("p2", t2);
- // thr1.start();
- // thr2.start();
- // try {
- // thr1.join();
- // thr2.join();
- // } catch (Exception e) {
- // e.printStackTrace();
- // }
- // }
+ static void test2() {
+ Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
+ t1.initTable();
+ Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
+ t2.update();
+
+ for (int i = 0; i < 600; i++) {
+ String a = "STR" + i;
+ String b = "ABR" + i;
+ IoTString ia = new IoTString(a);
+ IoTString ib = new IoTString(b);
+
+
+ t1.createNewKey(ia, 321);
+ t2.createNewKey(ib, 351);
+
+ t1.startTransaction();
+ t1.addKV(ia, ia);
+ t1.commitTransaction();
- // static void test3() {
- // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
- // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
- // t1.rebuild();
- // t2.rebuild();
- // for(int i=0; i<600; i++) {
- // String a="STR"+i;
- // String b="ABR"+i;
- // IoTString ia=new IoTString(a);
- // IoTString ib=new IoTString(b);
- // t1.put(ia, ia);
- // t2.put(ib, ib);
- // t1.update();
- // System.out.println(ib+"->"+t1.get(ib));
- // System.out.println(ia+"->"+t2.get(ia));
- // }
- // }
+ t2.startTransaction();
+ t2.addKV(ib, ib);
+ t2.commitTransaction();
+ }
- // static void test2() {
- // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
- // t1.initTable();
- // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
- // t2.update();
- // for(int i=0; i<600; i++) {
- // String a="STR"+i;
- // String b="ABR"+i;
- // IoTString ia=new IoTString(a);
- // IoTString ib=new IoTString(b);
- // t1.put(ia, ia);
- // t2.put(ib, ib);
- // t1.update();
- // System.out.println(ib+"->"+t1.get(ib));
- // System.out.println(ia+"->"+t2.get(ia));
- // }
- // }
+ t1.update();
+ t2.update();
+
+
+ for (int i = 0; i < 600; i++) {
+ String a = "STR" + i;
+ String b = "ABR" + i;
+ IoTString ia = new IoTString(a);
+ IoTString ib = new IoTString(b);
+
+ System.out.println(ib + "->" + t1.getCommitted(ib));
+ System.out.println(ia + "->" + t2.getCommitted(ia));
+ }
+ }
}