import java.util.Collection;
import java.util.Collections;
import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
/**
private Map<Long, TransactionStatus> transactionStatusMap = null;
private Map<Long, TransactionStatus> transactionStatusNotSentMap = null;
+ private Semaphore mutex = null;
+
+
public Table(String hostname, String baseurl, String password, long _localmachineid) {
localmachineid = _localmachineid;
localCommunicationChannels = new HashMap<Long, LocalComm>();
transactionStatusMap = new HashMap<Long, TransactionStatus>();
transactionStatusNotSentMap = new HashMap<Long, TransactionStatus>();
+ mutex = new Semaphore(1);
}
public void initTable() throws ServerException {
}
}
- public void rebuild() throws ServerException {
+ public void rebuild() throws ServerException, InterruptedException {
+ mutex.acquire();
Slot[] newslots = cloud.getSlots(sequencenumber + 1);
validateandupdate(newslots, true);
+ mutex.release();
}
// TODO: delete method
public void addLocalComm(long machineId, LocalComm lc) {
localCommunicationChannels.put(machineId, lc);
}
- public Long getArbitrator(IoTString key) {
- return arbitratorTable.get(key);
+ public Long getArbitrator(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+ Long arb = arbitratorTable.get(key);
+ mutex.release();
+
+ return arb;
}
- public IoTString getCommitted(IoTString key) {
+ public IoTString getCommitted(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
KeyValue kv = commitedTable.get(key);
+ mutex.release();
+
+
if (kv != null) {
return kv.getValue();
} else {
}
}
- public IoTString getSpeculative(IoTString key) {
+ public IoTString getSpeculative(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+
KeyValue kv = pendingTransSpeculativeTable.get(key);
if (kv == null) {
if (kv == null) {
kv = commitedTable.get(key);
}
+ mutex.release();
+
if (kv != null) {
return kv.getValue();
}
}
- public IoTString getCommittedAtomic(IoTString key) {
+ public IoTString getCommittedAtomic(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
+
KeyValue kv = commitedTable.get(key);
if (arbitratorTable.get(key) == null) {
throw new Error("Not all Key Values Match Arbitrator.");
}
+ mutex.release();
+
if (kv != null) {
pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
return kv.getValue();
}
}
- public IoTString getSpeculativeAtomic(IoTString key) {
+ public IoTString getSpeculativeAtomic(IoTString key) throws InterruptedException {
+
+ mutex.acquire();
if (arbitratorTable.get(key) == null) {
throw new Error("Key not Found.");
kv = commitedTable.get(key);
}
+ mutex.release();
+
if (kv != null) {
pendingTransBuild.addKVGuard(new KeyValue(key, kv.getValue()));
return kv.getValue();
}
}
- public Pair<Boolean, Boolean> update() {
+ public Pair<Boolean, Boolean> update() throws InterruptedException {
+
+ mutex.acquire();
boolean gotLatestFromServer = false;
boolean didSendLocal = false;
didSendLocal = true;
+
} catch (Exception e) {
// could not update so do nothing
}
+
+ mutex.release();
return new Pair<Boolean, Boolean>(gotLatestFromServer, didSendLocal);
}
- public Boolean updateFromLocal(long arb) {
+ public Boolean updateFromLocal(long arb) throws InterruptedException {
LocalComm lc = localCommunicationChannels.get(arb);
if (lc == null) {
// Cant talk directly to arbitrator so cant do anything
bbEncode.putLong(0);
}
+ mutex.acquire();
byte[] data = lc.sendDataToLocalDevice(arb, bbEncode.array());
// Decode the data
newCommits.add(com);
}
-
for (Commit commit : newCommits) {
// Prepare to process the commit
processEntry(commit);
didCommitOrSpeculate |= createSpeculativeTable();
createPendingTransactionSpeculativeTable(didCommitOrSpeculate);
+
+ mutex.release();
+
return true;
}
- public void startTransaction() {
+ public void startTransaction() throws InterruptedException {
// Create a new transaction, invalidates any old pending transactions.
pendingTransBuild = new PendingTransaction();
}
pendingTransBuild.addKV(kv);
}
- public TransactionStatus commitTransaction() {
+ public TransactionStatus commitTransaction() throws InterruptedException {
if (pendingTransBuild.getKVUpdates().size() == 0) {
return new TransactionStatus(TransactionStatus.StatusNoEffect, -1);
}
+ mutex.acquire();
+
TransactionStatus transStatus = null;
if (pendingTransBuild.getArbitrator() != localmachineid) {
// reset it so next time is fresh
pendingTransBuild = new PendingTransaction();
+
+ mutex.release();
return transStatus;
}
- public boolean createNewKey(IoTString keyName, long machineId) throws ServerException {
+ public boolean createNewKey(IoTString keyName, long machineId) throws ServerException, InterruptedException {
+ try {
+ mutex.acquire();
- while (true) {
- if (arbitratorTable.get(keyName) != null) {
- // There is already an arbitrator
- return false;
- }
+ while (true) {
+ if (arbitratorTable.get(keyName) != null) {
+ // There is already an arbitrator
+ mutex.release();
+ return false;
+ }
- if (tryput(keyName, machineId, false)) {
- // If successfully inserted
- return true;
+ if (tryput(keyName, machineId, false)) {
+ // If successfully inserted
+ mutex.release();
+ return true;
+ }
}
+ } catch (ServerException e) {
+ mutex.release();
+ throw e;
}
}
- private void processPendingTrans() {
+ private void processPendingTrans() throws InterruptedException {
boolean sentAllPending = false;
try {
}
}
- private void updateWithNotPendingTrans() throws ServerException {
-
+ private void updateWithNotPendingTrans() throws ServerException, InterruptedException {
boolean doEnd = false;
boolean needResize = false;
while (!doEnd && ((uncommittedTransactionsMap.keySet().size() > 0) || (pendingCommitsList.size() > 0)) ) {
}
}
- private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) {
+ private Pair<Boolean, List<Commit>> sendTransactionToLocal(Transaction ut, LocalComm lc) throws InterruptedException {
// encode the request
byte[] array = new byte[Long.BYTES + ut.getSize()];
return new Pair<Boolean, List<Commit>>(didCommit, newCommits);
}
- public byte[] localCommInput(byte[] data) {
+ public byte[] localCommInput(byte[] data) throws InterruptedException {
+
+
// Decode the data
ByteBuffer bbDecode = ByteBuffer.wrap(data);
bbDecode.get();
ut = (Transaction)Transaction.decode(null, bbDecode);
}
+
+ mutex.acquire();
// Do the local update and arbitrate
Pair<Boolean, List<Commit>> returnData = doLocalUpdateAndArbitrate(ut, lastSeenCommit);
+ mutex.release();
+
// Calculate the size of the response
int size = Byte.BYTES + Integer.BYTES;
public static final int NUMBER_OF_TESTS = 100;
- public static void main(String[] args) throws ServerException {
+ public static void main(String[] args) throws ServerException, InterruptedException {
if (args[0].equals("2")) {
test2();
} else if (args[0].equals("3")) {
}
- static void test11() {
+ static void test11() throws InterruptedException {
boolean foundError = false;
}
}
- static void test10() {
+ static void test10() throws InterruptedException {
boolean foundError = false;
}
}
- static void test9() {
+ static void test9() throws InterruptedException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
}
}
- static void test8() {
+ static void test8() throws InterruptedException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
}
}
- static void test7() throws ServerException {
+ static void test7() throws ServerException, InterruptedException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
}
}
- static void test6() throws ServerException {
+ static void test6() throws ServerException, InterruptedException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
}
}
- static void test5() throws ServerException {
+ static void test5() throws ServerException, InterruptedException {
boolean foundError = false;
List<TransactionStatus> transStatusList = new ArrayList<TransactionStatus>();
}
}
- static void test4() throws ServerException {
+ static void test4() throws ServerException, InterruptedException {
boolean foundError = false;
long startTime = 0;
}
}
- static void test3() throws ServerException {
+ static void test3() throws ServerException, InterruptedException {
long startTime = 0;
long endTime = 0;
}
}
- static void test2() throws ServerException {
+ static void test2() throws ServerException, InterruptedException {
boolean foundError = false;
long startTime = 0;
// Setup the 2 clients
Table t1 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 321);
t1.initTable();
+ System.out.println("T1 Ready");
+
Table t2 = new Table("127.0.0.1", "http://127.0.0.1/test.iotcloud/", "reallysecret", 351);
t2.update();
-
+ System.out.println("T2 Ready");
// Make the Keys
System.out.println("Setting up keys");