3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.util.Arrays;
6 import java.util.ArrayList;
7 import java.util.HashMap;
11 import java.lang.reflect.*;
13 import java.util.concurrent.*;
14 import java.util.concurrent.atomic.AtomicBoolean;
17 /** Class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
19 * We will arbitrate packets into 2 queues and wake up the right threads/callers.
20 * We separate traffics one-directionally.
22 * @author Rahmadi Trimananda <rtrimana @ uci.edu>
26 public class IoTRMIComm {
31 private List<String> listMethodId2Sign; // List of method signature (we use list index as method Id)
32 private IoTRMIUtil rmiUtil;
33 private IoTSocketServer rmiServerSend;
34 private IoTSocketServer rmiServerRecv;
35 private IoTSocketClient rmiClientSend;
36 private IoTSocketClient rmiClientRecv;
37 private byte[] methodBytes;
38 private byte[] retValueBytes;
39 private ConcurrentLinkedQueue<byte[]> methodQueue;
40 private ConcurrentLinkedQueue<byte[]> returnQueue;
41 private Map<Integer,AtomicBoolean> mapSkeletonId;
42 private Map<String,AtomicBoolean> mapStubId;
43 private AtomicBoolean didGetMethodBytes;
44 private AtomicBoolean didGetReturnBytes;
45 private AtomicBoolean didServerSendConnect;
46 private AtomicBoolean didServerRecvConnect;
47 private int objectIdCounter = Integer.MAX_VALUE;
50 * Constructor (for skeleton)
52 public IoTRMIComm(int _portSend, int _portRecv) throws
53 ClassNotFoundException, InstantiationException,
54 IllegalAccessException, IOException {
56 didGetMethodBytes = new AtomicBoolean(false);
57 didGetReturnBytes = new AtomicBoolean(false);
58 didServerSendConnect = new AtomicBoolean(false);
59 didServerRecvConnect = new AtomicBoolean(false);
60 rmiUtil = new IoTRMIUtil();
63 methodQueue = new ConcurrentLinkedQueue<byte[]>();
64 returnQueue = new ConcurrentLinkedQueue<byte[]>();
65 mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
66 mapStubId = new HashMap<String,AtomicBoolean>();
67 //rmiServerSend = new IoTSocketServer(_portSend);
68 //rmiServerSend.connect();
69 //rmiServerRecv = new IoTSocketServer(_portRecv);
70 //rmiServerRecv.connect();
71 rmiServerSend = new IoTSocketServer(_portSend);
72 rmiServerRecv = new IoTSocketServer(_portRecv);
73 waitForConnectionOnServerSend();
74 waitForConnectionOnServerRecv();
75 while(!didServerSendConnect.get()); // Wait until server is connected
76 while(!didServerRecvConnect.get()); // Wait until server is connected
79 waitForPacketsOnServer();
80 wakeUpThreadOnMethodCall();
81 wakeUpThreadOnReturnValue();
86 * waitForConnectionOnServerSend() starts a thread that waits server connection
88 public void waitForConnectionOnServerSend() {
90 Thread thread = new Thread() {
93 rmiServerSend.connect();
94 didServerSendConnect.set(true);
95 } catch (Exception ex) {
97 throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
106 * waitForConnectionOnServerRecv() starts a thread that waits server connection
108 public void waitForConnectionOnServerRecv() {
110 Thread thread = new Thread() {
113 rmiServerRecv.connect();
114 didServerRecvConnect.set(true);
115 } catch (Exception ex) {
116 ex.printStackTrace();
117 throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
126 * Constructor (for stub) - send and recv from the perspective of RMI socket servers
128 public IoTRMIComm(int _localPortSend, int _localPortRecv, int _portSend, int _portRecv, String _address, int _rev) throws
129 ClassNotFoundException, InstantiationException,
130 IllegalAccessException, IOException {
132 didGetMethodBytes = new AtomicBoolean(false);
133 didGetReturnBytes = new AtomicBoolean(false);
134 didServerSendConnect = null;
135 didServerRecvConnect = null;
136 rmiUtil = new IoTRMIUtil();
138 retValueBytes = null;
139 methodQueue = new ConcurrentLinkedQueue<byte[]>();
140 returnQueue = new ConcurrentLinkedQueue<byte[]>();
141 mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
142 mapStubId = new HashMap<String,AtomicBoolean>();
143 rmiServerSend = null;
144 rmiServerRecv = null;
145 rmiClientRecv = new IoTSocketClient(_localPortSend, _portSend, _address, _rev);
146 rmiClientSend = new IoTSocketClient(_localPortRecv, _portRecv, _address, _rev);
147 waitForPacketsOnClient();
148 wakeUpThreadOnMethodCall();
149 wakeUpThreadOnReturnValue();
154 * waitForPacketsOnServer() starts a thread that waits for packet bytes on server side
156 public void waitForPacketsOnServer() {
158 Thread thread = new Thread() {
160 byte[] packetBytes = null;
163 packetBytes = rmiServerRecv.receiveBytes(packetBytes);
164 if (packetBytes != null) {
165 System.out.println("Packet received: " + Arrays.toString(packetBytes));
166 int packetType = IoTRMIComm.getPacketType(packetBytes);
167 if (packetType == IoTRMIUtil.METHOD_TYPE) {
168 System.out.println("Method packet: " + Arrays.toString(packetBytes));
169 methodQueue.offer(packetBytes);
170 } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
171 System.out.println("Return value packet: " + Arrays.toString(packetBytes));
172 returnQueue.offer(packetBytes);
174 throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
176 // Thread.sleep(100);
178 } catch (Exception ex) {
179 ex.printStackTrace();
180 throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
190 * waitForPacketsOnClient() starts a thread that waits for packet bytes on client side
192 public void waitForPacketsOnClient() {
194 Thread thread = new Thread() {
196 byte[] packetBytes = null;
199 packetBytes = rmiClientRecv.receiveBytes(packetBytes);
200 if (packetBytes != null) {
201 int packetType = IoTRMIComm.getPacketType(packetBytes);
202 if (packetType == IoTRMIUtil.METHOD_TYPE) {
203 System.out.println("Method packet: " + Arrays.toString(packetBytes));
204 methodQueue.offer(packetBytes);
205 } else if (packetType == IoTRMIUtil.RET_VAL_TYPE) {
206 System.out.println("Return value packet: " + Arrays.toString(packetBytes));
207 returnQueue.offer(packetBytes);
209 throw new Error("IoTRMIComm: Packet type is unknown: " + packetType);
211 // Thread.sleep(100);
213 } catch (Exception ex) {
214 ex.printStackTrace();
215 throw new Error("IoTRMIComm: Error receiving return value bytes on client!");
225 * wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
227 public void wakeUpThreadOnMethodCall() {
229 Thread thread = new Thread() {
232 // Take the current method from the queue and wake up the correct thread
233 methodBytes = methodQueue.poll();
234 if (methodBytes != null) { // If there is method bytes
235 int currObjId = getObjectId(methodBytes);
236 AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
237 didGetMethodBytes.set(false);
238 while(!methRecv.compareAndSet(false, true));
239 while(!didGetMethodBytes.get()); // While skeleton is still processing
249 * wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value
251 public void wakeUpThreadOnReturnValue() {
253 Thread thread = new Thread() {
256 // Take the current method from the queue and wake up the correct thread
257 retValueBytes = returnQueue.poll();
258 if (retValueBytes != null) { // If there is method bytes
259 System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes));
260 int objectId = getObjectId(retValueBytes);
261 int methodId = getMethodId(retValueBytes);
262 String strKey = objectId + "-" + methodId;
263 AtomicBoolean retRecv = mapStubId.get(strKey);
264 //System.out.println("boolean status: " + retRecv + " with key: " + strKey);
265 didGetReturnBytes.set(false);
266 while(!retRecv.compareAndSet(false, true));
267 //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
268 while(!didGetReturnBytes.get()); // While skeleton is still processing
278 * registerSkeleton() registers the skeleton to be woken up
281 public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
283 mapSkeletonId.put(objectId, methodReceived);
288 * registerStub() registers the skeleton to be woken up
290 public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
292 String strKey = objectId + "-" + methodId;
293 //System.out.println("Key exist? " + mapStubId.containsKey(strKey));
294 mapStubId.put(strKey, retValueReceived);
295 //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
300 * getObjectIdCounter() gets object Id counter
302 public int getObjectIdCounter() {
304 return objectIdCounter;
309 * setObjectIdCounter() sets object Id counter
311 public void setObjectIdCounter(int objIdCounter) {
313 objectIdCounter = objIdCounter;
318 * decrementObjectIdCounter() gets object Id counter
320 public void decrementObjectIdCounter() {
327 * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
329 public boolean setGetMethodBytes() {
331 return didGetMethodBytes.compareAndSet(false, true);
336 * setGetReturnBytes() set didGetReturnBytes if there is a new return value already
338 public synchronized boolean setGetReturnBytes() {
340 return didGetReturnBytes.compareAndSet(false, true);
345 * getMethodBytes() get the method in bytes
347 public byte[] getMethodBytes() throws IOException {
349 // Just return the methodBytes content
355 * static version of getObjectId()
357 public static int getObjectId(byte[] packetBytes) {
359 // Get object Id bytes
360 byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
361 System.arraycopy(packetBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
363 int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
369 * static version of getMethodId()
371 public static int getMethodId(byte[] packetBytes) {
373 // Get method Id bytes
374 byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
375 // Method Id is positioned after object Id in the byte array
376 System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
378 int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
385 * static version of getPacketType() - either method or return value (position is after object Id and method Id)
387 public static int getPacketType(byte[] packetBytes) {
389 // Get packet type bytes
390 byte[] packetTypeBytes = new byte[IoTRMIUtil.PACKET_TYPE_LEN];
391 int offset = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
392 System.arraycopy(packetBytes, offset, packetTypeBytes, 0, IoTRMIUtil.PACKET_TYPE_LEN);
393 // Get packet type (for now we assume 1 as method and -1 as return value
394 int packetType = IoTRMIUtil.byteArrayToInt(packetTypeBytes);
400 * getMethodParams() gets method params based on byte array received
402 * Basically this is the format of a method in bytes:
403 * 1) 32-bit value of object ID
404 * 2) 32-bit value of method ID
405 * 3) m parameters with n-bit value each (m x n-bit)
406 * For the parameters that don't have definite length,
407 * we need to extract the length from a preceding 32-bit
408 * field in front of it.
410 * For primitive objects:
411 * | 32-bit object ID | 32-bit method ID | m-bit actual data (fixed length) | ...
413 * For string, arrays, and non-primitive objects:
414 * | 32-bit object ID | 32-bit method ID | 32-bit length | n-bit actual data | ...
417 public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
419 // Byte scanning position
420 int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
421 Object[] paramObj = new Object[arrCls.length];
422 for (int i=0; i < arrCls.length; i++) {
424 String paramType = arrCls[i].getSimpleName();
425 int paramSize = rmiUtil.getTypeSize(paramType);
426 // Get the 32-bit field in the byte array to get the actual
427 // length (this is a param with indefinite length)
428 if (paramSize == -1) {
429 byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
430 System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
431 pos = pos + IoTRMIUtil.PARAM_LEN;
432 paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
434 byte[] paramBytes = new byte[paramSize];
435 System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
436 pos = pos + paramSize;
437 paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
445 * sendReturnObj() static version
447 public synchronized void sendReturnObj(Object retObj, byte[] methodBytes) throws IOException {
449 // Send back return value
450 byte[] retObjBytes = IoTRMIUtil.getObjectBytes(retObj);
451 // Send return value together with OBJECT_ID and METHOD_ID for arbitration
452 int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
453 int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
454 byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
455 // Copy OBJECT_ID and METHOD_ID
456 System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
457 int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
458 byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
459 System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
460 // Copy array of bytes (return object)
461 System.arraycopy(retObjBytes, 0, retAllBytes, headerLen, retObjBytes.length);
462 if (rmiServerSend == null)
463 rmiClientSend.sendBytes(retAllBytes);
465 rmiServerSend.sendBytes(retAllBytes);
470 * sendReturnObj() overloaded to send multiple return objects for structs
472 public synchronized void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes) throws IOException {
474 // Send back return value
475 byte[] retObjBytes = returnToBytes(retCls, retObj);
476 // Send return value together with OBJECT_ID and METHOD_ID for arbitration
477 int objMethIdLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
478 int headerLen = objMethIdLen + IoTRMIUtil.PACKET_TYPE_LEN;
479 byte[] retAllBytes = new byte[headerLen + retObjBytes.length];
480 // Copy OBJECT_ID and METHOD_ID
481 System.arraycopy(methodBytes, 0, retAllBytes, 0, objMethIdLen);
482 int packetType = IoTRMIUtil.RET_VAL_TYPE; // This is a return value
483 byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
484 System.arraycopy(packetTypeBytes, 0, retAllBytes, objMethIdLen, IoTRMIUtil.PACKET_TYPE_LEN);
485 // Copy array of bytes (return object)
486 if (rmiServerSend == null)
487 rmiClientSend.sendBytes(retAllBytes);
489 rmiServerSend.sendBytes(retAllBytes);
494 * returnToBytes() takes array of objects and generates bytes
496 public byte[] returnToBytes(Class<?>[] retCls, Object[] retObj) {
498 // Get byte arrays and calculate method bytes length
499 int numbRet = retObj.length;
501 byte[][] objBytesArr = new byte[numbRet][];
502 for (int i = 0; i < numbRet; i++) {
503 // Get byte arrays for the objects
504 objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]);
505 String clsName = retCls[i].getSimpleName();
506 int retObjLen = rmiUtil.getTypeSize(clsName);
507 if (retObjLen == -1) { // indefinite length - store the length first
508 retLen = retLen + IoTRMIUtil.RETURN_LEN;
510 retLen = retLen + objBytesArr[i].length;
512 // Construct return in byte array
513 byte[] retBytes = new byte[retLen];
515 // Iteration for copying bytes
516 for (int i = 0; i < numbRet; i++) {
518 String clsName = retCls[i].getSimpleName();
519 int retObjLen = rmiUtil.getTypeSize(clsName);
520 if (retObjLen == -1) { // indefinite length
521 retObjLen = objBytesArr[i].length;
522 byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen);
523 System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN);
524 pos = pos + IoTRMIUtil.RETURN_LEN;
526 System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen);
527 pos = pos + retObjLen;
535 * remoteCall() calls a method remotely by passing in parameters
537 public synchronized void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj) {
540 byte[] methodBytes = methodToBytes(objectId, methodId, paramCls, paramObj);
542 if (rmiClientSend == null)
543 rmiServerSend.sendBytes(methodBytes);
545 rmiClientSend.sendBytes(methodBytes);
546 } catch (IOException ex) {
547 ex.printStackTrace();
548 throw new Error("IoTRMICall: Error when sending bytes in remoteCall()!");
554 * getReturnValue() returns return value object
556 public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
558 // Receive return value and return it to caller
559 // Now just strip off the object ID and method ID
560 int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
561 int valByteLen = retValueBytes.length - headerLen;
562 byte[] retValBytes = new byte[valByteLen];
563 // Method Id is positioned after object Id in the byte array
564 System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
565 Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
566 // This means the right object and method have gotten the return value, so we set this back to false
572 * methodToBytes() returns byte representation of a method
574 public byte[] methodToBytes(int objectId, int methId, Class<?>[] paramCls, Object[] paramObj) {
576 // Initialized to the length of method ID
577 int methodLen = IoTRMIUtil.OBJECT_ID_LEN;
578 byte[] objId = IoTRMIUtil.intToByteArray(objectId);
579 // Get method ID in bytes
580 byte[] methodId = IoTRMIUtil.intToByteArray(methId);
581 // Get byte arrays and calculate method bytes length
582 int numbParam = paramObj.length;
583 methodLen = methodLen + IoTRMIUtil.METHOD_ID_LEN;
584 methodLen = methodLen + IoTRMIUtil.PACKET_TYPE_LEN;
585 byte[][] objBytesArr = new byte[numbParam][];
586 for (int i = 0; i < numbParam; i++) {
587 // Get byte arrays for the objects
588 objBytesArr[i] = IoTRMIUtil.getObjectBytes(paramObj[i]);
589 String clsName = paramCls[i].getSimpleName();
590 int paramLen = rmiUtil.getTypeSize(clsName);
591 if (paramLen == -1) { // indefinite length - store the length first
592 methodLen = methodLen + IoTRMIUtil.PARAM_LEN;
594 methodLen = methodLen + objBytesArr[i].length;
596 // Construct method in byte array
597 byte[] method = new byte[methodLen];
599 System.arraycopy(objId, 0, method, 0, IoTRMIUtil.METHOD_ID_LEN);
600 pos = pos + IoTRMIUtil.OBJECT_ID_LEN;
601 System.arraycopy(methodId, 0, method, pos, IoTRMIUtil.METHOD_ID_LEN);
602 pos = pos + IoTRMIUtil.METHOD_ID_LEN;
603 int packetType = IoTRMIUtil.METHOD_TYPE; // This is a method
604 byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
605 System.arraycopy(packetTypeBytes, 0, method, pos, IoTRMIUtil.PACKET_TYPE_LEN);
606 pos = pos + IoTRMIUtil.PACKET_TYPE_LEN;
607 // Second iteration for copying bytes
608 for (int i = 0; i < numbParam; i++) {
610 String clsName = paramCls[i].getSimpleName();
611 int paramLen = rmiUtil.getTypeSize(clsName);
612 if (paramLen == -1) { // indefinite length
613 paramLen = objBytesArr[i].length;
614 byte[] paramLenBytes = IoTRMIUtil.intToByteArray(paramLen);
615 System.arraycopy(paramLenBytes, 0, method, pos, IoTRMIUtil.PARAM_LEN);
616 pos = pos + IoTRMIUtil.PARAM_LEN;
618 System.arraycopy(objBytesArr[i], 0, method, pos, paramLen);
619 pos = pos + paramLen;
627 * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
629 /*public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
631 // Receive return value and return it to caller
632 Object[] retObj = null;
633 byte[] retObjBytes = null;
635 retObjBytes = rmiClientRecv.receiveBytes(retObjBytes);
636 } catch (IOException ex) {
637 ex.printStackTrace();
638 throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
640 retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
647 * remoteCall() calls a method remotely by passing in parameters and getting a return Object
649 public Object[] getReturnObjects(byte[] retBytes, Class<?>[] arrCls, Class<?>[] arrGenValCls) {
651 // Byte scanning position
653 Object[] retObj = new Object[arrCls.length];
654 for (int i=0; i < arrCls.length; i++) {
656 String retType = arrCls[i].getSimpleName();
657 int retSize = rmiUtil.getTypeSize(retType);
658 // Get the 32-bit field in the byte array to get the actual
659 // length (this is a param with indefinite length)
661 byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN];
662 System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN);
663 pos = pos + IoTRMIUtil.RETURN_LEN;
664 retSize = IoTRMIUtil.byteArrayToInt(bytRetLen);
666 byte[] retObjBytes = new byte[retSize];
667 System.arraycopy(retBytes, pos, retObjBytes, 0, retSize);
669 retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes);