From da5de9bf8afb1660d2c4e573d41e999fc458dc9c Mon Sep 17 00:00:00 2001 From: Ali Younis Date: Thu, 1 Dec 2016 15:17:45 -0800 Subject: [PATCH] Changes --- src2/java/iotcloud/Table.java | 206 +++++++++++++++++++++++++++++++++- src2/java/iotcloud/Test.java | 120 +++++++------------- 2 files changed, 240 insertions(+), 86 deletions(-) diff --git a/src2/java/iotcloud/Table.java b/src2/java/iotcloud/Table.java index 1cdb3d5..00d48b0 100644 --- a/src2/java/iotcloud/Table.java +++ b/src2/java/iotcloud/Table.java @@ -178,7 +178,7 @@ final public class Table { public void addKV(IoTString key, IoTString value) { // Make sure new key value pair matches the current arbitrator - if (!pendingTransBuild.checkArbitrator( arbitratorTable.get(key))) { + if (!pendingTransBuild.checkArbitrator(arbitratorTable.get(key))) { // TODO: Maybe not throw and error throw new Error("Not all Key Values match"); } @@ -189,17 +189,211 @@ final public class Table { pendingTransBuild.addKV(kv); } - - // TODo: FIx Guard - public void addGuard(IoTString key, IoTString value) { - KeyValue kv = new KeyValue(key, value); - pendingTransBuild.addKV(kv); + public void addGuard(Guard guard) { + pendingTransBuild.addGuard(guard); } public void update() { Slot[] newslots = cloud.getSlots(sequencenumber + 1); validateandupdate(newslots, false); + + if (uncommittedTransactionsList.size() > 0) { + List uncommittedTransArb = new LinkedList(); + + for (Transaction ut : uncommittedTransactionsList) { + KeyValue kv = (KeyValue)(ut.getkeyValueUpdateSet().toArray()[0]); + long arb = arbitratorTable.get(kv.getKey()); + + if (arb == localmachineid) { + uncommittedTransArb.add(ut); + } + } + + + boolean needResize = false; + while (uncommittedTransArb.size() > 0) { + boolean resize = needResize; + needResize = false; + + Slot s = new Slot(this, sequencenumber + 1, localmachineid, buffer.getSlot(sequencenumber).getHMAC()); + int newsize = 0; + if (liveslotcount > resizethreshold) { + resize = true; //Resize is forced + } + + if (resize) { + newsize = (int) (numslots * RESIZE_MULTIPLE); + TableStatus status = new TableStatus(s, newsize); + s.addEntry(status); + } + + if (! rejectedmessagelist.isEmpty()) { + /* TODO: We should avoid generating a rejected message entry if + * there is already a sufficient entry in the queue (e.g., + * equalsto value of true and same sequence number). */ + + long old_seqn = rejectedmessagelist.firstElement(); + if (rejectedmessagelist.size() > REJECTED_THRESHOLD) { + long new_seqn = rejectedmessagelist.lastElement(); + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, new_seqn, false); + s.addEntry(rm); + } else { + long prev_seqn = -1; + int i = 0; + /* Go through list of missing messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + if (s_msg != null) + break; + prev_seqn = curr_seqn; + } + /* Generate rejected message entry for missing messages */ + if (prev_seqn != -1) { + RejectedMessage rm = new RejectedMessage(s, localmachineid, old_seqn, prev_seqn, false); + s.addEntry(rm); + } + /* Generate rejected message entries for present messages */ + for (; i < rejectedmessagelist.size(); i++) { + long curr_seqn = rejectedmessagelist.get(i); + Slot s_msg = buffer.getSlot(curr_seqn); + long machineid = s_msg.getMachineID(); + RejectedMessage rm = new RejectedMessage(s, machineid, curr_seqn, curr_seqn, true); + s.addEntry(rm); + } + } + } + + long newestseqnum = buffer.getNewestSeqNum(); + long oldestseqnum = buffer.getOldestSeqNum(); + if (lastliveslotseqn < oldestseqnum) { + lastliveslotseqn = oldestseqnum; + } + + long seqn = lastliveslotseqn; + boolean seenliveslot = false; + long firstiffull = newestseqnum + 1 - numslots; //smallest seq number in the buffer if it is full + long threshold = firstiffull + FREE_SLOTS; //we want the buffer to be clear of live entries up to this point + + boolean tryAgain = false; + + // Mandatory Rescue + for (; seqn < threshold; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (! seenliveslot) + lastliveslotseqn = seqn; + + if (! prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) { + s.addEntry(liveentry); + } else if (seqn == firstiffull) { //if there's no space but the entry is about to fall off the queue + if (!resize) { + System.out.print("B"); //? + tryAgain = true; + needResize = true; + } + } + } + } + + if (tryAgain) { + continue; + } + + // Arbitrate + Map speculativeTableTmp = new HashMap(commitedTable); + for (Iterator i = uncommittedTransArb.iterator(); i.hasNext();) { + Transaction ut = i.next(); + + KeyValue keyVal = (KeyValue)(ut.getkeyValueUpdateSet().toArray())[0]; + // Check if this machine arbitrates for this transaction + if (arbitratorTable.get( keyVal.getKey() ) != localmachineid ) { + continue; + } + + Entry newEntry = null; + + try { + if ( ut.getGuard().evaluate(new HashSet(speculativeTableTmp.values()))) { + // Guard evaluated as true + + // update the local tmp current key set + for (KeyValue kv : ut.getkeyValueUpdateSet()) { + speculativeTableTmp.put(kv.getKey(), kv); + } + + // create the commit + newEntry = new Commit(s, ut.getSequenceNumber(), ut.getkeyValueUpdateSet()); + } else { + // Guard was false + + // create the abort + newEntry = new Abort(s, ut.getSequenceNumber(), ut.getMachineID()); + } + } catch (Exception e) { + e.printStackTrace(); + } + + if ((newEntry != null) && s.hasSpace(newEntry)) { + s.addEntry(newEntry); + i.remove(); + + } else { + break; + } + } + + /* now go through live entries from least to greatest sequence number until + * either all live slots added, or the slot doesn't have enough room + * for SKIP_THRESHOLD consecutive entries*/ + int skipcount = 0; + search: + for (; seqn <= newestseqnum; seqn++) { + Slot prevslot = buffer.getSlot(seqn); + //Push slot number forward + if (!seenliveslot) + lastliveslotseqn = seqn; + + if (!prevslot.isLive()) + continue; + seenliveslot = true; + Vector liveentries = prevslot.getLiveEntries(resize); + for (Entry liveentry : liveentries) { + if (s.hasSpace(liveentry)) + s.addEntry(liveentry); + else { + skipcount++; + if (skipcount > SKIP_THRESHOLD) + break search; + } + } + } + + int max = 0; + if (resize) + max = newsize; + Slot[] array = cloud.putSlot(s, max); + if (array == null) { + array = new Slot[] {s}; + rejectedmessagelist.clear(); + } else { + if (array.length == 0) + throw new Error("Server Error: Did not send any slots"); + rejectedmessagelist.add(s.getSequenceNumber()); + } + + /* update data structure */ + validateandupdate(array, true); + } + + + } } public boolean createNewKey(IoTString keyName, long machineId) { diff --git a/src2/java/iotcloud/Test.java b/src2/java/iotcloud/Test.java index e70d88e..6ac5b06 100644 --- a/src2/java/iotcloud/Test.java +++ b/src2/java/iotcloud/Test.java @@ -8,88 +8,48 @@ package iotcloud; public class Test { public static void main(String[] args) { - // if(args[0].equals("2")) - // test2(); - // else if(args[0].equals("3")) - // test3(); - // else if(args[0].equals("4")) - // test4(); - // else if(args[0].equals("5")) - // test5(); - + if (args[0].equals("2")) { + test2(); + } } - - - // static Thread buildThread(String prefix, Table t) { - // return new Thread() { - // public void run() { - // for(int i=0; i<10000; i++) { - // String a=prefix+i; - // IoTString ia=new IoTString(a); - // t.put(ia, ia); - // System.out.println(ia+"->"+t.get(ia)); - // } - // } - // }; - // } - - // static void test5() { - // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // t1.rebuild(); - // System.out.println(t1); - // } - - // static void test4() { - // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - // t1.rebuild(); - // t2.rebuild(); - // Thread thr1=buildThread("p1", t1); - // Thread thr2=buildThread("p2", t2); - // thr1.start(); - // thr2.start(); - // try { - // thr1.join(); - // thr2.join(); - // } catch (Exception e) { - // e.printStackTrace(); - // } - // } + static void test2() { + Table t1 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); + t1.initTable(); + Table t2 = new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); + t2.update(); + + for (int i = 0; i < 600; i++) { + String a = "STR" + i; + String b = "ABR" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + + + t1.createNewKey(ia, 321); + t2.createNewKey(ib, 351); + + t1.startTransaction(); + t1.addKV(ia, ia); + t1.commitTransaction(); - // static void test3() { - // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - // t1.rebuild(); - // t2.rebuild(); - // for(int i=0; i<600; i++) { - // String a="STR"+i; - // String b="ABR"+i; - // IoTString ia=new IoTString(a); - // IoTString ib=new IoTString(b); - // t1.put(ia, ia); - // t2.put(ib, ib); - // t1.update(); - // System.out.println(ib+"->"+t1.get(ib)); - // System.out.println(ia+"->"+t2.get(ia)); - // } - // } + t2.startTransaction(); + t2.addKV(ib, ib); + t2.commitTransaction(); + } - // static void test2() { - // Table t1=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 321); - // t1.initTable(); - // Table t2=new Table("http://127.0.0.1/test.iotcloud/", "reallysecret", 351); - // t2.update(); - // for(int i=0; i<600; i++) { - // String a="STR"+i; - // String b="ABR"+i; - // IoTString ia=new IoTString(a); - // IoTString ib=new IoTString(b); - // t1.put(ia, ia); - // t2.put(ib, ib); - // t1.update(); - // System.out.println(ib+"->"+t1.get(ib)); - // System.out.println(ia+"->"+t2.get(ia)); - // } - // } + t1.update(); + t2.update(); + + + for (int i = 0; i < 600; i++) { + String a = "STR" + i; + String b = "ABR" + i; + IoTString ia = new IoTString(a); + IoTString ib = new IoTString(b); + + System.out.println(ib + "->" + t1.getCommitted(ib)); + System.out.println(ia + "->" + t2.getCommitted(ia)); + } + } } -- 2.34.1