import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/** Class IoTRMICall is a class that serves method calls on stub.
* <p>
*/
private IoTRMIUtil rmiUtil;
private IoTSocketClient rmiClient;
- private byte[] retValueBytes;
- //private AtomicBoolean didGetReturnValue;
- //private Map<String,byte[]> mapReturnValue; // Store the return value received in a map
- private ConcurrentLinkedQueue<byte[]> returnQueue;
- private Map<String,AtomicBoolean> mapStubId;
- private AtomicBoolean didGetReturnBytes;
- private int objectIdCounter = Integer.MAX_VALUE;
-
+
+
/**
* Constructors
*/
public IoTRMICall(int _port, String _address, int _rev) throws IOException {
- //didGetReturnValue = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
rmiClient = new IoTSocketClient(_port, _address, _rev);
- retValueBytes = null;
- returnQueue = new ConcurrentLinkedQueue<byte[]>();
- mapStubId = new HashMap<String,AtomicBoolean>();
- didGetReturnBytes = new AtomicBoolean(false);
- //mapReturnValue = new HashMap<String,byte[]>();
- waitForReturnValue();
- wakeUpThread();
- }
-
-
- public IoTRMICall(int _localPort, int _port, String _address, int _rev) throws IOException {
-
- //didGetReturnValue = new AtomicBoolean(false);
- rmiUtil = new IoTRMIUtil();
- rmiClient = new IoTSocketClient(_localPort, _port, _address, _rev);
- retValueBytes = null;
- returnQueue = new ConcurrentLinkedQueue<byte[]>();
- mapStubId = new HashMap<String,AtomicBoolean>();
- didGetReturnBytes = new AtomicBoolean(false);
- //mapReturnValue = new HashMap<String,byte[]>();
- waitForReturnValue();
- wakeUpThread();
- }
-
-
- /**
- * waitForReturnValue() starts a thread that waits for return value for a method invocation
- */
- public void waitForReturnValue() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] retBytes = null;
- while(true) {
- try {
- retBytes = rmiClient.receiveBytes(retBytes);
- if (retBytes != null) {
- System.out.println("Return value not null: " + Arrays.toString(retBytes));
- //byte[] keyBytes = getObjectAndMethodIdBytes();
- //String strKeyBytes = new String(keyBytes);
- returnQueue.offer(retBytes);
- } else
- Thread.sleep(100);
- retBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error receiving return value bytes!");
- }
- }
- }
- };
- thread.start();
}
-
-
- /**
- * wakeUpThread() wakes up the correct thread
- */
- public void wakeUpThread() {
- Thread thread = new Thread() {
- public void run() {
- while(true) {
- // Take the current method from the queue and wake up the correct thread
- retValueBytes = returnQueue.poll();
- if (retValueBytes != null) { // If there is method bytes
- System.out.println("methodBytes in wake up thread: " + Arrays.toString(retValueBytes));
- int objectId = getObjectId();
- int methodId = getMethodId();
- String strKey = objectId + "-" + methodId;
- AtomicBoolean retRecv = mapStubId.get(strKey);
- System.out.println("boolean status: " + retRecv + " with key: " + strKey);
- didGetReturnBytes.set(false);
- while(!retRecv.compareAndSet(false, true));
- System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
- while(!didGetReturnBytes.get()); // While skeleton is still processing
- }
- }
- }
- };
- thread.start();
- }
-
/**
- * registerStub() registers the skeleton to be woken up
- */
- public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
-
- String strKey = objectId + "-" + methodId;
- System.out.println("Key exist? " + mapStubId.containsKey(strKey));
- mapStubId.put(strKey, retValueReceived);
- System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
- }
-
-
- /**
- * getObjectIdCounter() gets object Id counter
- */
- public int getObjectIdCounter() {
-
- return objectIdCounter;
- }
-
-
- /**
- * setObjectIdCounter() sets object Id counter
- */
- public void setObjectIdCounter(int objIdCounter) {
-
- objectIdCounter = objIdCounter;
- }
-
-
- /**
- * decrementObjectIdCounter() gets object Id counter
- */
- public void decrementObjectIdCounter() {
-
- objectIdCounter--;
- }
-
-
-
- /**
- * setGetReturnBytes() set boolean if there is a new return value already
- */
- public synchronized boolean setGetReturnBytes() {
-
- return didGetReturnBytes.compareAndSet(false, true);
- }
-
-
- /**
- * getObjectAndMethodIdBytes() extracts object Id and method Id from method bytes
- */
- public byte[] getObjectAndMethodIdBytes() {
-
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- byte[] objectMethodIdBytes = new byte[objMethIdLen];
- System.arraycopy(retValueBytes, 0, objectMethodIdBytes, 0, objMethIdLen);
- return objectMethodIdBytes;
- }
-
-
- /**
- * getObjectAndMethodIdBytes() gets object and method Id in bytes
- */
- public byte[] getObjectAndMethodIdBytes(int objectId, int methodId) {
-
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- byte[] objectMethodIdBytes = new byte[objMethIdLen];
- byte[] objIdBytes = IoTRMIUtil.intToByteArray(objectId);
- byte[] methIdBytes = IoTRMIUtil.intToByteArray(methodId);
- System.arraycopy(objIdBytes, 0, objectMethodIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
- System.arraycopy(methIdBytes, 0, objectMethodIdBytes, IoTRMIUtil.OBJECT_ID_LEN, IoTRMIUtil.METHOD_ID_LEN);
- return objectMethodIdBytes;
- }
-
-
- /**
- * getObjectId() gets object Id from bytes
- */
- public int getObjectId() {
-
- // Get object Id bytes
- byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
- System.arraycopy(retValueBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
- // Get object Id
- int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
- return objectId;
- }
-
-
- /**
- * getMethodId() gets method Id from bytes
- */
- public int getMethodId() {
-
- // Get method Id bytes
- byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
- // Method Id is positioned after object Id in the byte array
- System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
- // Get method Id
- int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
- // Get method Id
- return methodId;
- }
-
-
- /**
- * remoteCall() calls a method remotely by passing in parameters and getting a return Object (DEPRECATED)
+ * remoteCall() calls a method remotely by passing in parameters and getting a return Object
*/
public synchronized Object remoteCall(int objectId, int methodId, Class<?> retType,
Class<?> retGenTypeVal, Class<?>[] paramCls, Object[] paramObj) {
}
- public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
-
- // Send method info
- byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
- try {
- rmiClient.sendBytes(methodBytes);
- } catch (IOException ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error when sending bytes - rmiClient.sendBytes()");
- }
- }
-
-
- /**
- * getReturnValue() returns return value
- */
- public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
-
- // Receive return value and return it to caller
- // Now just strip off the object ID and method ID
- int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
- byte[] retValBytes = new byte[valByteLen];
- // Method Id is positioned after object Id in the byte array
- System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
- Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
- // This means the right object and method have gotten the return value, so we set this back to false
- return retObj;
- }
-
-
- /**
- * getReturnValue() returns return value
- */
- public synchronized Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal, byte[] retValueBytes) {
-
- // Receive return value and return it to caller
- // Now just strip off the object ID and method ID
- int valByteLen = retValueBytes.length - (IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
- byte[] retValBytes = new byte[valByteLen];
- // Method Id is positioned after object Id in the byte array
- System.arraycopy(retValueBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retValBytes, 0, valByteLen);
- Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
- // This means the right object and method have gotten the return value, so we set this back to false
- return retObj;
- }
-
-
/**
* methodToBytes() returns byte representation of a method
*/
import java.util.Set;
import java.lang.reflect.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/** Class IoTRMIObject is a class that stores info of an object.
private IoTRMIUtil rmiUtil;
private IoTSocketServer rmiServer;
private byte[] methodBytes;
- private ConcurrentLinkedQueue<byte[]> methodQueue;
- private Map<Integer,AtomicBoolean> mapSkeletonId;
- private AtomicBoolean didGetMethodBytes;
+ private Lock lock = new ReentrantLock();
/**
ClassNotFoundException, InstantiationException,
IllegalAccessException, IOException {
- didGetMethodBytes = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
methodBytes = null;
- methodQueue = new ConcurrentLinkedQueue<byte[]>();
- mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
rmiServer = new IoTSocketServer(_port);
rmiServer.connect();
- waitForMethod();
- wakeUpThread();
}
/**
- * waitForMethod() starts a thread that waits for method bytes
- */
- public void waitForMethod() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] methBytes = null;
- while(true) {
- try {
- methBytes = rmiServer.receiveBytes(methBytes);
- if (methBytes != null) {
- System.out.println("Command not null: " + Arrays.toString(methBytes));
- methodQueue.offer(methBytes);
- } else
- Thread.sleep(100);
- methBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error receiving return value bytes!");
- }
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * wakeUpThread() wakes up the correct thread
- */
- public void wakeUpThread() {
-
- Thread thread = new Thread() {
- public void run() {
- while(true) {
- // Take the current method from the queue and wake up the correct thread
- methodBytes = methodQueue.poll();
- if (methodBytes != null) { // If there is method bytes
- int currObjId = getObjectId(methodBytes);
- AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
- didGetMethodBytes.set(false);
- while(!methRecv.compareAndSet(false, true));
- while(!didGetMethodBytes.get()); // While skeleton is still processing
- }
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * registerSkeleton() registers the skeleton to be woken up
- */
- public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
-
- mapSkeletonId.put(objectId, methodReceived);
- }
-
-
- /**
- * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
- */
- public boolean setGetMethodBytes() {
-
- return didGetMethodBytes.compareAndSet(false, true);
- }
-
-
- /**
- * getMethodBytes() get the method in bytes
+ * getMethodBytes() waits for method transmission in bytes
*/
public byte[] getMethodBytes() throws IOException {
- // Just return the methodBytes content
+ // Receive method info
+ //System.out.println("Method RMIObj before: " + Arrays.toString(methodBytes));
+ methodBytes = rmiServer.receiveBytes(methodBytes);
+ //System.out.println("Method RMIObj after: " + Arrays.toString(methodBytes));
return methodBytes;
}
}
+ /**
+ * setMethodBytes() sets bytes for method
+ */
+ /*public void setMethodBytes(byte[] _methodBytes) throws IOException {
+
+ // Set method bytes
+ methodBytes = _methodBytes;
+ }*/
+
+
/**
* getMethodId() gets method Id from bytes
*/
/**
- * getMethodParams() overloading
- */
- public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
-
- // Byte scanning position
- int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- Object[] paramObj = new Object[arrCls.length];
- for (int i=0; i < arrCls.length; i++) {
-
- String paramType = arrCls[i].getSimpleName();
- int paramSize = rmiUtil.getTypeSize(paramType);
- // Get the 32-bit field in the byte array to get the actual
- // length (this is a param with indefinite length)
- if (paramSize == -1) {
- byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
- System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
- pos = pos + IoTRMIUtil.PARAM_LEN;
- paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
- }
- byte[] paramBytes = new byte[paramSize];
- System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
- pos = pos + paramSize;
- paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
- }
-
- return paramObj;
- }
-
-
- /**
- * sendReturnObj() overloading
+ * sendReturnObj() sends back return Object to client
*/
public void sendReturnObj(Object retObj) throws IOException {
-
- byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
- // Copy array of bytes (return object)
- System.arraycopy(retObjBytes, 0, retAllBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retObjBytes.length);
- rmiServer.sendBytes(retAllBytes);
- }
-
-
- /**
- * sendReturnObj() static version
- */
- public void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
-
// Send back return value
byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
- // Copy array of bytes (return object)
- System.arraycopy(retObjBytes, 0, retAllBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retObjBytes.length);
- rmiServer.sendBytes(retAllBytes);
+ rmiServer.sendBytes(retObjBytes);
}
/**
* sendReturnObj() overloaded to send multiple return objects for structs
*/
- public void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
+ public void sendReturnObj(Class<?>[] retCls, Object[] retObj) throws IOException {
// Send back return value
byte[] retObjBytes = returnToBytes(retCls, retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
- // Copy array of bytes (return object)
- rmiServer.sendBytes(retAllBytes);
+ rmiServer.sendBytes(retObjBytes);
}