import java.util.concurrent.atomic.AtomicBoolean;
-/** Class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
+/** Abstract class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
* <p>
* We will arbitrate packets into 2 queues and wake up the right threads/callers.
* We separate traffics one-directionally.
* @version 1.0
* @since 2017-01-27
*/
-public class IoTRMIComm {
+public abstract class IoTRMIComm {
/**
* Class Properties
*/
- private List<String> listMethodId2Sign; // List of method signature (we use list index as method Id)
- private IoTRMIUtil rmiUtil;
- private IoTSocketServer rmiServerSend;
- private IoTSocketServer rmiServerRecv;
- private IoTSocketClient rmiClientSend;
- private IoTSocketClient rmiClientRecv;
- private byte[] methodBytes;
- private byte[] retValueBytes;
- private ConcurrentLinkedQueue<byte[]> methodQueue;
- private ConcurrentLinkedQueue<byte[]> returnQueue;
- private Map<Integer,AtomicBoolean> mapSkeletonId;
- private Map<String,AtomicBoolean> mapStubId;
- private AtomicBoolean didGetMethodBytes;
- private AtomicBoolean didGetReturnBytes;
- private AtomicBoolean didServerSendConnect;
- private AtomicBoolean didServerRecvConnect;
- private int objectIdCounter = Integer.MAX_VALUE;
+ protected IoTRMIUtil rmiUtil;
+ protected byte[] methodBytes;
+ protected byte[] retValueBytes;
+ protected ConcurrentLinkedQueue<byte[]> methodQueue;
+ protected ConcurrentLinkedQueue<byte[]> returnQueue;
+ protected Map<Integer,AtomicBoolean> mapSkeletonId;
+ protected Map<String,AtomicBoolean> mapStubId;
+ protected AtomicBoolean didGetMethodBytes;
+ protected AtomicBoolean didGetReturnBytes;
+ protected int objectIdCounter = Integer.MAX_VALUE;
/**
* Constructor (for skeleton)
*/
- public IoTRMIComm(int _portSend, int _portRecv) throws
+ public IoTRMIComm() throws
ClassNotFoundException, InstantiationException,
IllegalAccessException, IOException {
- didGetMethodBytes = new AtomicBoolean(false);
- didGetReturnBytes = new AtomicBoolean(false);
- didServerSendConnect = new AtomicBoolean(false);
- didServerRecvConnect = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
methodBytes = null;
retValueBytes = null;
returnQueue = new ConcurrentLinkedQueue<byte[]>();
mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
mapStubId = new HashMap<String,AtomicBoolean>();
- //rmiServerSend = new IoTSocketServer(_portSend);
- //rmiServerSend.connect();
- //rmiServerRecv = new IoTSocketServer(_portRecv);
- //rmiServerRecv.connect();
- rmiServerSend = new IoTSocketServer(_portSend);
- rmiServerRecv = new IoTSocketServer(_portRecv);
- waitForConnectionOnServerSend();
- waitForConnectionOnServerRecv();
- while(!didServerSendConnect.get()); // Wait until server is connected
- while(!didServerRecvConnect.get()); // Wait until server is connected
- rmiClientSend = null;
- rmiClientRecv = null;
- waitForPacketsOnServer();
- wakeUpThreadOnMethodCall();
- wakeUpThreadOnReturnValue();
- }
-
-
- /**
- * waitForConnectionOnServerSend() starts a thread that waits server connection
- */
- public void waitForConnectionOnServerSend() {
-
- Thread thread = new Thread() {
- public void run() {
- try {
- rmiServerSend.connect();
- didServerSendConnect.set(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * waitForConnectionOnServerRecv() starts a thread that waits server connection
- */
- public void waitForConnectionOnServerRecv() {
-
- Thread thread = new Thread() {
- public void run() {
- try {
- rmiServerRecv.connect();
- didServerRecvConnect.set(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * Constructor (for stub) - send and recv from the perspective of RMI socket servers
- */
- public IoTRMIComm(int _localPortSend, int _localPortRecv, int _portSend, int _portRecv, String _address, int _rev) throws
- ClassNotFoundException, InstantiationException,
- IllegalAccessException, IOException {
-
didGetMethodBytes = new AtomicBoolean(false);
didGetReturnBytes = new AtomicBoolean(false);
- didServerSendConnect = null;
- didServerRecvConnect = null;
- rmiUtil = new IoTRMIUtil();
- methodBytes = null;
- retValueBytes = null;
- methodQueue = new ConcurrentLinkedQueue<byte[]>();
- returnQueue = new ConcurrentLinkedQueue<byte[]>();
- mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
- mapStubId = new HashMap<String,AtomicBoolean>();
- rmiServerSend = null;
- rmiServerRecv = null;
- rmiClientRecv = new IoTSocketClient(_localPortSend, _portSend, _address, _rev);
- rmiClientSend = new IoTSocketClient(_localPortRecv, _portRecv, _address, _rev);
- waitForPacketsOnClient();
wakeUpThreadOnMethodCall();
wakeUpThreadOnReturnValue();
}
- /**
- * waitForPacketsOnServer() starts a thread that waits for packet bytes on server side
- */
- public void waitForPacketsOnServer() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] packetBytes = null;
- while(true) {
- try {
- packetBytes = rmiServerRecv.receiveBytes(packetBytes);
- if (packetBytes != null) {
- System.out.println("Packet received: " + Arrays.toString(packetBytes));
- int packetType = IoTRMIComm.getPacketType(packetBytes);
- if (packetType == IoTRMIUtil.METHOD_TYPE) {
- System.out.println("Method packet: " + Arrays.toString(packetBytes));
- methodQueue.offer(packetBytes);
- } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
- System.out.println("Return value packet: " + Arrays.toString(packetBytes));
- returnQueue.offer(packetBytes);
- } else
- throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
- } //else
- // Thread.sleep(100);
- packetBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * waitForPacketsOnClient() starts a thread that waits for packet bytes on client side
- */
- public void waitForPacketsOnClient() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] packetBytes = null;
- while(true) {
- try {
- packetBytes = rmiClientRecv.receiveBytes(packetBytes);
- if (packetBytes != null) {
- int packetType = IoTRMIComm.getPacketType(packetBytes);
- if (packetType == IoTRMIUtil.METHOD_TYPE) {
- System.out.println("Method packet: " + Arrays.toString(packetBytes));
- methodQueue.offer(packetBytes);
- } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
- System.out.println("Return value packet: " + Arrays.toString(packetBytes));
- returnQueue.offer(packetBytes);
- } else
- throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
- } //else
- // Thread.sleep(100);
- packetBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- }
- };
- thread.start();
- }
-
-
/**
* wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
*/
- public void wakeUpThreadOnMethodCall() {
+ private void wakeUpThreadOnMethodCall() {
Thread thread = new Thread() {
public void run() {
/**
* wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value
*/
- public void wakeUpThreadOnReturnValue() {
+ private void wakeUpThreadOnReturnValue() {
Thread thread = new Thread() {
public void run() {
// Take the current method from the queue and wake up the correct thread
retValueBytes = returnQueue.poll();
if (retValueBytes != null) { // If there is method bytes
- System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes));
int objectId = getObjectId(retValueBytes);
int methodId = getMethodId(retValueBytes);
String strKey = objectId + "-" + methodId;
AtomicBoolean retRecv = mapStubId.get(strKey);
- //System.out.println("boolean status: " + retRecv + " with key: " + strKey);
didGetReturnBytes.set(false);
while(!retRecv.compareAndSet(false, true));
- //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
while(!didGetReturnBytes.get()); // While skeleton is still processing
}
}
/**
* registerSkeleton() registers the skeleton to be woken up
-
*/
public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
String strKey = objectId + "-" + methodId;
- //System.out.println("Key exist? " + mapStubId.containsKey(strKey));
mapStubId.put(strKey, retValueReceived);
- //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
}
System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
// Get method Id
int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
- // Get method Id
return methodId;
}
/**
- * sendReturnObj() static version
+ * sendReturnObj() abstract version
*/
- public synchronized void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
-
- // Send back return value
- byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
- byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
- int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
- byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
- System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
- // Copy array of bytes (return object)
- System.arraycopy(retObjBytes, 0, retAllBytes, headerLen, retObjBytes.length);
- if (rmiServerSend == null)
- rmiClientSend.sendBytes(retAllBytes);
- else
- rmiServerSend.sendBytes(retAllBytes);
- }
+ public abstract void sendReturnObj(Object retObj, byte[] methodBytes);
/**
- * sendReturnObj() overloaded to send multiple return objects for structs
+ * sendReturnObj() abstract version
*/
- public synchronized void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
-
- // Send back return value
- byte[] retObjBytes = returnToBytes(retCls, retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
- byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
- int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
- byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
- System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
- // Copy array of bytes (return object)
- if (rmiServerSend == null)
- rmiClientSend.sendBytes(retAllBytes);
- else
- rmiServerSend.sendBytes(retAllBytes);
- }
+ public abstract void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes);
/**
/**
- * remoteCall() calls a method remotely by passing in parameters
- */
- public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
-
- // Send method info
- byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
- try {
- if (rmiClientSend == null)
- rmiServerSend.sendBytes(methodBytes);
- else
- rmiClientSend.sendBytes(methodBytes);
- } catch (IOException ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error when sending bytes in remoteCall()!");
- }
- }
-
-
- /**
- * getReturnValue() returns return value object
+ * remoteCall() abstract version
*/
- public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
-
- // Receive return value and return it to caller
- // Now just strip off the object ID and method ID
- int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
- int valByteLen = retValueBytes.length - headerLen;
- byte[] retValBytes = new byte[valByteLen];
- // Method Id is positioned after object Id in the byte array
- System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
- Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
- // This means the right object and method have gotten the return value, so we set this back to false
- return retObj;
- }
+ public abstract void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj);
/**
/**
- * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
+ * getReturnValue() returns return value object
*/
- /*public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
+ public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
// Receive return value and return it to caller
- Object[] retObj = null;
- byte[] retObjBytes = null;
- try {
- retObjBytes = rmiClientRecv.receiveBytes(retObjBytes);
- } catch (IOException ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
+ // Now just strip off the object ID and method ID
+ int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
+ int valByteLen = retValueBytes.length - headerLen;
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ Object retObj = null;
+ if (valByteLen != 0) {
+ System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
+ retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
}
- retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
+ // This means the right object and method have gotten the return value, so we set this back to false
+ return retObj;
+ }
+
+
+ /**
+ * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
+ */
+ public Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
+
+ // Receive return value and return it to caller
+ // Now just strip off the object ID and method ID
+ int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
+ int valByteLen = retValueBytes.length - headerLen;
+ byte[] retValBytes = new byte[valByteLen];
+ // Method Id is positioned after object Id in the byte array
+ System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
+ Object[] retObj = getReturnObjects(retValBytes, retType, retGenTypeVal);
return retObj;
- }*/
+ }
/**