First version of skeleton-stub communication using queue and 2 sockets (one send...
authorrtrimana <rtrimana@uci.edu>
Sat, 28 Jan 2017 00:15:22 +0000 (16:15 -0800)
committerrtrimana <rtrimana@uci.edu>
Sat, 28 Jan 2017 00:15:22 +0000 (16:15 -0800)
iotjava/iotrmi/Java/IoTRMIComm.java [new file with mode: 0644]
iotjava/iotrmi/Java/IoTRMIObject.java
iotjava/iotrmi/Java/IoTRMIUtil.java
iotjava/iotrmi/Java/IoTSocket.java
iotjava/iotrmi/Java/basics/CallBack.java
iotjava/iotrmi/Java/basics/TestClass.java
iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java
iotjava/iotrmi/Java/basics/TestClass_Skeleton.java

diff --git a/iotjava/iotrmi/Java/IoTRMIComm.java b/iotjava/iotrmi/Java/IoTRMIComm.java
new file mode 100644 (file)
index 0000000..0be3a25
--- /dev/null
@@ -0,0 +1,675 @@
+package iotrmi.Java;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.lang.reflect.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/** Class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
+ *  <p>
+ *  We will arbitrate packets into 2 queues and wake up the right threads/callers.
+ *  We separate traffics one-directionally.
+ *
+ * @author      Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version     1.0
+ * @since       2017-01-27
+ */
+public class IoTRMIComm {
+
+       /**
+        * Class Properties
+        */
+       private List<String> listMethodId2Sign; // List of method signature (we use list index as method Id)
+       private IoTRMIUtil rmiUtil;
+       private IoTSocketServer rmiServerSend;
+       private IoTSocketServer rmiServerRecv;
+       private IoTSocketClient rmiClientSend;
+       private IoTSocketClient rmiClientRecv;
+       private byte[] methodBytes;
+       private byte[] retValueBytes;
+       private ConcurrentLinkedQueue<byte[]> methodQueue;
+       private ConcurrentLinkedQueue<byte[]> returnQueue;
+       private Map<Integer,AtomicBoolean> mapSkeletonId;
+       private Map<String,AtomicBoolean> mapStubId;
+       private AtomicBoolean didGetMethodBytes;
+       private AtomicBoolean didGetReturnBytes;
+       private AtomicBoolean didServerSendConnect;
+       private AtomicBoolean didServerRecvConnect;
+       private int objectIdCounter = Integer.MAX_VALUE;
+
+       /**
+        * Constructor (for skeleton)
+        */
+       public IoTRMIComm(int _portSend, int _portRecv) throws  
+               ClassNotFoundException, InstantiationException, 
+                       IllegalAccessException, IOException {
+
+               didGetMethodBytes = new AtomicBoolean(false);
+               didGetReturnBytes = new AtomicBoolean(false);
+               didServerSendConnect = new AtomicBoolean(false);
+               didServerRecvConnect = new AtomicBoolean(false);
+               rmiUtil = new IoTRMIUtil();
+               methodBytes = null;
+               retValueBytes = null;
+               methodQueue = new ConcurrentLinkedQueue<byte[]>();
+               returnQueue = new ConcurrentLinkedQueue<byte[]>();
+               mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
+               mapStubId = new HashMap<String,AtomicBoolean>();
+               //rmiServerSend = new IoTSocketServer(_portSend);
+               //rmiServerSend.connect();
+               //rmiServerRecv = new IoTSocketServer(_portRecv);
+               //rmiServerRecv.connect();
+               rmiServerSend = new IoTSocketServer(_portSend);
+               rmiServerRecv = new IoTSocketServer(_portRecv);
+               waitForConnectionOnServerSend();
+               waitForConnectionOnServerRecv();
+               while(!didServerSendConnect.get());     // Wait until server is connected
+               while(!didServerRecvConnect.get()); // Wait until server is connected
+               rmiClientSend = null;
+               rmiClientRecv = null;
+               waitForPacketsOnServer();
+               wakeUpThreadOnMethodCall();
+               wakeUpThreadOnReturnValue();
+       }
+
+
+       /**
+        * waitForConnectionOnServerSend() starts a thread that waits server connection
+        */
+       public void waitForConnectionOnServerSend() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               try {
+                                       rmiServerSend.connect();
+                                       didServerSendConnect.set(true);
+                               } catch (Exception ex) {
+                                       ex.printStackTrace();
+                                       throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * waitForConnectionOnServerRecv() starts a thread that waits server connection
+        */
+       public void waitForConnectionOnServerRecv() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               try {
+                                       rmiServerRecv.connect();
+                                       didServerRecvConnect.set(true);
+                               } catch (Exception ex) {
+                                       ex.printStackTrace();
+                                       throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * Constructor (for stub) - send and recv from the perspective of RMI socket servers
+        */
+       public IoTRMIComm(int _localPortSend, int _localPortRecv, int _portSend, int _portRecv, String _address, int _rev) throws  
+               ClassNotFoundException, InstantiationException, 
+                       IllegalAccessException, IOException {
+
+               didGetMethodBytes = new AtomicBoolean(false);
+               didGetReturnBytes = new AtomicBoolean(false);
+               didServerSendConnect = null;
+               didServerRecvConnect = null;
+               rmiUtil = new IoTRMIUtil();
+               methodBytes = null;
+               retValueBytes = null;
+               methodQueue = new ConcurrentLinkedQueue<byte[]>();
+               returnQueue = new ConcurrentLinkedQueue<byte[]>();
+               mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
+               mapStubId = new HashMap<String,AtomicBoolean>();
+               rmiServerSend = null;
+               rmiServerRecv = null;
+               rmiClientRecv = new IoTSocketClient(_localPortSend, _portSend, _address, _rev);
+               rmiClientSend = new IoTSocketClient(_localPortRecv, _portRecv, _address, _rev);
+               waitForPacketsOnClient();
+               wakeUpThreadOnMethodCall();
+               wakeUpThreadOnReturnValue();
+       }
+
+
+       /**
+        * waitForPacketsOnServer() starts a thread that waits for packet bytes on server side
+        */
+       public void waitForPacketsOnServer() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               byte[] packetBytes = null;
+                               while(true) {
+                                       try {
+                                               packetBytes = rmiServerRecv.receiveBytes(packetBytes);
+                                               if (packetBytes != null) {
+                                                       System.out.println("Packet received: " + Arrays.toString(packetBytes));
+                                                       int packetType = IoTRMIComm.getPacketType(packetBytes);
+                                                       if (packetType == IoTRMIUtil.METHOD_TYPE) {
+                                                               System.out.println("Method packet: " + Arrays.toString(packetBytes));
+                                                               methodQueue.offer(packetBytes);
+                                                       } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
+                                                               System.out.println("Return value packet: " + Arrays.toString(packetBytes));
+                                                               returnQueue.offer(packetBytes);
+                                                       } else
+                                                               throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
+                                               } //else
+                                               //      Thread.sleep(100);
+                                               packetBytes = null;
+                                       } catch (Exception ex) {
+                                               ex.printStackTrace();
+                                               throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
+                                       }
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * waitForPacketsOnClient() starts a thread that waits for packet bytes on client side
+        */
+       public void waitForPacketsOnClient() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               byte[] packetBytes = null;
+                               while(true) {
+                                       try {
+                                               packetBytes = rmiClientRecv.receiveBytes(packetBytes);
+                                               if (packetBytes != null) {
+                                                       int packetType = IoTRMIComm.getPacketType(packetBytes);
+                                                       if (packetType == IoTRMIUtil.METHOD_TYPE) {
+                                                               System.out.println("Method packet: " + Arrays.toString(packetBytes));
+                                                               methodQueue.offer(packetBytes);
+                                                       } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
+                                                               System.out.println("Return value packet: " + Arrays.toString(packetBytes));
+                                                               returnQueue.offer(packetBytes);
+                                                       } else
+                                                               throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
+                                               } //else
+                                               //      Thread.sleep(100);
+                                               packetBytes = null;
+                                       } catch (Exception ex) {
+                                               ex.printStackTrace();
+                                               throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
+                                       }
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
+        */
+       public void wakeUpThreadOnMethodCall() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               while(true) {
+                                       // Take the current method from the queue and wake up the correct thread
+                                       methodBytes = methodQueue.poll();
+                                       if (methodBytes != null) {      // If there is method bytes
+                                               int currObjId = getObjectId(methodBytes);
+                                               AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
+                                               didGetMethodBytes.set(false);
+                                               while(!methRecv.compareAndSet(false, true));
+                                               while(!didGetMethodBytes.get());        // While skeleton is still processing
+                                       }
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value
+        */
+       public void wakeUpThreadOnReturnValue() {
+
+               Thread thread = new Thread() {
+                       public void run() {
+                               while(true) {
+                                       // Take the current method from the queue and wake up the correct thread
+                                       retValueBytes = returnQueue.poll();
+                                       if (retValueBytes != null) {    // If there is method bytes
+                                               System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes));
+                                               int objectId = getObjectId(retValueBytes);
+                                               int methodId = getMethodId(retValueBytes);
+                                               String strKey = objectId + "-" + methodId;
+                                               AtomicBoolean retRecv = mapStubId.get(strKey);
+                                               //System.out.println("boolean status: " + retRecv + " with key: " + strKey);
+                                               didGetReturnBytes.set(false);
+                                               while(!retRecv.compareAndSet(false, true));
+                                               //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
+                                               while(!didGetReturnBytes.get());        // While skeleton is still processing
+                                       }
+                               }
+                       }
+               };
+               thread.start();
+       }
+
+
+       /**
+        * registerSkeleton() registers the skeleton to be woken up
+
+        */
+       public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
+
+               mapSkeletonId.put(objectId, methodReceived);
+       }
+
+
+       /**
+        * registerStub() registers the skeleton to be woken up
+        */
+       public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
+
+               String strKey = objectId + "-" + methodId;
+               //System.out.println("Key exist? " + mapStubId.containsKey(strKey));
+               mapStubId.put(strKey, retValueReceived);
+               //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
+       }
+
+
+       /**
+        * getObjectIdCounter() gets object Id counter
+        */
+       public int getObjectIdCounter() {
+
+               return objectIdCounter;
+       }
+
+
+       /**
+        * setObjectIdCounter() sets object Id counter
+        */
+       public void setObjectIdCounter(int objIdCounter) {
+
+               objectIdCounter = objIdCounter;
+       }
+
+
+       /**
+        * decrementObjectIdCounter() gets object Id counter
+        */
+       public void decrementObjectIdCounter() {
+
+               objectIdCounter--;
+       }
+
+
+       /**
+        * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
+        */
+       public boolean setGetMethodBytes() {
+
+               return didGetMethodBytes.compareAndSet(false, true);
+       }
+
+
+       /**
+        * setGetReturnBytes() set didGetReturnBytes if there is a new return value already
+        */
+       public synchronized boolean setGetReturnBytes() {
+
+               return didGetReturnBytes.compareAndSet(false, true);
+       }
+
+
+       /**
+        * getMethodBytes() get the method in bytes
+        */
+       public byte[] getMethodBytes() throws IOException {
+
+               // Just return the methodBytes content
+               return methodBytes;
+       }
+
+
+       /**
+        * static version of getObjectId()
+        */
+       public static int getObjectId(byte[] packetBytes) {
+
+               // Get object Id bytes
+               byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
+               System.arraycopy(packetBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
+               // Get object Id
+               int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
+               return objectId;
+       }
+
+
+       /**
+        * static version of getMethodId()
+        */
+       public static int getMethodId(byte[] packetBytes) {
+
+               // Get method Id bytes
+               byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
+               // Method Id is positioned after object Id in the byte array
+               System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
+               // Get method Id
+               int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
+               // Get method Id
+               return methodId;
+       }
+
+
+       /**
+        * static version of getPacketType() - either method or return value (position is after object Id and method Id)
+        */
+       public static int getPacketType(byte[] packetBytes) {
+
+               // Get packet type bytes
+               byte[] packetTypeBytes = new byte[IoTRMIUtil.PACKET_TYPE_LEN];
+               int offset = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+               System.arraycopy(packetBytes, offset, packetTypeBytes, 0, IoTRMIUtil.PACKET_TYPE_LEN);
+               // Get packet type (for now we assume 1 as method and -1 as return value
+               int packetType = IoTRMIUtil.byteArrayToInt(packetTypeBytes);
+               return packetType;
+       }
+
+
+       /**
+        * getMethodParams() gets method params based on byte array received
+        * <p>
+        * Basically this is the format of a method in bytes:
+        * 1) 32-bit value of object ID
+        * 2) 32-bit value of method ID
+        * 3) m parameters with n-bit value each (m x n-bit)
+        * For the parameters that don't have definite length,
+        * we need to extract the length from a preceding 32-bit
+        * field in front of it.
+        *
+        * For primitive objects:
+        * | 32-bit object ID | 32-bit method ID | m-bit actual data (fixed length)  | ...
+        * 
+        * For string, arrays, and non-primitive objects:
+        * | 32-bit object ID | 32-bit method ID | 32-bit length | n-bit actual data | ...
+        * 
+        */
+       public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
+
+               // Byte scanning position
+               int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
+               Object[] paramObj = new Object[arrCls.length];
+               for (int i=0; i < arrCls.length; i++) {
+
+                       String paramType = arrCls[i].getSimpleName();
+                       int paramSize = rmiUtil.getTypeSize(paramType);
+                       // Get the 32-bit field in the byte array to get the actual
+                       //              length (this is a param with indefinite length)
+                       if (paramSize == -1) {
+                               byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
+                               System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
+                               pos = pos + IoTRMIUtil.PARAM_LEN;
+                               paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
+                       }
+                       byte[] paramBytes = new byte[paramSize];
+                       System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
+                       pos = pos + paramSize;
+                       paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
+               }
+
+               return paramObj;
+       }
+
+
+       /**
+        * sendReturnObj() static version
+        */
+       public synchronized void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
+
+               // Send back return value
+               byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
+               // Send return value together with OBJECT_ID and METHOD_ID for arbitration
+               int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+               int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
+               byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
+               // Copy OBJECT_ID and METHOD_ID
+               System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
+               int packetType = IoTRMIUtil.RET_VAL_TYPE;       // This is a return value
+               byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
+               System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
+               // Copy array of bytes (return object)
+               System.arraycopy(retObjBytes, 0, retAllBytes, headerLen, retObjBytes.length);
+               if (rmiServerSend == null)
+                       rmiClientSend.sendBytes(retAllBytes);
+               else
+                       rmiServerSend.sendBytes(retAllBytes);
+       }
+
+
+       /**
+        * sendReturnObj() overloaded to send multiple return objects for structs
+        */
+       public synchronized void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
+
+               // Send back return value
+               byte[] retObjBytes = returnToBytes(retCls, retObj);
+               // Send return value together with OBJECT_ID and METHOD_ID for arbitration
+               int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
+               int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
+               byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
+               // Copy OBJECT_ID and METHOD_ID
+               System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
+               int packetType = IoTRMIUtil.RET_VAL_TYPE;       // This is a return value
+               byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
+               System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
+               // Copy array of bytes (return object)
+               if (rmiServerSend == null)
+                       rmiClientSend.sendBytes(retAllBytes);
+               else
+                       rmiServerSend.sendBytes(retAllBytes);
+       }
+
+
+       /**
+        * returnToBytes() takes array of objects and generates bytes
+        */
+       public byte[] returnToBytes(Class<?>[] retCls, Object[] retObj) {
+
+               // Get byte arrays and calculate method bytes length
+               int numbRet = retObj.length;
+               int retLen = 0;
+               byte[][] objBytesArr = new byte[numbRet][];
+               for (int i = 0; i < numbRet; i++) {
+                       // Get byte arrays for the objects
+                       objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]);
+                       String clsName = retCls[i].getSimpleName();
+                       int retObjLen = rmiUtil.getTypeSize(clsName);
+                       if (retObjLen == -1) {          // indefinite length - store the length first
+                               retLen = retLen + IoTRMIUtil.RETURN_LEN;
+                       }
+                       retLen = retLen + objBytesArr[i].length;
+               }
+               // Construct return in byte array
+               byte[] retBytes = new byte[retLen];
+               int pos = 0;
+               // Iteration for copying bytes
+               for (int i = 0; i < numbRet; i++) {
+
+                       String clsName = retCls[i].getSimpleName();
+                       int retObjLen = rmiUtil.getTypeSize(clsName);
+                       if (retObjLen == -1) {          // indefinite length
+                               retObjLen = objBytesArr[i].length;
+                               byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen);
+                               System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN);
+                               pos = pos + IoTRMIUtil.RETURN_LEN;
+                       }               
+                       System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen);
+                       pos = pos + retObjLen;
+               }
+
+               return retBytes;
+       }
+
+
+       /**
+        * remoteCall() calls a method remotely by passing in parameters
+        */
+       public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
+
+               // Send method info
+               byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
+               try {
+                       if (rmiClientSend == null)
+                               rmiServerSend.sendBytes(methodBytes);
+                       else
+                               rmiClientSend.sendBytes(methodBytes);
+               } catch (IOException ex) {
+                       ex.printStackTrace();
+                       throw new Error("IoTRMICall: Error when sending bytes in remoteCall()!");
+               }
+       }
+
+
+       /**
+        * getReturnValue() returns return value object
+        */
+       public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
+
+               // Receive return value and return it to caller
+               // Now just strip off the object ID and method ID
+               int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
+               int valByteLen = retValueBytes.length - headerLen;
+               byte[] retValBytes = new byte[valByteLen];
+               // Method Id is positioned after object Id in the byte array
+               System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
+               Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
+               // This means the right object and method have gotten the return value, so we set this back to false
+               return retObj;
+       }
+
+
+       /**
+        * methodToBytes() returns byte representation of a method
+        */
+       public byte[] methodToBytes(int objectId, int methId, Class<?>[] paramCls, Object[] paramObj) {
+
+               // Initialized to the length of method ID
+               int methodLen = IoTRMIUtil.OBJECT_ID_LEN;
+               byte[] objId = IoTRMIUtil.intToByteArray(objectId);
+               // Get method ID in bytes
+               byte[] methodId = IoTRMIUtil.intToByteArray(methId);
+               // Get byte arrays and calculate method bytes length
+               int numbParam = paramObj.length;
+               methodLen = methodLen + IoTRMIUtil.METHOD_ID_LEN;
+               methodLen = methodLen + IoTRMIUtil.PACKET_TYPE_LEN;
+               byte[][] objBytesArr = new byte[numbParam][];
+               for (int i = 0; i < numbParam; i++) {
+                       // Get byte arrays for the objects
+                       objBytesArr[i] = IoTRMIUtil.getObjectBytes(paramObj[i]);
+                       String clsName = paramCls[i].getSimpleName();
+                       int paramLen = rmiUtil.getTypeSize(clsName);
+                       if (paramLen == -1) {           // indefinite length - store the length first
+                               methodLen = methodLen + IoTRMIUtil.PARAM_LEN;
+                       }
+                       methodLen = methodLen + objBytesArr[i].length;
+               }
+               // Construct method in byte array
+               byte[] method = new byte[methodLen];
+               int pos = 0;
+               System.arraycopy(objId, 0, method, 0, IoTRMIUtil.METHOD_ID_LEN);
+               pos = pos + IoTRMIUtil.OBJECT_ID_LEN;
+               System.arraycopy(methodId, 0, method, pos, IoTRMIUtil.METHOD_ID_LEN);
+               pos = pos + IoTRMIUtil.METHOD_ID_LEN;
+               int packetType = IoTRMIUtil.METHOD_TYPE;        // This is a method
+               byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
+               System.arraycopy(packetTypeBytes, 0, method, pos, IoTRMIUtil.PACKET_TYPE_LEN);
+               pos = pos + IoTRMIUtil.PACKET_TYPE_LEN;
+               // Second iteration for copying bytes
+               for (int i = 0; i < numbParam; i++) {
+
+                       String clsName = paramCls[i].getSimpleName();
+                       int paramLen = rmiUtil.getTypeSize(clsName);
+                       if (paramLen == -1) {           // indefinite length
+                               paramLen = objBytesArr[i].length;
+                               byte[] paramLenBytes = IoTRMIUtil.intToByteArray(paramLen);
+                               System.arraycopy(paramLenBytes, 0, method, pos, IoTRMIUtil.PARAM_LEN);
+                               pos = pos + IoTRMIUtil.PARAM_LEN;
+                       }               
+                       System.arraycopy(objBytesArr[i], 0, method, pos, paramLen);
+                       pos = pos + paramLen;
+               }
+
+               return method;
+       }
+
+
+       /**
+        * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
+        */
+       /*public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
+
+               // Receive return value and return it to caller
+               Object[] retObj = null;
+               byte[] retObjBytes = null;
+               try {
+                       retObjBytes = rmiClientRecv.receiveBytes(retObjBytes);
+               } catch (IOException ex) {
+                       ex.printStackTrace();
+                       throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
+               }
+               retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
+
+               return retObj;
+       }*/
+
+
+       /**
+        * remoteCall() calls a method remotely by passing in parameters and getting a return Object
+        */
+       public Object[] getReturnObjects(byte[] retBytes, Class<?>[] arrCls, Class<?>[] arrGenValCls) {
+
+               // Byte scanning position
+               int pos = 0;
+               Object[] retObj = new Object[arrCls.length];
+               for (int i=0; i < arrCls.length; i++) {
+
+                       String retType = arrCls[i].getSimpleName();
+                       int retSize = rmiUtil.getTypeSize(retType);
+                       // Get the 32-bit field in the byte array to get the actual
+                       //              length (this is a param with indefinite length)
+                       if (retSize == -1) {
+                               byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN];
+                               System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN);
+                               pos = pos + IoTRMIUtil.RETURN_LEN;
+                               retSize = IoTRMIUtil.byteArrayToInt(bytRetLen);
+                       }
+                       byte[] retObjBytes = new byte[retSize];
+                       System.arraycopy(retBytes, pos, retObjBytes, 0, retSize);
+                       pos = pos + retSize;
+                       retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes);
+               }
+
+               return retObj;
+       }
+
+}
index dfbe9c6b348484a6dcc5bc16430571bf05c60e04..7ac2c3e36a7a766b20e054d261df533743728c87 100644 (file)
@@ -311,11 +311,16 @@ public class IoTRMIObject {
        /**
         * sendReturnObj() overloaded to send multiple return objects for structs
         */
-       public void sendReturnObj(Class<?>[] retCls, Object[] retObj) throws IOException {
+       public void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
 
                // Send back return value
                byte[] retObjBytes = returnToBytes(retCls, retObj);
-               rmiServer.sendBytes(retObjBytes);
+               // Send return value together with OBJECT_ID and METHOD_ID for arbitration
+               byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length];
+               // Copy OBJECT_ID and METHOD_ID
+               System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN);
+               // Copy array of bytes (return object)
+               rmiServer.sendBytes(retAllBytes);
        }
 
 
index c4d383b829ee253cc82fc62d3fa826acbd4baa6c..59b3a49bfc90223a0ffef7ee5c8fdc2096cabed2 100644 (file)
@@ -37,10 +37,13 @@ public class IoTRMIUtil {
        /**
         * Class Constants
         */
-       public final static int OBJECT_ID_LEN = 4;      // 4 bytes = 32 bits
-       public final static int METHOD_ID_LEN = 4;      // 4 bytes = 32 bits
-       public final static int PARAM_LEN = 4;          // 4 bytes = 32 bits (4-byte field that stores the length of the param)
-       public final static int RETURN_LEN = 4;         // 4 bytes = 32 bits (4-byte field that stores the length of the return object)
+       public final static int OBJECT_ID_LEN = 4;              // 4 bytes = 32 bits
+       public final static int METHOD_ID_LEN = 4;              // 4 bytes = 32 bits
+       public final static int PACKET_TYPE_LEN = 4;    // 4 bytes = 32 bits
+       public final static int PARAM_LEN = 4;                  // 4 bytes = 32 bits (4-byte field that stores the length of the param)
+       public final static int RETURN_LEN = 4;                 // 4 bytes = 32 bits (4-byte field that stores the length of the return object)
+       public final static int RET_VAL_TYPE = -1;              // Packet type of return value
+       public final static int METHOD_TYPE = 1;                // Packet type of method
 
        public final static int SHT_LEN = 2;
        public final static int INT_LEN = 4;
index 08e7325af64f2ae818337169cbc3c92edd2d92ca..2cdb2581d3cc24a40ba2b9ab97a2084f418303f6 100644 (file)
@@ -7,6 +7,8 @@ import java.awt.*;
 import java.util.*;
 import java.nio.ByteBuffer;
 
+import java.util.concurrent.Semaphore;
+
 
 /** Class IoTSocket is the basic class for IoT RMI
  *  socket communication. This class will be extended
@@ -32,6 +34,8 @@ public abstract class IoTSocket {
        protected BufferedInputStream input;
        protected BufferedOutputStream output;
 
+       //protected static Semaphore sendRecvMutex = new Semaphore(1);
+
        /**
         * Class Constant
         */
@@ -68,13 +72,15 @@ public abstract class IoTSocket {
                ByteBuffer bb = ByteBuffer.allocate(MSG_LEN_SIZE);
                bb.putInt(len);
                output.write(bb.array(), 0, MSG_LEN_SIZE);
+               System.out.println("Sender about to send: " + Arrays.toString(bb.array()));
                output.flush();
                // Write the byte array
                output.write(vals, 0, len);
+               System.out.println("Sender sending: " + len);
                output.flush();
-               //System.out.println("Sender about to receive ACK!");
+               System.out.println("Sender about to receive ACK!");
                receiveAck();
-               //System.out.println("Sender about to send ACK!\n\n");
+               System.out.println("Sender about to send ACK!\n\n");
                sendAck();
        }
 
@@ -89,14 +95,17 @@ public abstract class IoTSocket {
                int numbytes;
 
                // Wait until input is available
-//             while(input.available() == 0);
-               if (input.available() == 0)
+               if (input.available() == 0) {
                        return null;
+               }
 
+               System.out.println("Receiver about to receive: " + input.available());
                // Read the maxlen first - read 4 bytes here
                byte[] lenBytes = new byte[MSG_LEN_SIZE];
                input.read(lenBytes, 0, MSG_LEN_SIZE);
+               System.out.println("Receiver lenBytes: " + Arrays.toString(lenBytes));
                int maxlen = ByteBuffer.wrap(lenBytes).getInt();
+               System.out.println("Receiver received length: " + maxlen);
                // Receive until maxlen
                if (maxlen>BUFFSIZE) {
                        System.out.println("IoTSocketClient/Server: Sending more bytes then will fit in buffer! Number of bytes: " + maxlen);
@@ -119,9 +128,9 @@ public abstract class IoTSocket {
                }
                // we now send an acknowledgement to the server to let them
                // know we've got it
-               //System.out.println("Receiver about to send ACK!");
+               System.out.println("Receiver about to send ACK!");
                sendAck();
-               //System.out.println("Receiver about to receive ACK!\n\n");
+               System.out.println("Receiver about to receive ACK!\n\n");
                receiveAck();
 
                return val;
index 1fb79e2fa601e214113d04293e9c77a8ec6744f0..5f731daa5e549d8edc4031943887053e0519fc3c 100644 (file)
@@ -30,7 +30,8 @@ public class CallBack implements CallBackInterface {
        
        public void needCallback(TestClassComplete tc) {
 
-               System.out.println("Going to invoke getShort()!");
+               //System.out.println("Going to invoke getShort()!");
+               //for(int i=0; i<10; i++)
                System.out.println("Short from TestClass: " + tc.getShort((short)1234));
        }
 }
index 48748d0546c7ba5cac88e3832bef5e1c8195c153..f9666fe0f8a0668f658c0472a7ef6af139aa7f59 100644 (file)
@@ -40,11 +40,11 @@ public class TestClass implements TestClassInterface {
                System.out.println("Callback called! cblist: " + cblist.size());
                for (CallBackInterfaceWithCallBack cb : cblist) {
                        sum = sum + cb.printInt();
-                       //TestClass tci = new TestClass();
-                       cb.needCallback(this);
                        //cb.needCallback(this);
-                       //cb.needCallback(tci);
-                       System.out.println("Inside the loop!");
+                       TestClass tci = new TestClass();
+                       cb.needCallback(this);
+                       cb.needCallback(tci);
+                       System.out.println("\n\nInside the loop! Sum is now: " + sum + "\n\n");
                }
                System.out.println("Executed callback of callback! Returning value: " + sum + "\n\n");
                return sum;
index 81b20dd25dab5eedbe22daedb801bedb7b5730c0..1b0bc08d5f9fe51a7d14a8b554262e9e0b01ba4b 100644 (file)
@@ -9,10 +9,12 @@ public class TestClassCallbacks_Stub {
 
                CommunicationHandler comHan = new CommunicationHandler(true);
                int numOfPorts = 2;
-               int[] ports = comHan.getCallbackPorts(numOfPorts);
+               //int[] ports = comHan.getCallbackPorts(numOfPorts);
 
-               int localport = 5011;
-               int port = 5010;
+               int localportsend = 5011;
+               int localportrecv = 6011;
+               int portsend = 5000;
+               int portrecv = 6000;
                //String address = "localhost";
                //String address = "192.168.2.191";     // RPi2
                //String skeletonAddress = "128.195.136.170";   // dc-9.calit2.uci.edu
@@ -22,18 +24,22 @@ public class TestClassCallbacks_Stub {
                //String callbackAddress = "192.168.2.191";     // RPi2
                int rev = 0;
 
-               TestClassComplete_Stub tcstub = new TestClassComplete_Stub(localport, port, skeletonAddress, callbackAddress, rev, ports);
+               TestClassComplete_Stub tcstub = new TestClassComplete_Stub(localportsend, localportrecv, portsend, portrecv, 
+                       skeletonAddress, rev);
                System.out.println("==== CALLBACKS ====");
                CallBackInterface cbSingle = new CallBack(2354);
                tcstub.registerCallback(cbSingle);
                System.out.println("Registered callback!");
-               //CallBackInterface cbSingle1 = new CallBack(2356);
-               //tcstub.registerCallback(cbSingle1);
+               CallBackInterface cbSingle1 = new CallBack(2356);
+               tcstub.registerCallback(cbSingle1);
+               System.out.println("Registered callback!");
+               CallBackInterface cbSingle2 = new CallBack(2360);
+               tcstub.registerCallback(cbSingle2);
                System.out.println("Registered callback!");
 
                System.out.println("Return value from callback 1: " + tcstub.callBack() + "\n\n");
                //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n");
-               System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
+               //System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
                //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n");
                //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n");
                while(true) {}
index 76e2db7c89bc829925f23cd44356d97ec1cdd7c1..c85842d205bac945c03d5ac4b6d7cd9f22597613 100644 (file)
@@ -4,9 +4,10 @@ public class TestClass_Skeleton {
 
        public static void main(String[] args) throws Exception {
 
-               int port = 5010;
+               int portsend = 5000;
+               int portrecv = 6000;
                String callbackAddress = InetAddress.getLocalHost().getHostAddress();
                TestClass tc = new TestClass(3, 5f, "7911");
-               TestClassInterface_Skeleton tcSkel = new TestClassInterface_Skeleton(tc, callbackAddress, port);
+               TestClassInterface_Skeleton tcSkel = new TestClassInterface_Skeleton(tc, portsend, portrecv);
        }
 }