import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/** Class IoTRMICall is a class that serves method calls on stub.
* <p>
*/
private IoTRMIUtil rmiUtil;
private IoTSocketClient rmiClient;
-
-
+ private byte[] retValueBytes;
+ //private AtomicBoolean didGetReturnValue;
+ //private Map<String,byte[]> mapReturnValue; // Store the return value received in a map
+ private ConcurrentLinkedQueue<byte[]> returnQueue;
+ private Map<String,AtomicBoolean> mapStubId;
+ private AtomicBoolean didGetReturnBytes;
+ private int objectIdCounter = Integer.MAX_VALUE;
+
/**
* Constructors
*/
public IoTRMICall(int _port, String _address, int _rev) throws IOException {
+ //didGetReturnValue = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
rmiClient = new IoTSocketClient(_port, _address, _rev);
+ retValueBytes = null;
+ returnQueue = new ConcurrentLinkedQueue<byte[]>();
+ mapStubId = new HashMap<String,AtomicBoolean>();
+ didGetReturnBytes = new AtomicBoolean(false);
+ //mapReturnValue = new HashMap<String,byte[]>();
+ waitForReturnValue();
+ wakeUpThread();
+ }
+
+
+ public IoTRMICall(int _localPort, int _port, String _address, int _rev) throws IOException {
+
+ //didGetReturnValue = new AtomicBoolean(false);
+ rmiUtil = new IoTRMIUtil();
+ rmiClient = new IoTSocketClient(_localPort, _port, _address, _rev);
+ retValueBytes = null;
+ returnQueue = new ConcurrentLinkedQueue<byte[]>();
+ mapStubId = new HashMap<String,AtomicBoolean>();
+ didGetReturnBytes = new AtomicBoolean(false);
+ //mapReturnValue = new HashMap<String,byte[]>();
+ waitForReturnValue();
+ wakeUpThread();
}
/**
- * remoteCall() calls a method remotely by passing in parameters and getting a return Object
+ * waitForReturnValue() starts a thread that waits for return value for a method invocation
+ */
+ public void waitForReturnValue() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ byte[] retBytes = null;
+ while(true) {
+ try {
+ retBytes = rmiClient.receiveBytes(retBytes);
+ if (retBytes != null) {
+ System.out.println("Return value not null: " + Arrays.toString(retBytes));
+ //byte[] keyBytes = getObjectAndMethodIdBytes();
+ //String strKeyBytes = new String(keyBytes);
+ returnQueue.offer(retBytes);
+ } else
+ Thread.sleep(100);
+ retBytes = null;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error receiving return value bytes!");
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * wakeUpThread() wakes up the correct thread
+ */
+ public void wakeUpThread() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ while(true) {
+ // Take the current method from the queue and wake up the correct thread
+ retValueBytes = returnQueue.poll();
+ if (retValueBytes != null) { // If there is method bytes
+ System.out.println("methodBytes in wake up thread: " + Arrays.toString(retValueBytes));
+ int objectId = getObjectId();
+ int methodId = getMethodId();
+ String strKey = objectId + "-" + methodId;
+ AtomicBoolean retRecv = mapStubId.get(strKey);
+ System.out.println("boolean status: " + retRecv + " with key: " + strKey);
+ didGetReturnBytes.set(false);
+ while(!retRecv.compareAndSet(false, true));
+ System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
+ while(!didGetReturnBytes.get()); // While skeleton is still processing
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * registerStub() registers the skeleton to be woken up
+ */
+ public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
+
+ String strKey = objectId + "-" + methodId;
+ System.out.println("Key exist? " + mapStubId.containsKey(strKey));
+ mapStubId.put(strKey, retValueReceived);
+ System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
+ }
+
+
+ /**
+ * getObjectIdCounter() gets object Id counter
+ */
+ public int getObjectIdCounter() {
+
+ return objectIdCounter;
+ }
+
+
+ /**
+ * setObjectIdCounter() sets object Id counter
+ */
+ public void setObjectIdCounter(int objIdCounter) {
+
+ objectIdCounter = objIdCounter;
+ }
+
+
+ /**
+ * decrementObjectIdCounter() gets object Id counter
+ */
+ public void decrementObjectIdCounter() {
+
+ objectIdCounter--;
+ }
+
+
+
+ /**
+ * setGetReturnBytes() set boolean if there is a new return value already
+ */
+ public synchronized boolean setGetReturnBytes() {
+
+ return didGetReturnBytes.compareAndSet(false, true);
+ }
+
+
+ /**
+ * getObjectAndMethodIdBytes() extracts object Id and method Id from method bytes
+ */
+ public byte[] getObjectAndMethodIdBytes() {
+
+ int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+ byte[] objectMethodIdBytes = new byte[objMethIdLen];
+ System.arraycopy(retValueBytes, 0, objectMethodIdBytes, 0, objMethIdLen);
+ return objectMethodIdBytes;
+ }
+
+
+ /**
+ * getObjectAndMethodIdBytes() gets object and method Id in bytes
+ */
+ public byte[] getObjectAndMethodIdBytes(int objectId, int methodId) {
+
+ int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+ byte[] objectMethodIdBytes = new byte[objMethIdLen];
+ byte[] objIdBytes = IoTRMIUtil.intToByteArray(objectId);
+ byte[] methIdBytes = IoTRMIUtil.intToByteArray(methodId);
+ System.arraycopy(objIdBytes, 0, objectMethodIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
+ System.arraycopy(methIdBytes, 0, objectMethodIdBytes, IoTRMIUtil.OBJECT_ID_LEN, IoTRMIUtil.METHOD_ID_LEN);
+ return objectMethodIdBytes;
+ }
+
+
+ /**
+ * getObjectId() gets object Id from bytes
+ */
+ public int getObjectId() {
+
+ // Get object Id bytes
+ byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
+ System.arraycopy(retValueBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
+ // Get object Id
+ int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
+ return objectId;
+ }
+
+
+ /**
+ * getMethodId() gets method Id from bytes
+ */
+ public int getMethodId() {
+
+ // Get method Id bytes
+ byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
+ // Get method Id
+ int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
+ // Get method Id
+ return methodId;
+ }
+
+
+ /**
+ * remoteCall() calls a method remotely by passing in parameters and getting a return Object (DEPRECATED)
*/
public synchronized Object remoteCall(int objectId, int methodId, Class<?> retType,
Class<?> retGenTypeVal, Class<?>[] paramCls, Object[] paramObj) {
}
+ public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
+
+ // Send method info
+ byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
+ try {
+ rmiClient.sendBytes(methodBytes);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error when sending bytes - rmiClient.sendBytes()");
+ }
+ }
+
+
+ /**
+ * getReturnValue() returns return value
+ */
+ public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
+
+ // Receive return value and return it to caller
+ // Now just strip off the object ID and method ID
+ int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
+ Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
+ // This means the right object and method have gotten the return value, so we set this back to false
+ return retObj;
+ }
+
+
+ /**
+ * getReturnValue() returns return value
+ */
+ public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal, byte[] retValueBytes) {
+
+ // Receive return value and return it to caller
+ // Now just strip off the object ID and method ID
+ int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
+ Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
+ // This means the right object and method have gotten the return value, so we set this back to false
+ return retObj;
+ }
+
+
/**
* methodToBytes() returns byte representation of a method
*/