import java.util.Set;
import java.lang.reflect.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/** Class IoTRMIObject is a class that stores info of an object.
* <p>
private IoTRMIUtil rmiUtil;
private IoTSocketServer rmiServer;
private byte[] methodBytes;
+ private ConcurrentLinkedQueue<byte[]> methodQueue;
+ private Map<Integer,AtomicBoolean> mapSkeletonId;
+ private AtomicBoolean didGetMethodBytes;
/**
ClassNotFoundException, InstantiationException,
IllegalAccessException, IOException {
+ didGetMethodBytes = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
methodBytes = null;
+ methodQueue = new ConcurrentLinkedQueue<byte[]>();
+ mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
rmiServer = new IoTSocketServer(_port);
rmiServer.connect();
+ waitForMethod();
+ wakeUpThread();
+ }
+
+
+ /**
+ * waitForMethod() starts a thread that waits for method bytes
+ */
+ public void waitForMethod() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ byte[] methBytes = null;
+ while(true) {
+ try {
+ methBytes = rmiServer.receiveBytes(methBytes);
+ if (methBytes != null) {
+ System.out.println("Command not null: " + Arrays.toString(methBytes));
+ methodQueue.offer(methBytes);
+ } else
+ Thread.sleep(100);
+ methBytes = null;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ throw new Error("IoTRMICall: Error receiving return value bytes!");
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * wakeUpThread() wakes up the correct thread
+ */
+ public void wakeUpThread() {
+
+ Thread thread = new Thread() {
+ public void run() {
+ while(true) {
+ // Take the current method from the queue and wake up the correct thread
+ methodBytes = methodQueue.poll();
+ if (methodBytes != null) { // If there is method bytes
+ int currObjId = getObjectId(methodBytes);
+ AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
+ didGetMethodBytes.set(false);
+ while(!methRecv.compareAndSet(false, true));
+ while(!didGetMethodBytes.get()); // While skeleton is still processing
+ }
+ }
+ }
+ };
+ thread.start();
+ }
+
+
+ /**
+ * registerSkeleton() registers the skeleton to be woken up
+ */
+ public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
+
+ mapSkeletonId.put(objectId, methodReceived);
}
/**
- * getMethodBytes() waits for method transmission in bytes
+ * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
+ */
+ public boolean setGetMethodBytes() {
+
+ return didGetMethodBytes.compareAndSet(false, true);
+ }
+
+
+ /**
+ * getMethodBytes() get the method in bytes
*/
public byte[] getMethodBytes() throws IOException {
- // Receive method info
- methodBytes = rmiServer.receiveBytes(methodBytes);
- System.out.println("Method: " + Arrays.toString(methodBytes));
+ // Just return the methodBytes content
return methodBytes;
}
/**
- * setMethodBytes() sets bytes for method
+ * getMethodId() gets method Id from bytes
*/
- /*public void setMethodBytes(byte[] _methodBytes) throws IOException {
+ public int getMethodId() {
- // Set method bytes
- methodBytes = _methodBytes;
- }*/
+ // Get method Id bytes
+ byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(methodBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
+ // Get method Id
+ int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
+ // Get method Id
+ return methodId;
+ }
/**
- * getMethodId() gets method Id from bytes
+ * static version of getMethodId()
*/
- public int getMethodId() {
+ public static int getMethodId(byte[] methodBytes) {
// Get method Id bytes
byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
/**
- * sendReturnObj() sends back return Object to client
+ * getMethodParams() overloading
+ */
+ public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
+
+ // Byte scanning position
+ int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+ Object[] paramObj = new Object[arrCls.length];
+ for (int i=0; i < arrCls.length; i++) {
+
+ String paramType = arrCls[i].getSimpleName();
+ int paramSize = rmiUtil.getTypeSize(paramType);
+ // Get the 32-bit field in the byte array to get the actual
+ // length (this is a param with indefinite length)
+ if (paramSize == -1) {
+ byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
+ System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
+ pos = pos + IoTRMIUtil.PARAM_LEN;
+ paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
+ }
+ byte[] paramBytes = new byte[paramSize];
+ System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
+ pos = pos + paramSize;
+ paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
+ }
+
+ return paramObj;
+ }
+
+
+ /**
+ * sendReturnObj() overloading
*/
public void sendReturnObj(Object retObj) throws IOException {
+
+ byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
+ // Send return value together with OBJECT_ID and METHOD_ID for arbitration
+ byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
+ // Copy OBJECT_ID and METHOD_ID
+ System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ // Copy array of bytes (return object)
+ System.arraycopy(retObjBytes, 0, retAllBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retObjBytes.length);
+ rmiServer.sendBytes(retAllBytes);
+ }
+
+
+ /**
+ * sendReturnObj() static version
+ */
+ public void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
+
// Send back return value
byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
+ // Send return value together with OBJECT_ID and METHOD_ID for arbitration
+ byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
+ // Copy OBJECT_ID and METHOD_ID
+ System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+ // Copy array of bytes (return object)
+ System.arraycopy(retObjBytes, 0, retAllBytes, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN, retObjBytes.length);
+ rmiServer.sendBytes(retAllBytes);
+ }
+
+
+ /**
+ * sendReturnObj() overloaded to send multiple return objects for structs
+ */
+ public void sendReturnObj(Class<?>[] retCls, Object[] retObj) throws IOException {
+
+ // Send back return value
+ byte[] retObjBytes = returnToBytes(retCls, retObj);
rmiServer.sendBytes(retObjBytes);
}
+
+
+ /**
+ * returnToBytes() takes array of objects and generates bytes
+ */
+ public byte[] returnToBytes(Class<?>[] retCls, Object[] retObj) {
+
+ // Get byte arrays and calculate method bytes length
+ int numbRet = retObj.length;
+ int retLen = 0;
+ byte[][] objBytesArr = new byte[numbRet][];
+ for (int i = 0; i < numbRet; i++) {
+ // Get byte arrays for the objects
+ objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]);
+ String clsName = retCls[i].getSimpleName();
+ int retObjLen = rmiUtil.getTypeSize(clsName);
+ if (retObjLen == -1) { // indefinite length - store the length first
+ retLen = retLen + IoTRMIUtil.RETURN_LEN;
+ }
+ retLen = retLen + objBytesArr[i].length;
+ }
+ // Construct return in byte array
+ byte[] retBytes = new byte[retLen];
+ int pos = 0;
+ // Iteration for copying bytes
+ for (int i = 0; i < numbRet; i++) {
+
+ String clsName = retCls[i].getSimpleName();
+ int retObjLen = rmiUtil.getTypeSize(clsName);
+ if (retObjLen == -1) { // indefinite length
+ retObjLen = objBytesArr[i].length;
+ byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen);
+ System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN);
+ pos = pos + IoTRMIUtil.RETURN_LEN;
+ }
+ System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen);
+ pos = pos + retObjLen;
+ }
+
+ return retBytes;
+ }
}