import java.util.concurrent.atomic.AtomicBoolean;
-/** Class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
+/** Abstract class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
* <p>
* We will arbitrate packets into 2 queues and wake up the right threads/callers.
* We separate traffics one-directionally.
* @version 1.0
* @since 2017-01-27
*/
-public class IoTRMIComm {
+public abstract class IoTRMIComm {
/**
* Class Properties
*/
- private List<String> listMethodId2Sign; // List of method signature (we use list index as method Id)
- private IoTRMIUtil rmiUtil;
- private IoTSocketServer rmiServerSend;
- private IoTSocketServer rmiServerRecv;
- private IoTSocketClient rmiClientSend;
- private IoTSocketClient rmiClientRecv;
- private byte[] methodBytes;
- private byte[] retValueBytes;
- private ConcurrentLinkedQueue<byte[]> methodQueue;
- private ConcurrentLinkedQueue<byte[]> returnQueue;
- private Map<Integer,AtomicBoolean> mapSkeletonId;
- private Map<String,AtomicBoolean> mapStubId;
- private AtomicBoolean didGetMethodBytes;
- private AtomicBoolean didGetReturnBytes;
- private AtomicBoolean didServerSendConnect;
- private AtomicBoolean didServerRecvConnect;
- private int objectIdCounter = Integer.MAX_VALUE;
+ protected IoTRMIUtil rmiUtil;
+ protected byte[] methodBytes;
+ protected byte[] retValueBytes;
+ protected ConcurrentLinkedQueue<byte[]> methodQueue;
+ protected ConcurrentLinkedQueue<byte[]> returnQueue;
+ protected Map<Integer,AtomicBoolean> mapSkeletonId;
+ protected Map<String,AtomicBoolean> mapStubId;
+ protected AtomicBoolean didGetMethodBytes;
+ protected AtomicBoolean didGetReturnBytes;
+ protected int objectIdCounter = Integer.MAX_VALUE;
/**
* Constructor (for skeleton)
*/
- public IoTRMIComm(int _portSend, int _portRecv) throws
+ public IoTRMIComm() throws
ClassNotFoundException, InstantiationException,
IllegalAccessException, IOException {
- didGetMethodBytes = new AtomicBoolean(false);
- didGetReturnBytes = new AtomicBoolean(false);
- didServerSendConnect = new AtomicBoolean(false);
- didServerRecvConnect = new AtomicBoolean(false);
rmiUtil = new IoTRMIUtil();
methodBytes = null;
retValueBytes = null;
returnQueue = new ConcurrentLinkedQueue<byte[]>();
mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
mapStubId = new HashMap<String,AtomicBoolean>();
- //rmiServerSend = new IoTSocketServer(_portSend);
- //rmiServerSend.connect();
- //rmiServerRecv = new IoTSocketServer(_portRecv);
- //rmiServerRecv.connect();
- rmiServerSend = new IoTSocketServer(_portSend);
- rmiServerRecv = new IoTSocketServer(_portRecv);
- waitForConnectionOnServerSend();
- waitForConnectionOnServerRecv();
- while(!didServerSendConnect.get()); // Wait until server is connected
- while(!didServerRecvConnect.get()); // Wait until server is connected
- rmiClientSend = null;
- rmiClientRecv = null;
- waitForPacketsOnServer();
- wakeUpThreadOnMethodCall();
- wakeUpThreadOnReturnValue();
- }
-
-
- /**
- * waitForConnectionOnServerSend() starts a thread that waits server connection
- */
- public void waitForConnectionOnServerSend() {
-
- Thread thread = new Thread() {
- public void run() {
- try {
- rmiServerSend.connect();
- didServerSendConnect.set(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * waitForConnectionOnServerRecv() starts a thread that waits server connection
- */
- public void waitForConnectionOnServerRecv() {
-
- Thread thread = new Thread() {
- public void run() {
- try {
- rmiServerRecv.connect();
- didServerRecvConnect.set(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * Constructor (for stub) - send and recv from the perspective of RMI socket servers
- */
- public IoTRMIComm(int _localPortSend, int _localPortRecv, int _portSend, int _portRecv, String _address, int _rev) throws
- ClassNotFoundException, InstantiationException,
- IllegalAccessException, IOException {
-
didGetMethodBytes = new AtomicBoolean(false);
didGetReturnBytes = new AtomicBoolean(false);
- didServerSendConnect = null;
- didServerRecvConnect = null;
- rmiUtil = new IoTRMIUtil();
- methodBytes = null;
- retValueBytes = null;
- methodQueue = new ConcurrentLinkedQueue<byte[]>();
- returnQueue = new ConcurrentLinkedQueue<byte[]>();
- mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
- mapStubId = new HashMap<String,AtomicBoolean>();
- rmiServerSend = null;
- rmiServerRecv = null;
- rmiClientRecv = new IoTSocketClient(_localPortSend, _portSend, _address, _rev);
- rmiClientSend = new IoTSocketClient(_localPortRecv, _portRecv, _address, _rev);
- waitForPacketsOnClient();
wakeUpThreadOnMethodCall();
wakeUpThreadOnReturnValue();
}
- /**
- * waitForPacketsOnServer() starts a thread that waits for packet bytes on server side
- */
- public void waitForPacketsOnServer() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] packetBytes = null;
- while(true) {
- try {
- packetBytes = rmiServerRecv.receiveBytes(packetBytes);
- if (packetBytes != null) {
- System.out.println("Packet received: " + Arrays.toString(packetBytes));
- int packetType = IoTRMIComm.getPacketType(packetBytes);
- if (packetType == IoTRMIUtil.METHOD_TYPE) {
- System.out.println("Method packet: " + Arrays.toString(packetBytes));
- methodQueue.offer(packetBytes);
- } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
- System.out.println("Return value packet: " + Arrays.toString(packetBytes));
- returnQueue.offer(packetBytes);
- } else
- throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
- } //else
- // Thread.sleep(100);
- packetBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- }
- };
- thread.start();
- }
-
-
- /**
- * waitForPacketsOnClient() starts a thread that waits for packet bytes on client side
- */
- public void waitForPacketsOnClient() {
-
- Thread thread = new Thread() {
- public void run() {
- byte[] packetBytes = null;
- while(true) {
- try {
- packetBytes = rmiClientRecv.receiveBytes(packetBytes);
- if (packetBytes != null) {
- int packetType = IoTRMIComm.getPacketType(packetBytes);
- if (packetType == IoTRMIUtil.METHOD_TYPE) {
- System.out.println("Method packet: " + Arrays.toString(packetBytes));
- methodQueue.offer(packetBytes);
- } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
- System.out.println("Return value packet: " + Arrays.toString(packetBytes));
- returnQueue.offer(packetBytes);
- } else
- throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
- } //else
- // Thread.sleep(100);
- packetBytes = null;
- } catch (Exception ex) {
- ex.printStackTrace();
- throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
- }
- }
- }
- };
- thread.start();
- }
-
-
/**
* wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
*/
/**
- * sendReturnObj() static version
+ * sendReturnObj() abstract version
*/
- public synchronized void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
-
- // Send back return value
- byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
- byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
- int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
- byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
- System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
- // Copy array of bytes (return object)
- System.arraycopy(retObjBytes, 0, retAllBytes, headerLen, retObjBytes.length);
- if (rmiServerSend == null)
- rmiClientSend.sendBytes(retAllBytes);
- else
- rmiServerSend.sendBytes(retAllBytes);
- }
+ public abstract void sendReturnObj(Object retObj, byte[] methodBytes);
/**
- * sendReturnObj() overloaded to send multiple return objects for structs
+ * sendReturnObj() abstract version
*/
- public synchronized void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
-
- // Send back return value
- byte[] retObjBytes = returnToBytes(retCls, retObj);
- // Send return value together with OBJECT_ID and METHOD_ID for arbitration
- int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
- int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
- byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
- // Copy OBJECT_ID and METHOD_ID
- System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
- int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
- byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
- System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
- // Copy array of bytes (return object)
- if (rmiServerSend == null)
- rmiClientSend.sendBytes(retAllBytes);
- else
- rmiServerSend.sendBytes(retAllBytes);
- }
+ public abstract void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes);
/**
/**
- * remoteCall() calls a method remotely by passing in parameters
+ * remoteCall() abstract version
*/
- public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
-
- // Send method info
- byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
- try {
- if (rmiClientSend == null)
- rmiServerSend.sendBytes(methodBytes);
- else
- rmiClientSend.sendBytes(methodBytes);
- } catch (IOException ex) {
- ex.printStackTrace();
- throw new Error("IoTRMICall: Error when sending bytes in remoteCall()!");
- }
- }
+ public abstract void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj);
/**