3 import java.io.IOException;
4 import java.nio.ByteBuffer;
5 import java.util.Arrays;
6 import java.util.ArrayList;
7 import java.util.HashMap;
11 import java.lang.reflect.*;
13 import java.util.concurrent.*;
14 import java.util.concurrent.atomic.AtomicBoolean;
17 /** Abstract class IoTRMIComm is a class that combines IoTRMIObject and IoTRMICall
19 * We will arbitrate packets into 2 queues and wake up the right threads/callers.
20 * Traffic is separated by direction: method calls and return values each get their own queue.
22 * @author Rahmadi Trimananda <rtrimana @ uci.edu>
26 public abstract class IoTRMIComm {
// Utility object; this class uses it to look up type sizes via getTypeSize()
31 protected IoTRMIUtil rmiUtil;
// Most recent packet polled from methodQueue by the thread created in wakeUpThreadOnMethodCall()
32 protected byte[] methodBytes;
// Most recent packet polled from returnQueue by the thread created in wakeUpThreadOnReturnValue()
33 protected byte[] retValueBytes;
// Method packets waiting to be dispatched (the producer is not visible in this file view)
34 protected ConcurrentLinkedQueue<byte[]> methodQueue;
// Return-value packets waiting to be dispatched (the producer is not visible in this file view)
35 protected ConcurrentLinkedQueue<byte[]> returnQueue;
// Object ID -> flag that the method dispatcher flips from false to true for a matching packet
36 protected Map<Integer,AtomicBoolean> mapSkeletonId;
// "objectId-methodId" -> flag that the return dispatcher flips from false to true for a matching packet
37 protected Map<String,AtomicBoolean> mapStubId;
// Set to true by setGetMethodBytes(); the method dispatcher spins until it is true
38 protected AtomicBoolean didGetMethodBytes;
// Set to true by setGetReturnBytes(); the return dispatcher spins until it is true
39 protected AtomicBoolean didGetReturnBytes;
// Counter read and written through get/setObjectIdCounter(); starts at Integer.MAX_VALUE
40 protected int objectIdCounter = Integer.MAX_VALUE;
43 * Constructor (for skeleton)
// Creates the utility object, both packet queues, both lookup maps and both
// "did get bytes" flags, then calls the two wakeUpThreadOn...() methods.
// NOTE(review): the embedded numbering skips 50-51 below, so statements may be
// missing from this view.
// NOTE(review): wakeUpThreadOnMethodCall() and wakeUpThreadOnReturnValue() are
// public and non-final, so they can be overridden by a subclass and would then
// run before the subclass constructor has finished.
45 public IoTRMIComm() throws
46 ClassNotFoundException, InstantiationException,
47 IllegalAccessException, IOException {
49 rmiUtil = new IoTRMIUtil();
52 methodQueue = new ConcurrentLinkedQueue<byte[]>();
53 returnQueue = new ConcurrentLinkedQueue<byte[]>();
// NOTE(review): plain HashMaps; registration is synchronized but the dispatcher
// threads read these maps without holding that lock.
54 mapSkeletonId = new HashMap<Integer,AtomicBoolean>();
55 mapStubId = new HashMap<String,AtomicBoolean>();
56 didGetMethodBytes = new AtomicBoolean(false);
57 didGetReturnBytes = new AtomicBoolean(false);
58 wakeUpThreadOnMethodCall();
59 wakeUpThreadOnReturnValue();
64 * wakeUpThreadOnMethodCall() wakes up the correct thread when receiving method call
// Builds the dispatcher thread for method packets: it polls methodQueue, looks up
// the flag registered for the packet's object ID and flips that flag to true.
// NOTE(review): the embedded numbering skips 69-70 and 79-86, so the run()/loop
// header and the code that starts the thread are presumably on lines missing
// from this view.
66 public void wakeUpThreadOnMethodCall() {
68 Thread thread = new Thread() {
71 // Take the current method packet from the queue and wake up the correct thread
72 methodBytes = methodQueue.poll();
73 if (methodBytes != null) { // A method packet was available
74 int currObjId = getObjectId(methodBytes);
// NOTE(review): get() returns null when nothing was registered for this object ID;
// the compareAndSet() below would then throw NullPointerException. The map is a
// HashMap read here without the lock that registerSkeleton() holds.
75 AtomicBoolean methRecv = mapSkeletonId.get(currObjId);
76 didGetMethodBytes.set(false);
// Spin until the registered flag can be flipped from false to true
77 while(!methRecv.compareAndSet(false, true));
// Busy-wait until setGetMethodBytes() reports that the packet has been taken
78 while(!didGetMethodBytes.get()); // While skeleton is still processing
88 * wakeUpThreadOnReturnValue() wakes up the correct thread when receiving return value
// Builds the dispatcher thread for return-value packets: it polls returnQueue,
// looks up the flag registered under "objectId-methodId" and flips it to true.
// NOTE(review): the embedded numbering skips 93-94 and 108-115, so the run()/loop
// header and the code that starts the thread are presumably on lines missing
// from this view.
90 public void wakeUpThreadOnReturnValue() {
92 Thread thread = new Thread() {
95 // Take the current return-value packet from the queue and wake up the correct thread
96 retValueBytes = returnQueue.poll();
97 if (retValueBytes != null) { // A return-value packet was available
// NOTE(review): prints every received packet to stdout; looks like leftover debugging output
98 System.out.println("retValBytes in wake up thread: " + Arrays.toString(retValueBytes));
99 int objectId = getObjectId(retValueBytes);
100 int methodId = getMethodId(retValueBytes);
// Same key format as registerStub()
101 String strKey = objectId + "-" + methodId;
// NOTE(review): get() returns null when no stub was registered under this key;
// the compareAndSet() below would then throw NullPointerException. The map is a
// HashMap read here without the lock that registerStub() holds.
102 AtomicBoolean retRecv = mapStubId.get(strKey);
103 //System.out.println("boolean status: " + retRecv + " with key: " + strKey);
104 didGetReturnBytes.set(false);
// Spin until the registered flag can be flipped from false to true
105 while(!retRecv.compareAndSet(false, true));
106 //System.out.println("boolean status: " + retRecv + " - map has: " + mapStubId.size());
// Busy-wait until setGetReturnBytes() reports that the packet has been taken
107 while(!didGetReturnBytes.get()); // While the stub is still processing
117 * registerSkeleton() registers the skeleton to be woken up
120 public synchronized void registerSkeleton(int objectId, AtomicBoolean methodReceived) {
122 mapSkeletonId.put(objectId, methodReceived);
127 * registerStub() registers the stub to be woken up
129 public synchronized void registerStub(int objectId, int methodId, AtomicBoolean retValueReceived) {
131 String strKey = objectId + "-" + methodId;
132 //System.out.println("Key exist? " + mapStubId.containsKey(strKey));
133 mapStubId.put(strKey, retValueReceived);
134 //System.out.println("\n\nAdding keyBytes: " + strKey + " now size: " + mapStubId.size() + "\n\n");
139 * getObjectIdCounter() gets object Id counter
141 public int getObjectIdCounter() {
143 return objectIdCounter;
148 * setObjectIdCounter() sets object Id counter
150 public void setObjectIdCounter(int objIdCounter) {
152 objectIdCounter = objIdCounter;
157 * decrementObjectIdCounter() decrements object Id counter
// NOTE(review): the body is not visible in this view (embedded numbering skips
// 160-165); presumably it decrements objectIdCounter - confirm against the full file.
159 public void decrementObjectIdCounter() {
166 * setGetMethodBytes() set didGetMethodBytes to true after getting the bytes
168 public boolean setGetMethodBytes() {
170 return didGetMethodBytes.compareAndSet(false, true);
175 * setGetReturnBytes() set didGetReturnBytes if there is a new return value already
177 public synchronized boolean setGetReturnBytes() {
179 return didGetReturnBytes.compareAndSet(false, true);
184 * getMethodBytes() get the method in bytes
// Hands out the current method packet.
// NOTE(review): the return statement is not visible in this view (embedded
// numbering skips 189-193); per the comment below it presumably returns the
// methodBytes field. No visible line can throw the declared IOException.
186 public byte[] getMethodBytes() throws IOException {
188 // Just return the methodBytes content
194 * static version of getObjectId()
196 public static int getObjectId(byte[] packetBytes) {
198 // Get object Id bytes
199 byte[] objectIdBytes = new byte[IoTRMIUtil.OBJECT_ID_LEN];
200 System.arraycopy(packetBytes, 0, objectIdBytes, 0, IoTRMIUtil.OBJECT_ID_LEN);
202 int objectId = IoTRMIUtil.byteArrayToInt(objectIdBytes);
208 * static version of getMethodId()
210 public static int getMethodId(byte[] packetBytes) {
212 // Get method Id bytes
213 byte[] methodIdBytes = new byte[IoTRMIUtil.METHOD_ID_LEN];
214 // Method Id is positioned after object Id in the byte array
215 System.arraycopy(packetBytes, IoTRMIUtil.OBJECT_ID_LEN, methodIdBytes, 0, IoTRMIUtil.METHOD_ID_LEN);
217 int methodId = IoTRMIUtil.byteArrayToInt(methodIdBytes);
224 * static version of getPacketType() - either method or return value (position is after object Id and method Id)
226 public static int getPacketType(byte[] packetBytes) {
228 // Get packet type bytes
229 byte[] packetTypeBytes = new byte[IoTRMIUtil.PACKET_TYPE_LEN];
230 int offset = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN;
231 System.arraycopy(packetBytes, offset, packetTypeBytes, 0, IoTRMIUtil.PACKET_TYPE_LEN);
232 // Get packet type (for now we assume 1 as method and -1 as return value
233 int packetType = IoTRMIUtil.byteArrayToInt(packetTypeBytes);
239 * getMethodParams() gets method params based on byte array received
241 * Basically this is the format of a method in bytes:
242 * 1) 32-bit value of object ID
243 * 2) 32-bit value of method ID, followed by the packet type field
244 * 3) m parameters with n-bit value each (m x n-bit)
245 * For the parameters that don't have definite length,
246 * we need to extract the length from the 32-bit
247 * field that precedes each of them.
249 * For primitive objects:
250 * | 32-bit object ID | 32-bit method ID | packet type | m-bit actual data (fixed length) | ...
252 * For strings, arrays, and non-primitive objects:
253 * | 32-bit object ID | 32-bit method ID | packet type | 32-bit length | n-bit actual data | ...
256 public Object[] getMethodParams(Class<?>[] arrCls, Class<?>[] arrGenValCls, byte[] methodBytes) {
258 // Byte scanning position
259 int pos = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
260 Object[] paramObj = new Object[arrCls.length];
261 for (int i=0; i < arrCls.length; i++) {
263 String paramType = arrCls[i].getSimpleName();
264 int paramSize = rmiUtil.getTypeSize(paramType);
265 // Get the 32-bit field in the byte array to get the actual
266 // length (this is a param with indefinite length)
267 if (paramSize == -1) {
268 byte[] bytPrmLen = new byte[IoTRMIUtil.PARAM_LEN];
269 System.arraycopy(methodBytes, pos, bytPrmLen, 0, IoTRMIUtil.PARAM_LEN);
270 pos = pos + IoTRMIUtil.PARAM_LEN;
271 paramSize = IoTRMIUtil.byteArrayToInt(bytPrmLen);
273 byte[] paramBytes = new byte[paramSize];
274 System.arraycopy(methodBytes, pos, paramBytes, 0, paramSize);
275 pos = pos + paramSize;
276 paramObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], paramBytes);
284 * sendReturnObj() abstract version
// Abstract: subclasses define how a single return object is sent.
// presumably methodBytes is the packet of the call being answered - TODO confirm against implementations
286 public abstract void sendReturnObj(Object retObj, byte[] methodBytes);
290 * sendReturnObj() abstract version
// Abstract: variant taking several return objects together with their classes
// (same parameter shape as returnToBytes()).
// presumably methodBytes is the packet of the call being answered - TODO confirm against implementations
292 public abstract void sendReturnObj(Class<?>[] retCls, Object[] retObj, byte[] methodBytes);
296 * returnToBytes() takes array of objects and generates bytes
298 public byte[] returnToBytes(Class<?>[] retCls, Object[] retObj) {
300 // Get byte arrays and calculate method bytes length
301 int numbRet = retObj.length;
303 byte[][] objBytesArr = new byte[numbRet][];
304 for (int i = 0; i < numbRet; i++) {
305 // Get byte arrays for the objects
306 objBytesArr[i] = IoTRMIUtil.getObjectBytes(retObj[i]);
307 String clsName = retCls[i].getSimpleName();
308 int retObjLen = rmiUtil.getTypeSize(clsName);
309 if (retObjLen == -1) { // indefinite length - store the length first
310 retLen = retLen + IoTRMIUtil.RETURN_LEN;
312 retLen = retLen + objBytesArr[i].length;
314 // Construct return in byte array
315 byte[] retBytes = new byte[retLen];
317 // Iteration for copying bytes
318 for (int i = 0; i < numbRet; i++) {
320 String clsName = retCls[i].getSimpleName();
321 int retObjLen = rmiUtil.getTypeSize(clsName);
322 if (retObjLen == -1) { // indefinite length
323 retObjLen = objBytesArr[i].length;
324 byte[] retLenBytes = IoTRMIUtil.intToByteArray(retObjLen);
325 System.arraycopy(retLenBytes, 0, retBytes, pos, IoTRMIUtil.RETURN_LEN);
326 pos = pos + IoTRMIUtil.RETURN_LEN;
328 System.arraycopy(objBytesArr[i], 0, retBytes, pos, retObjLen);
329 pos = pos + retObjLen;
337 * remoteCall() abstract version
// Abstract: subclasses define how a call is issued; the parameter list matches
// methodToBytes(), which presumably builds the packet to send - TODO confirm against implementations
339 public abstract void remoteCall(int objectId, int methodId, Class<?>[] paramCls, Object[] paramObj);
343 * getReturnValue() returns return value object
// Decodes the payload of the current return packet (field retValueBytes) into an
// object; retType and retGenTypeVal are passed unchanged to IoTRMIUtil.getParamObject().
// NOTE(review): retValueBytes is the same field the return dispatcher thread
// overwrites after each poll().
// NOTE(review): the return statement is not visible in this view (embedded
// numbering skips 356-360); presumably retObj is returned.
345 public Object getReturnValue(Class<?> retType, Class<?> retGenTypeVal) {
347 // Receive return value and return it to caller
348 // Strip off the header: object ID, method ID and packet type
349 int headerLen = IoTRMIUtil.OBJECT_ID_LEN + IoTRMIUtil.METHOD_ID_LEN + IoTRMIUtil.PACKET_TYPE_LEN;
350 int valByteLen = retValueBytes.length - headerLen;
351 byte[] retValBytes = new byte[valByteLen];
352 // Copy everything behind the header into retValBytes
353 System.arraycopy(retValueBytes, headerLen, retValBytes, 0, valByteLen);
354 Object retObj = IoTRMIUtil.getParamObject(retType, retGenTypeVal, retValBytes);
355 // This means the right object and method have gotten the return value, so we set this back to false
361 * methodToBytes() returns byte representation of a method
363 public byte[] methodToBytes(int objectId, int methId, Class<?>[] paramCls, Object[] paramObj) {
365 // Initialized to the length of method ID
366 int methodLen = IoTRMIUtil.OBJECT_ID_LEN;
367 byte[] objId = IoTRMIUtil.intToByteArray(objectId);
368 // Get method ID in bytes
369 byte[] methodId = IoTRMIUtil.intToByteArray(methId);
370 // Get byte arrays and calculate method bytes length
371 int numbParam = paramObj.length;
372 methodLen = methodLen + IoTRMIUtil.METHOD_ID_LEN;
373 methodLen = methodLen + IoTRMIUtil.PACKET_TYPE_LEN;
374 byte[][] objBytesArr = new byte[numbParam][];
375 for (int i = 0; i < numbParam; i++) {
376 // Get byte arrays for the objects
377 objBytesArr[i] = IoTRMIUtil.getObjectBytes(paramObj[i]);
378 String clsName = paramCls[i].getSimpleName();
379 int paramLen = rmiUtil.getTypeSize(clsName);
380 if (paramLen == -1) { // indefinite length - store the length first
381 methodLen = methodLen + IoTRMIUtil.PARAM_LEN;
383 methodLen = methodLen + objBytesArr[i].length;
385 // Construct method in byte array
386 byte[] method = new byte[methodLen];
388 System.arraycopy(objId, 0, method, 0, IoTRMIUtil.METHOD_ID_LEN);
389 pos = pos + IoTRMIUtil.OBJECT_ID_LEN;
390 System.arraycopy(methodId, 0, method, pos, IoTRMIUtil.METHOD_ID_LEN);
391 pos = pos + IoTRMIUtil.METHOD_ID_LEN;
392 int packetType = IoTRMIUtil.METHOD_TYPE; // This is a method
393 byte[] packetTypeBytes = IoTRMIUtil.intToByteArray(packetType);
394 System.arraycopy(packetTypeBytes, 0, method, pos, IoTRMIUtil.PACKET_TYPE_LEN);
395 pos = pos + IoTRMIUtil.PACKET_TYPE_LEN;
396 // Second iteration for copying bytes
397 for (int i = 0; i < numbParam; i++) {
399 String clsName = paramCls[i].getSimpleName();
400 int paramLen = rmiUtil.getTypeSize(clsName);
401 if (paramLen == -1) { // indefinite length
402 paramLen = objBytesArr[i].length;
403 byte[] paramLenBytes = IoTRMIUtil.intToByteArray(paramLen);
404 System.arraycopy(paramLenBytes, 0, method, pos, IoTRMIUtil.PARAM_LEN);
405 pos = pos + IoTRMIUtil.PARAM_LEN;
407 System.arraycopy(objBytesArr[i], 0, method, pos, paramLen);
408 pos = pos + paramLen;
416 * getStructObjects() calls a method remotely by passing in parameters and getting a return Object
418 /*public synchronized Object[] getStructObjects(Class<?>[] retType, Class<?>[] retGenTypeVal) {
420 // Receive return value and return it to caller
421 Object[] retObj = null;
422 byte[] retObjBytes = null;
424 retObjBytes = rmiClientRecv.receiveBytes(retObjBytes);
425 } catch (IOException ex) {
426 ex.printStackTrace();
427 throw new Error("IoTRMICall: Error when receiving bytes - rmiClient.receiveBytes()");
429 retObj = getReturnObjects(retObjBytes, retType, retGenTypeVal);
436 * getReturnObjects() converts the received return bytes into an array of return objects
438 public Object[] getReturnObjects(byte[] retBytes, Class<?>[] arrCls, Class<?>[] arrGenValCls) {
440 // Byte scanning position
442 Object[] retObj = new Object[arrCls.length];
443 for (int i=0; i < arrCls.length; i++) {
445 String retType = arrCls[i].getSimpleName();
446 int retSize = rmiUtil.getTypeSize(retType);
447 // Get the 32-bit field in the byte array to get the actual
448 // length (this is a param with indefinite length)
450 byte[] bytRetLen = new byte[IoTRMIUtil.RETURN_LEN];
451 System.arraycopy(retBytes, pos, bytRetLen, 0, IoTRMIUtil.RETURN_LEN);
452 pos = pos + IoTRMIUtil.RETURN_LEN;
453 retSize = IoTRMIUtil.byteArrayToInt(bytRetLen);
455 byte[] retObjBytes = new byte[retSize];
456 System.arraycopy(retBytes, pos, retObjBytes, 0, retSize);
458 retObj[i] = IoTRMIUtil.getParamObject(arrCls[i], arrGenValCls[i], retObjBytes);