From: rtrimana Date: Sat, 28 Jan 2017 00:15:22 +0000 (-0800) Subject: First version of skeleton-stub communication using queue and 2 sockets (one send... X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=01d6adb8883a99ed944eca1d7b74b6f7e180e869;p=iot2.git First version of skeleton-stub communication using queue and 2 sockets (one send and one receive) as this involves multi-threading and Java socket is NOT thread safegit status! --- diff --git a/iotjava/iotrmi/Java/IoTRMIComm.java b/iotjava/iotrmi/Java/IoTRMIComm.java new file mode 100644 index 0000000..0be3a25 --- /dev/null +++ b/iotjava/iotrmi/Java/IoTRMIComm.java @@ -0,0 +1,675 @@ +package iotrmi.Java; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.lang.reflect.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** Class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall + *

+ * We will arbitrate packets into 2 queues and wake up the right threads/callers. + * We separate traffics one-directionally. + * + * @author Rahmadi Trimananda + * @version 1.0 + * @since 2017-01-27 + */ +public class IoTRMIComm { + + /** + * Class Properties + */ + private List listMethodId2Sign; // List of method signature (we use list index as method Id) + private IoTRMIUtil rmiUtil; + private IoTSocketServer rmiServerSend; + private IoTSocketServer rmiServerRecv; + private IoTSocketClient rmiClientSend; + private IoTSocketClient rmiClientRecv; + private byte[] methodBytes; + private byte[] retValueBytes; + private ConcurrentLinkedQueue methodQueue; + private ConcurrentLinkedQueue returnQueue; + private Map mapSkeletonId; + private Map mapStubId; + private AtomicBoolean didGetMethodBytes; + private AtomicBoolean didGetReturnBytes; + private AtomicBoolean didServerSendConnect; + private AtomicBoolean didServerRecvConnect; + private int objectIdCounter = Integer.MAX_VALUE; + + /** + * Constructor (for skeleton) + */ + public IoTRMIComm(int _portSend, int _portRecv) throws + ClassNotFoundException, InstantiationException, + IllegalAccessException, IOException { + + didGetMethodBytes = new AtomicBoolean(false); + didGetReturnBytes = new AtomicBoolean(false); + didServerSendConnect = new AtomicBoolean(false); + didServerRecvConnect = new AtomicBoolean(false); + rmiUtil = new IoTRMIUtil(); + methodBytes = null; + retValueBytes = null; + methodQueue = new ConcurrentLinkedQueue(); + returnQueue = new ConcurrentLinkedQueue(); + mapSkeletonId = new HashMap(); + mapStubId = new HashMap(); + //rmiServerSend = new IoTSocketServer(_portSend); + //rmiServerSend.connect(); + //rmiServerRecv = new IoTSocketServer(_portRecv); + //rmiServerRecv.connect(); + rmiServerSend = new IoTSocketServer(_portSend); + rmiServerRecv = new IoTSocketServer(_portRecv); + waitForConnectionOnServerSend(); + waitForConnectionOnServerRecv(); + while(!didServerSendConnect.get()); // Wait until server is connected + while(!didServerRecvConnect.get()); // Wait until server is connected + rmiClientSend = null; + rmiClientRecv = null; + waitForPacketsOnServer(); + wakeUpThreadOnMethodCall(); + wakeUpThreadOnReturnValue(); + } + + + /** + * waitForConnectionOnServerSend() starts a thread that waits server connection + */ + public void waitForConnectionOnServerSend() { + + Thread thread = new Thread() { + public void run() { + try { + rmiServerSend.connect(); + didServerSendConnect.set(true); + } catch (Exception ex) { + ex.printStackTrace(); + throw new Error("IoTRMIComm: Error receiving return value bytes on client!"); + } + } + }; + thread.start(); + } + + + /** + * waitForConnectionOnServerRecv() starts a thread that waits server connection + */ + public void waitForConnectionOnServerRecv() { + + Thread thread = new Thread() { + public void run() { + try { + rmiServerRecv.connect(); + didServerRecvConnect.set(true); + } catch (Exception ex) { + ex.printStackTrace(); + throw new Error("IoTRMIComm: Error receiving return value bytes on client!"); + } + } + }; + thread.start(); + } + + + /** + * Constructor (for stub) - send and recv from the perspective of RMI socket servers + */ + public IoTRMIComm(int _localPortSend, int _localPortRecv, int _portSend, int _portRecv, String _address, int _rev) throws + ClassNotFoundException, InstantiationException, + IllegalAccessException, IOException { + + didGetMethodBytes = new AtomicBoolean(false); + didGetReturnBytes = new AtomicBoolean(false); + didServerSendConnect = null; + didServerRecvConnect = null; + rmiUtil = new IoTRMIUtil(); + methodBytes = null; + retValueBytes = null; + methodQueue = new ConcurrentLinkedQueue(); + returnQueue = new ConcurrentLinkedQueue(); + mapSkeletonId = new HashMap(); + mapStubId = new HashMap(); + rmiServerSend = null; + rmiServerRecv = null; + rmiClientRecv = new IoTSocketClient(_localPortSend, _portSend, _address, _rev); + rmiClientSend = new IoTSocketClient(_localPortRecv, _portRecv, _address, _rev); + waitForPacketsOnClient(); + wakeUpThreadOnMethodCall(); + wakeUpThreadOnReturnValue(); + } + + + /** + * waitForPacketsOnServer() starts a thread that waits for packet bytes on server side + */ + public void waitForPacketsOnServer() { + + Thread thread = new Thread() { + public void run() { + byte[] packetBytes = null; + while(true) { + try { + packetBytes = rmiServerRecv.receiveBytes(packetBytes); + if (packetBytes != null) { + System.out.println("Packet received: " + Arrays.toString(packetBytes)); + int packetType = IoTRMIComm.getPacketType(packetBytes); + if (packetType == IoTRMIUtil.METHOD_TYPE) { + System.out.println("Method packet: " + Arrays.toString(packetBytes)); + methodQueue.offer(packetBytes); + } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) { + System.out.println("Return value packet: " + Arrays.toString(packetBytes)); + returnQueue.offer(packetBytes); + } else + throw new Error("IoTRMIComm: Packet type is unknown: " + packetType); + } //else + // Thread.sleep(100); + packetBytes = null; + } catch (Exception ex) { + ex.printStackTrace(); + throw new Error("IoTRMIComm: Error receiving return value bytes on client!"); + } + } + } + }; + thread.start(); + } + + + /** + * waitForPacketsOnClient() starts a thread that waits for packet bytes on client side + */ + public void waitForPacketsOnClient() { + + Thread thread = new Thread() { + public void run() { + byte[] packetBytes = null; + while(true) { + try { + packetBytes = rmiClientRecv.receiveBytes(packetBytes); + if (packetBytes != null) { + int packetType = IoTRMIComm.getPacketType(packetBytes); + if (packetType == IoTRMIUtil.METHOD_TYPE) { + System.out.println("Method packet: " + Arrays.toString(packetBytes)); + methodQueue.offer(packetBytes); + } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) { + System.out.println("Return value packet: " + Arrays.toString(packetBytes)); + returnQueue.offer(packetBytes); + } else + throw new Error("IoTRMIComm: Packet type is unknown: " + packetType); + } //else + // Thread.sleep(100); + packetBytes = null; + } catch (Exception ex) { + ex.printStackTrace(); + throw new Error("IoTRMIComm: Error receiving return value bytes on client!"); + } + } + } + }; + thread.start(); + } + + + /** + * wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call + */ + public void wakeUpThreadOnMethodCall() { + + Thread thread = new Thread() { + public void run() { + while(true) { + // Take the current method from the queue and wake up the correct thread + methodBytes = methodQueue.poll(); + if (methodBytes != null) { // If there is method bytes + int currObjId = getObjectId(methodBytes); + AtomicBoolean methRecv = mapSkeletonId.get(currObjId); + didGetMethodBytes.set(false); + while(!methRecv.compareAndSet(false, true)); + while(!didGetMethodBytes.get()); // While skeleton is still processing + } + } + } + }; + thread.start(); + } + + + /** + * wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value + */ + public void wakeUpThreadOnReturnValue() { + + Thread thread = new Thread() { + public void run() { + while(true) { + // Take the current method from the queue and wake up the correct thread + retValueBytes = returnQueue.poll(); + if (retValueBytes != null) { // If there is method bytes + System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes)); + int objectId = getObjectId(retValueBytes); + int methodId = getMethodId(retValueBytes); + String strKey = objectId + "-" + methodId; + AtomicBoolean retRecv = mapStubId.get(strKey); + //System.out.println("boolean status: " + retRecv + " with key: " + strKey); + didGetReturnBytes.set(false); + while(!retRecv.compareAndSet(false, true)); + //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size()); + while(!didGetReturnBytes.get()); // While skeleton is still processing + } + } + } + }; + thread.start(); + } + + + /** + * registerSkeleton() registers the skeleton to be woken up + + */ + public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) { + + mapSkeletonId.put(objectId, methodReceived); + } + + + /** + * registerStub() registers the skeleton to be woken up + */ + public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) { + + String strKey = objectId + "-" + methodId; + //System.out.println("Key exist? " + mapStubId.containsKey(strKey)); + mapStubId.put(strKey, retValueReceived); + //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n"); + } + + + /** + * getObjectIdCounter() gets object Id counter + */ + public int getObjectIdCounter() { + + return objectIdCounter; + } + + + /** + * setObjectIdCounter() sets object Id counter + */ + public void setObjectIdCounter(int objIdCounter) { + + objectIdCounter = objIdCounter; + } + + + /** + * decrementObjectIdCounter() gets object Id counter + */ + public void decrementObjectIdCounter() { + + objectIdCounter--; + } + + + /** + * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes + */ + public boolean setGetMethodBytes() { + + return didGetMethodBytes.compareAndSet(false, true); + } + + + /** + * setGetReturnBytes() set didGetReturnBytes if there is a new return value already + */ + public synchronized boolean setGetReturnBytes() { + + return didGetReturnBytes.compareAndSet(false, true); + } + + + /** + * getMethodBytes() get the method in bytes + */ + public byte[] getMethodBytes() throws IOException { + + // Just return the methodBytes content + return methodBytes; + } + + + /** + * static version of getObjectId() + */ + public static int getObjectId(byte[] packetBytes) { + + // Get object Id bytes + byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN]; + System.arraycopy(packetBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN); + // Get object Id + int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes); + return objectId; + } + + + /** + * static version of getMethodId() + */ + public static int getMethodId(byte[] packetBytes) { + + // Get method Id bytes + byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN]; + // Method Id is positioned after object Id in the byte array + System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN); + // Get method Id + int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes); + // Get method Id + return methodId; + } + + + /** + * static version of getPacketType() - either method or return value (position is after object Id and method Id) + */ + public static int getPacketType(byte[] packetBytes) { + + // Get packet type bytes + byte[] packetTypeBytes = new byte[IoTRMIUtil.PACKET_TYPE_LEN]; + int offset = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN; + System.arraycopy(packetBytes, offset, packetTypeBytes, 0, IoTRMIUtil.PACKET_TYPE_LEN); + // Get packet type (for now we assume 1 as method and -1 as return value + int packetType = IoTRMIUtil.byteArrayToInt(packetTypeBytes); + return packetType; + } + + + /** + * getMethodParams() gets method params based on byte array received + *

+ * Basically this is the format of a method in bytes: + * 1) 32-bit value of object ID + * 2) 32-bit value of method ID + * 3) m parameters with n-bit value each (m x n-bit) + * For the parameters that don't have definite length, + * we need to extract the length from a preceding 32-bit + * field in front of it. + * + * For primitive objects: + * | 32-bit object ID | 32-bit method ID | m-bit actual data (fixed length) | ... + * + * For string, arrays, and non-primitive objects: + * | 32-bit object ID | 32-bit method ID | 32-bit length | n-bit actual data | ... + * + */ + public Object[] getMethodParams(Class[] arrCls, Class[] arrGenValCls, byte[] methodBytes) { + + // Byte scanning position + int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN; + Object[] paramObj = new Object[arrCls.length]; + for (int i=0; i < arrCls.length; i++) { + + String paramType = arrCls[i].getSimpleName(); + int paramSize = rmiUtil.getTypeSize(paramType); + // Get the 32-bit field in the byte array to get the actual + // length (this is a param with indefinite length) + if (paramSize == -1) { + byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN]; + System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN); + pos = pos + IoTRMIUtil.PARAM_LEN; + paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen); + } + byte[] paramBytes = new byte[paramSize]; + System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize); + pos = pos + paramSize; + paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes); + } + + return paramObj; + } + + + /** + * sendReturnObj() static version + */ + public synchronized void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException { + + // Send back return value + byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj); + // Send return value together with OBJECT_ID and METHOD_ID for arbitration + int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN; + int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN; + byte[] retAllBytes = new byte[headerLen + retObjBytes.length]; + // Copy OBJECT_ID and METHOD_ID + System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen); + int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value + byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType); + System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN); + // Copy array of bytes (return object) + System.arraycopy(retObjBytes, 0, retAllBytes, headerLen, retObjBytes.length); + if (rmiServerSend == null) + rmiClientSend.sendBytes(retAllBytes); + else + rmiServerSend.sendBytes(retAllBytes); + } + + + /** + * sendReturnObj() overloaded to send multiple return objects for structs + */ + public synchronized void sendReturnObj(Class[] retCls, Object[] retObj, byte[] methodBytes) throws IOException { + + // Send back return value + byte[] retObjBytes = returnToBytes(retCls, retObj); + // Send return value together with OBJECT_ID and METHOD_ID for arbitration + int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN; + int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN; + byte[] retAllBytes = new byte[headerLen + retObjBytes.length]; + // Copy OBJECT_ID and METHOD_ID + System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen); + int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value + byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType); + System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN); + // Copy array of bytes (return object) + if (rmiServerSend == null) + rmiClientSend.sendBytes(retAllBytes); + else + rmiServerSend.sendBytes(retAllBytes); + } + + + /** + * returnToBytes() takes array of objects and generates bytes + */ + public byte[] returnToBytes(Class[] retCls, Object[] retObj) { + + // Get byte arrays and calculate method bytes length + int numbRet = retObj.length; + int retLen = 0; + byte[][] objBytesArr = new byte[numbRet][]; + for (int i = 0; i < numbRet; i++) { + // Get byte arrays for the objects + objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]); + String clsName = retCls[i].getSimpleName(); + int retObjLen = rmiUtil.getTypeSize(clsName); + if (retObjLen == -1) { // indefinite length - store the length first + retLen = retLen + IoTRMIUtil.RETURN_LEN; + } + retLen = retLen + objBytesArr[i].length; + } + // Construct return in byte array + byte[] retBytes = new byte[retLen]; + int pos = 0; + // Iteration for copying bytes + for (int i = 0; i < numbRet; i++) { + + String clsName = retCls[i].getSimpleName(); + int retObjLen = rmiUtil.getTypeSize(clsName); + if (retObjLen == -1) { // indefinite length + retObjLen = objBytesArr[i].length; + byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen); + System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN); + pos = pos + IoTRMIUtil.RETURN_LEN; + } + System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen); + pos = pos + retObjLen; + } + + return retBytes; + } + + + /** + * remoteCall() calls a method remotely by passing in parameters + */ + public synchronized void remoteCall(int objectId, int methodId, Class[] paramCls, Object[] paramObj) { + + // Send method info + byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj); + try { + if (rmiClientSend == null) + rmiServerSend.sendBytes(methodBytes); + else + rmiClientSend.sendBytes(methodBytes); + } catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IoTRMICall: Error when sending bytes in remoteCall()!"); + } + } + + + /** + * getReturnValue() returns return value object + */ + public Object getReturnValue(Class retType, Class retGenTypeVal) { + + // Receive return value and return it to caller + // Now just strip off the object ID and method ID + int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN; + int valByteLen = retValueBytes.length - headerLen; + byte[] retValBytes = new byte[valByteLen]; + // Method Id is positioned after object Id in the byte array + System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen); + Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes); + // This means the right object and method have gotten the return value, so we set this back to false + return retObj; + } + + + /** + * methodToBytes() returns byte representation of a method + */ + public byte[] methodToBytes(int objectId, int methId, Class[] paramCls, Object[] paramObj) { + + // Initialized to the length of method ID + int methodLen = IoTRMIUtil.OBJECT_ID_LEN; + byte[] objId = IoTRMIUtil.intToByteArray(objectId); + // Get method ID in bytes + byte[] methodId = IoTRMIUtil.intToByteArray(methId); + // Get byte arrays and calculate method bytes length + int numbParam = paramObj.length; + methodLen = methodLen + IoTRMIUtil.METHOD_ID_LEN; + methodLen = methodLen + IoTRMIUtil.PACKET_TYPE_LEN; + byte[][] objBytesArr = new byte[numbParam][]; + for (int i = 0; i < numbParam; i++) { + // Get byte arrays for the objects + objBytesArr[i] = IoTRMIUtil.getObjectBytes(paramObj[i]); + String clsName = paramCls[i].getSimpleName(); + int paramLen = rmiUtil.getTypeSize(clsName); + if (paramLen == -1) { // indefinite length - store the length first + methodLen = methodLen + IoTRMIUtil.PARAM_LEN; + } + methodLen = methodLen + objBytesArr[i].length; + } + // Construct method in byte array + byte[] method = new byte[methodLen]; + int pos = 0; + System.arraycopy(objId, 0, method, 0, IoTRMIUtil.METHOD_ID_LEN); + pos = pos + IoTRMIUtil.OBJECT_ID_LEN; + System.arraycopy(methodId, 0, method, pos, IoTRMIUtil.METHOD_ID_LEN); + pos = pos + IoTRMIUtil.METHOD_ID_LEN; + int packetType = IoTRMIUtil.METHOD_TYPE; // This is a method + byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType); + System.arraycopy(packetTypeBytes, 0, method, pos, IoTRMIUtil.PACKET_TYPE_LEN); + pos = pos + IoTRMIUtil.PACKET_TYPE_LEN; + // Second iteration for copying bytes + for (int i = 0; i < numbParam; i++) { + + String clsName = paramCls[i].getSimpleName(); + int paramLen = rmiUtil.getTypeSize(clsName); + if (paramLen == -1) { // indefinite length + paramLen = objBytesArr[i].length; + byte[] paramLenBytes = IoTRMIUtil.intToByteArray(paramLen); + System.arraycopy(paramLenBytes, 0, method, pos, IoTRMIUtil.PARAM_LEN); + pos = pos + IoTRMIUtil.PARAM_LEN; + } + System.arraycopy(objBytesArr[i], 0, method, pos, paramLen); + pos = pos + paramLen; + } + + return method; + } + + + /** + * getStructObjects() calls a method remotely by passing in parameters and getting a return Object + */ + /*public synchronized Object[] getStructObjects(Class[] retType, Class[] retGenTypeVal) { + + // Receive return value and return it to caller + Object[] retObj = null; + byte[] retObjBytes = null; + try { + retObjBytes = rmiClientRecv.receiveBytes(retObjBytes); + } catch (IOException ex) { + ex.printStackTrace(); + throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()"); + } + retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal); + + return retObj; + }*/ + + + /** + * remoteCall() calls a method remotely by passing in parameters and getting a return Object + */ + public Object[] getReturnObjects(byte[] retBytes, Class[] arrCls, Class[] arrGenValCls) { + + // Byte scanning position + int pos = 0; + Object[] retObj = new Object[arrCls.length]; + for (int i=0; i < arrCls.length; i++) { + + String retType = arrCls[i].getSimpleName(); + int retSize = rmiUtil.getTypeSize(retType); + // Get the 32-bit field in the byte array to get the actual + // length (this is a param with indefinite length) + if (retSize == -1) { + byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN]; + System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN); + pos = pos + IoTRMIUtil.RETURN_LEN; + retSize = IoTRMIUtil.byteArrayToInt(bytRetLen); + } + byte[] retObjBytes = new byte[retSize]; + System.arraycopy(retBytes, pos, retObjBytes, 0, retSize); + pos = pos + retSize; + retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes); + } + + return retObj; + } + +} diff --git a/iotjava/iotrmi/Java/IoTRMIObject.java b/iotjava/iotrmi/Java/IoTRMIObject.java index dfbe9c6..7ac2c3e 100644 --- a/iotjava/iotrmi/Java/IoTRMIObject.java +++ b/iotjava/iotrmi/Java/IoTRMIObject.java @@ -311,11 +311,16 @@ public class IoTRMIObject { /** * sendReturnObj() overloaded to send multiple return objects for structs */ - public void sendReturnObj(Class[] retCls, Object[] retObj) throws IOException { + public void sendReturnObj(Class[] retCls, Object[] retObj, byte[] methodBytes) throws IOException { // Send back return value byte[] retObjBytes = returnToBytes(retCls, retObj); - rmiServer.sendBytes(retObjBytes); + // Send return value together with OBJECT_ID and METHOD_ID for arbitration + byte[] retAllBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + retObjBytes.length]; + // Copy OBJECT_ID and METHOD_ID + System.arraycopy(methodBytes, 0, retAllBytes, 0, IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN); + // Copy array of bytes (return object) + rmiServer.sendBytes(retAllBytes); } diff --git a/iotjava/iotrmi/Java/IoTRMIUtil.java b/iotjava/iotrmi/Java/IoTRMIUtil.java index c4d383b..59b3a49 100644 --- a/iotjava/iotrmi/Java/IoTRMIUtil.java +++ b/iotjava/iotrmi/Java/IoTRMIUtil.java @@ -37,10 +37,13 @@ public class IoTRMIUtil { /** * Class Constants */ - public final static int OBJECT_ID_LEN = 4; // 4 bytes = 32 bits - public final static int METHOD_ID_LEN = 4; // 4 bytes = 32 bits - public final static int PARAM_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the param) - public final static int RETURN_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the return object) + public final static int OBJECT_ID_LEN = 4; // 4 bytes = 32 bits + public final static int METHOD_ID_LEN = 4; // 4 bytes = 32 bits + public final static int PACKET_TYPE_LEN = 4; // 4 bytes = 32 bits + public final static int PARAM_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the param) + public final static int RETURN_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the return object) + public final static int RET_VAL_TYPE = -1; // Packet type of return value + public final static int METHOD_TYPE = 1; // Packet type of method public final static int SHT_LEN = 2; public final static int INT_LEN = 4; diff --git a/iotjava/iotrmi/Java/IoTSocket.java b/iotjava/iotrmi/Java/IoTSocket.java index 08e7325..2cdb258 100644 --- a/iotjava/iotrmi/Java/IoTSocket.java +++ b/iotjava/iotrmi/Java/IoTSocket.java @@ -7,6 +7,8 @@ import java.awt.*; import java.util.*; import java.nio.ByteBuffer; +import java.util.concurrent.Semaphore; + /** Class IoTSocket is the basic class for IoT RMI * socket communication. This class will be extended @@ -32,6 +34,8 @@ public abstract class IoTSocket { protected BufferedInputStream input; protected BufferedOutputStream output; + //protected static Semaphore sendRecvMutex = new Semaphore(1); + /** * Class Constant */ @@ -68,13 +72,15 @@ public abstract class IoTSocket { ByteBuffer bb = ByteBuffer.allocate(MSG_LEN_SIZE); bb.putInt(len); output.write(bb.array(), 0, MSG_LEN_SIZE); + System.out.println("Sender about to send: " + Arrays.toString(bb.array())); output.flush(); // Write the byte array output.write(vals, 0, len); + System.out.println("Sender sending: " + len); output.flush(); - //System.out.println("Sender about to receive ACK!"); + System.out.println("Sender about to receive ACK!"); receiveAck(); - //System.out.println("Sender about to send ACK!\n\n"); + System.out.println("Sender about to send ACK!\n\n"); sendAck(); } @@ -89,14 +95,17 @@ public abstract class IoTSocket { int numbytes; // Wait until input is available -// while(input.available() == 0); - if (input.available() == 0) + if (input.available() == 0) { return null; + } + System.out.println("Receiver about to receive: " + input.available()); // Read the maxlen first - read 4 bytes here byte[] lenBytes = new byte[MSG_LEN_SIZE]; input.read(lenBytes, 0, MSG_LEN_SIZE); + System.out.println("Receiver lenBytes: " + Arrays.toString(lenBytes)); int maxlen = ByteBuffer.wrap(lenBytes).getInt(); + System.out.println("Receiver received length: " + maxlen); // Receive until maxlen if (maxlen>BUFFSIZE) { System.out.println("IoTSocketClient/Server: Sending more bytes then will fit in buffer! Number of bytes: " + maxlen); @@ -119,9 +128,9 @@ public abstract class IoTSocket { } // we now send an acknowledgement to the server to let them // know we've got it - //System.out.println("Receiver about to send ACK!"); + System.out.println("Receiver about to send ACK!"); sendAck(); - //System.out.println("Receiver about to receive ACK!\n\n"); + System.out.println("Receiver about to receive ACK!\n\n"); receiveAck(); return val; diff --git a/iotjava/iotrmi/Java/basics/CallBack.java b/iotjava/iotrmi/Java/basics/CallBack.java index 1fb79e2..5f731da 100644 --- a/iotjava/iotrmi/Java/basics/CallBack.java +++ b/iotjava/iotrmi/Java/basics/CallBack.java @@ -30,7 +30,8 @@ public class CallBack implements CallBackInterface { public void needCallback(TestClassComplete tc) { - System.out.println("Going to invoke getShort()!"); + //System.out.println("Going to invoke getShort()!"); + //for(int i=0; i<10; i++) System.out.println("Short from TestClass: " + tc.getShort((short)1234)); } } diff --git a/iotjava/iotrmi/Java/basics/TestClass.java b/iotjava/iotrmi/Java/basics/TestClass.java index 48748d0..f9666fe 100644 --- a/iotjava/iotrmi/Java/basics/TestClass.java +++ b/iotjava/iotrmi/Java/basics/TestClass.java @@ -40,11 +40,11 @@ public class TestClass implements TestClassInterface { System.out.println("Callback called! cblist: " + cblist.size()); for (CallBackInterfaceWithCallBack cb : cblist) { sum = sum + cb.printInt(); - //TestClass tci = new TestClass(); - cb.needCallback(this); //cb.needCallback(this); - //cb.needCallback(tci); - System.out.println("Inside the loop!"); + TestClass tci = new TestClass(); + cb.needCallback(this); + cb.needCallback(tci); + System.out.println("\n\nInside the loop! Sum is now: " + sum + "\n\n"); } System.out.println("Executed callback of callback! Returning value: " + sum + "\n\n"); return sum; diff --git a/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java b/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java index 81b20dd..1b0bc08 100644 --- a/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java +++ b/iotjava/iotrmi/Java/basics/TestClassCallbacks_Stub.java @@ -9,10 +9,12 @@ public class TestClassCallbacks_Stub { CommunicationHandler comHan = new CommunicationHandler(true); int numOfPorts = 2; - int[] ports = comHan.getCallbackPorts(numOfPorts); + //int[] ports = comHan.getCallbackPorts(numOfPorts); - int localport = 5011; - int port = 5010; + int localportsend = 5011; + int localportrecv = 6011; + int portsend = 5000; + int portrecv = 6000; //String address = "localhost"; //String address = "192.168.2.191"; // RPi2 //String skeletonAddress = "128.195.136.170"; // dc-9.calit2.uci.edu @@ -22,18 +24,22 @@ public class TestClassCallbacks_Stub { //String callbackAddress = "192.168.2.191"; // RPi2 int rev = 0; - TestClassComplete_Stub tcstub = new TestClassComplete_Stub(localport, port, skeletonAddress, callbackAddress, rev, ports); + TestClassComplete_Stub tcstub = new TestClassComplete_Stub(localportsend, localportrecv, portsend, portrecv, + skeletonAddress, rev); System.out.println("==== CALLBACKS ===="); CallBackInterface cbSingle = new CallBack(2354); tcstub.registerCallback(cbSingle); System.out.println("Registered callback!"); - //CallBackInterface cbSingle1 = new CallBack(2356); - //tcstub.registerCallback(cbSingle1); + CallBackInterface cbSingle1 = new CallBack(2356); + tcstub.registerCallback(cbSingle1); + System.out.println("Registered callback!"); + CallBackInterface cbSingle2 = new CallBack(2360); + tcstub.registerCallback(cbSingle2); System.out.println("Registered callback!"); System.out.println("Return value from callback 1: " + tcstub.callBack() + "\n\n"); //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n"); - System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n"); + //System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n"); //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n"); //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n"); while(true) {} diff --git a/iotjava/iotrmi/Java/basics/TestClass_Skeleton.java b/iotjava/iotrmi/Java/basics/TestClass_Skeleton.java index 76e2db7..c85842d 100644 --- a/iotjava/iotrmi/Java/basics/TestClass_Skeleton.java +++ b/iotjava/iotrmi/Java/basics/TestClass_Skeleton.java @@ -4,9 +4,10 @@ public class TestClass_Skeleton { public static void main(String[] args) throws Exception { - int port = 5010; + int portsend = 5000; + int portrecv = 6000; String callbackAddress = InetAddress.getLocalHost().getHostAddress(); TestClass tc = new TestClass(3, 5f, "7911"); - TestClassInterface_Skeleton tcSkel = new TestClassInterface_Skeleton(tc, callbackAddress, port); + TestClassInterface_Skeleton tcSkel = new TestClassInterface_Skeleton(tc, portsend, portrecv); } }