1 /** Class IoTRMIComm combines the functionalities
2 * of IoTRMICall and IoTRMIObject to create a single
3 * communication class with two sockets, each serving
4 * traffic in one direction.
6 * @author Rahmadi Trimananda <rtrimana @ uci.edu>
10 #ifndef _IOTRMICOMM_HPP__
11 #define _IOTRMICOMM_HPP__
20 #include "IoTSocketServer.hpp"
21 #include "IoTSocketClient.hpp"
22 #include "ConcurrentLinkedListQueue.cpp"
// Hand-shake flags polled by the wake-up threads: each thread clears its flag
// with exchange(false) and then spins until the flag reads true again
// (see wakeUpThreadOnMethodCall() / wakeUpThreadOnReturnValue()).
// NOTE(review): these look like non-inline variable definitions inside an
// include-guarded header; if so, including the header from more than one
// translation unit produces duplicate definitions — confirm.
26 std::atomic<bool> didGetMethodBytes(false);
27 std::atomic<bool> didGetReturnBytes(false);
// Mutexes whose users are not visible in this excerpt (presumably taken by the
// remoteCall()/sendReturnObj() implementations — TODO confirm).
32 mutex remoteCallMutex;
33 mutex sendReturnObjMutex;
// Member declarations of IoTRMIComm (the class header and access specifiers
// are not visible in this excerpt).
// Pure virtual: sending a single return object / a set of return objects is
// left to the implementing class.
40 virtual void sendReturnObj(void* retObj, string type, char* methodBytes) = 0;
41 virtual void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) = 0;
// Compute the byte length of, and serialize, a multi-member (struct) return value.
42 int returnLength(void* retObj[], string retCls[], int numRet);
43 char* returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet);
// Accessors for the received method packet.
44 char* getMethodBytes();
45 int getMethodLength();
46 int getObjectIdFromMethod();
// Static decoders for the header fields of a raw packet.
47 static int getObjectId(char* packetBytes);
48 static int getMethodId(char* packetBytes);
49 static int getPacketType(char* packetBytes);
// Decode the parameters of a received method packet into paramObj[].
50 void** getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes);
// Register the bool flags that the wake-up threads look up by object Id
// (skeleton) or by "objectId-methodId" (stub).
51 void registerSkeleton(int objectId, bool* methodReceived);
52 void registerStub(int objectId, int methodId, bool* retValueReceived);
// Accessors for objectIdCounter.
53 int getObjectIdCounter();
54 void setObjectIdCounter(int objIdCounter);
55 void decrementObjectIdCounter();
// Compute the byte length of, and serialize, an outgoing method call.
57 int methodLength(string paramCls[], void* paramObj[], int numParam);
58 char* methodToBytes(int objectId, int methId, string paramCls[], void* paramObj[],
59 char* method, int numParam);
// Pure virtual: issuing the remote call is left to the implementing class.
60 virtual void remoteCall(int objectId, int methodId, string paramCls[],
61 void* paramObj[], int numParam) = 0;
// Decode received return values (single value, struct, struct members).
62 void* getReturnValue(string retType, void* retObj);
64 void** getStructObjects(string retType[], int numRet, void* retObj[]);
65 void** getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
// Queues from which the wake-up threads dequeue received method / return packets.
73 ConcurrentLinkedListQueue methodQueue;
74 ConcurrentLinkedListQueue returnQueue;
// objectId -> flag, filled by registerSkeleton().
75 map<int,bool*> mapSkeletonId;
// "objectId-methodId" -> flag, filled by registerStub().
76 map<string,bool*> mapStubId;
// Starts at the largest int value.
77 int objectIdCounter = std::numeric_limits<int>::max();
// Thread bodies started from the constructor.
81 void wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm);
82 void wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm);
// Constructor: allocates the IoTRMIUtil helper and creates the two wake-up
// threads.
// NOTE(review): several original lines are absent from this excerpt (the
// embedded numbering skips 88, 90-93, 95, 97-102), so any further member
// initialisation and how th1/th2 are detached or joined cannot be confirmed.
87 IoTRMIComm::IoTRMIComm() {
// Heap-allocated helper; the destructor tests this pointer against NULL.
89 rmiUtil = new IoTRMIUtil();
// Each thread runs the member function on this object and additionally
// receives `this` as its rmiComm argument.
94 thread th1 (&IoTRMIComm::wakeUpThreadOnMethodCall, this, this);
96 thread th2 (&IoTRMIComm::wakeUpThreadOnReturnValue, this, this);
// Destructor. Only the NULL check on rmiUtil is visible in this excerpt;
// presumably the helper is released inside the branch — TODO confirm.
103 IoTRMIComm::~IoTRMIComm() {
106 if (rmiUtil != NULL) {
// Thread body: takes the next received method packet off rmiComm->methodQueue,
// publishes it through rmiComm->methodBytes / methodLen, and looks up the flag
// registered for the packet's object Id in mapSkeletonId.
// NOTE(review): lines are missing from this excerpt (the declaration of
// methLen, any enclosing loop, and whatever is done with methRecv), so the
// notification step itself is not visible here.
113 void IoTRMIComm::wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm) {
116 //cout << "Starting wakeUpThreadOnMethodCall()" << endl;
118 // Convert back to char*
119 char* queueHead = rmiComm->methodQueue.deQAndGetLength(&methLen);
// A NULL head is skipped (presumably it means the queue was empty — TODO confirm).
120 if (queueHead != NULL) {
121 rmiComm->methodBytes = queueHead;
122 rmiComm->methodLen = methLen;
123 //IoTRMIUtil::printBytes(rmiComm->methodBytes, rmiComm->methodLen, false);
// The object Id is decoded from the start of the packet.
124 int currObjId = rmiComm->getObjectId(rmiComm->methodBytes);
125 auto search = rmiComm->mapSkeletonId.find(currObjId);
// NOTE(review): `search` is dereferenced without being compared against
// mapSkeletonId.end(); a packet for an unregistered object Id is undefined
// behaviour. The map is also read here without regSkelMutex, which
// registerSkeleton() holds while inserting.
126 bool* methRecv = search->second;
// Clear the hand-shake flag, then wait until another thread sets it to true.
127 didGetMethodBytes.exchange(false);
// NOTE(review): busy-wait with no sleep/yield; it occupies a core while waiting.
129 while(!didGetMethodBytes);
// Thread body: takes the next received return packet off rmiComm->returnQueue,
// publishes it through rmiComm->retValueBytes / retValueLen, and looks up the
// flag registered under "objectId-methodId" in mapStubId.
// NOTE(review): lines are missing from this excerpt (the declaration of
// retLen, any enclosing loop, and whatever is done with retRecv), so the
// notification step itself is not visible here.
135 void IoTRMIComm::wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm) {
138 //cout << "Starting wakeUpThreadOnReturnValue()" << endl;
140 // Convert back to char*
141 char* queueHead = rmiComm->returnQueue.deQAndGetLength(&retLen);
// A NULL head is skipped (presumably it means the queue was empty — TODO confirm).
142 if (queueHead != NULL) {
143 rmiComm->retValueBytes = queueHead;
144 rmiComm->retValueLen = retLen;
145 //IoTRMIUtil::printBytes(rmiComm->retValueBytes, rmiComm->retValueLen, false);
// Build the same "objectId-methodId" key that registerStub() uses.
146 int objectId = rmiComm->getObjectId(rmiComm->retValueBytes);
147 int methodId = rmiComm->getMethodId(rmiComm->retValueBytes);
148 string strKey = to_string(objectId) + "-" + to_string(methodId);
149 auto search = rmiComm->mapStubId.find(strKey);
// NOTE(review): `search` is dereferenced without being compared against
// mapStubId.end(); a return packet for an unregistered stub is undefined
// behaviour. The map is also read here without regStubMutex, which
// registerStub() holds while inserting.
150 bool* retRecv = search->second;
// Clear the hand-shake flag, then wait until another thread sets it to true.
151 didGetReturnBytes.exchange(false);
// NOTE(review): busy-wait with no sleep/yield; it occupies a core while waiting.
153 while(!didGetReturnBytes);
159 // registerSkeleton() registers the flag used to wake up the skeleton for objectId
// Inserts (objectId -> methodReceived) into mapSkeletonId while holding
// regSkelMutex. Assuming map is std::map, insert() leaves an existing entry
// for the same objectId untouched.
160 void IoTRMIComm::registerSkeleton(int objectId, bool* methodReceived) {
162 lock_guard<mutex> guard(regSkelMutex);
163 mapSkeletonId.insert(make_pair(objectId, methodReceived));
167 // registerStub() registers the flag used to wake up the stub waiting on (objectId, methodId)
// Inserts ("objectId-methodId" -> retValueReceived) into mapStubId while
// holding regStubMutex. Assuming map is std::map, insert() leaves an existing
// entry for the same key untouched.
168 void IoTRMIComm::registerStub(int objectId, int methodId, bool* retValueReceived) {
170 lock_guard<mutex> guard(regStubMutex);
// Same key format that wakeUpThreadOnReturnValue() builds for its lookup.
171 string strKey = to_string(objectId) + "-" + to_string(methodId);
172 mapStubId.insert(make_pair(strKey, retValueReceived));
176 // getObjectIdCounter() returns the current value of objectIdCounter
177 int IoTRMIComm::getObjectIdCounter() {
179 return objectIdCounter;
183 // setObjectIdCounter() overwrites objectIdCounter with objIdCounter
184 void IoTRMIComm::setObjectIdCounter(int objIdCounter) {
186 objectIdCounter = objIdCounter;
190 // decrementObjectIdCounter() presumably decrements the object Id counter
// NOTE(review): the body is not visible in this excerpt; the previous comment
// said "gets object Id counter", which looks like a copy-paste slip — confirm.
191 void IoTRMIComm::decrementObjectIdCounter() {
197 // Get method bytes from the socket
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// the methodBytes member published by wakeUpThreadOnMethodCall() — confirm.
198 char* IoTRMIComm::getMethodBytes() {
205 // Get method length from the socket
// NOTE(review): the body is not visible in this excerpt; presumably it returns
// the methodLen member published by wakeUpThreadOnMethodCall() — confirm.
206 int IoTRMIComm::getMethodLength() {
213 // Get object Id from the stored method bytes
214 int IoTRMIComm::getObjectIdFromMethod() {
// The object Id is the first OBJECT_ID_LEN bytes of methodBytes.
216 char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
217 memcpy(objectIdBytes, methodBytes, IoTRMIUtil::OBJECT_ID_LEN);
218 // Decode the object Id bytes into an int
// (the declaration of objectId and the return statement are not visible in this excerpt)
220 IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
226 // Get object Id from bytes (static version)
227 int IoTRMIComm::getObjectId(char* packetBytes) {
// The object Id is the first OBJECT_ID_LEN bytes of the packet.
229 char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
230 memcpy(objectIdBytes, packetBytes, IoTRMIUtil::OBJECT_ID_LEN);
231 // Decode the object Id bytes into an int
// (the declaration of objectId and the return statement are not visible in this excerpt)
233 IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
239 // Get methodId from bytes (static version)
240 int IoTRMIComm::getMethodId(char* packetBytes) {
243 char methodIdBytes[IoTRMIUtil::METHOD_ID_LEN];
// The method Id field starts right after the object Id field.
244 int offset = IoTRMIUtil::OBJECT_ID_LEN;
245 memcpy(methodIdBytes, packetBytes + offset, IoTRMIUtil::METHOD_ID_LEN);
246 // Decode the method Id bytes into an int
// (the declaration of methodId and the return statement are not visible in this excerpt)
248 IoTRMIUtil::byteArrayToInt(&methodId, methodIdBytes);
254 // Get packet type from bytes (static version)
// Copies the PACKET_TYPE_LEN bytes that follow the object Id and method Id
// fields of packetBytes and decodes them as an int.
255 int IoTRMIComm::getPacketType(char* packetBytes) {
// Sized with PACKET_TYPE_LEN so the buffer always matches the memcpy below
// (it was previously sized with METHOD_ID_LEN).
258 char packetTypeBytes[IoTRMIUtil::PACKET_TYPE_LEN];
259 int offset = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
260 memcpy(packetTypeBytes, packetBytes + offset, IoTRMIUtil::PACKET_TYPE_LEN);
261 // Decode the packet type bytes into an int
// (the declaration of packetType and the return statement are not visible in this excerpt)
263 IoTRMIUtil::byteArrayToInt(&packetType, packetTypeBytes);
269 // Get method parameters and return an array of parameter objects
// Scanning starts after the packet header, i.e. after the object Id, method Id
// and packet type fields.
271 // For primitive objects:
272 // | header | m-bit actual data (fixed length) |
274 // For string, arrays, and non-primitive objects:
275 // | header | 32-bit length | n-bit actual data | ...
// paramCls[i] names the type of parameter i; paramObj[i] is passed to
// IoTRMIUtil::getParamObject and overwritten with its result.
276 void** IoTRMIComm::getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes) {
278 // Byte scanning position
279 int pos = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
280 for (int i = 0; i < numParam; i++) {
// getTypeSize() yields -1 for types without a fixed size.
281 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
282 // Get the 32-bit field in the byte array to get the actual
283 // length (this is a param with indefinite length)
284 if (paramLen == -1) {
285 char bytPrmLen[IoTRMIUtil::PARAM_LEN];
286 memcpy(bytPrmLen, methodBytes + pos, IoTRMIUtil::PARAM_LEN);
287 pos = pos + IoTRMIUtil::PARAM_LEN;
// Restored "&paramLen": the address-of expression had been corrupted into a
// pilcrow character by an HTML-entity decode of "&para".
288 int* prmLenPtr = IoTRMIUtil::byteArrayToInt(&paramLen, bytPrmLen);
289 paramLen = *prmLenPtr;
// NOTE(review): runtime-sized array (compiler extension in C++); paramLen comes
// straight from the packet and is not range-checked here.
291 char paramBytes[paramLen];
292 memcpy(paramBytes, methodBytes + pos, paramLen);
293 pos = pos + paramLen;
294 paramObj[i] = IoTRMIUtil::getParamObject(paramObj[i], paramCls[i].c_str(), paramBytes, paramLen);
301 // Find the bytes length of a return object (struct that has more than 1 member)
// Sums the header length, every member's size and, for each member without a
// fixed size, one extra RETURN_LEN length field.
302 int IoTRMIComm::returnLength(void* retObj[], string retCls[], int numRet) {
304 // Get byte arrays and calculate return bytes length
// Start with the header: object Id + method Id + packet type.
305 int returnLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
306 for (int i = 0; i < numRet; i++) {
307 // Find the return length
308 int retObjLen = rmiUtil->getTypeSize(retCls[i]);
309 if (retObjLen == -1) { // Store the length of the field - indefinite length
// Size is taken from the object itself via getVarTypeSize().
310 retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
311 // Some space for return length, i.e. 32 bits for integer
312 returnLen = returnLen + IoTRMIUtil::RETURN_LEN;
314 // Calculate returnLen
315 returnLen = returnLen + retObjLen;
322 // Convert return object (struct members) into bytes
// Writes each member into retBytes at the running offset `pos`; members
// without a fixed size are preceded by a RETURN_LEN-byte length field.
// NOTE(review): the declaration/initial value of `pos` is not visible in this
// excerpt, so where writing starts inside retBytes cannot be confirmed here.
323 char* IoTRMIComm::returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet) {
326 // Get byte arrays and calculate return bytes length
327 for (int i = 0; i < numRet; i++) {
328 // Find the return length
329 int retObjLen = rmiUtil->getTypeSize(retCls[i]);
330 if (retObjLen == -1) { // Store the length of the field - indefinite length
331 retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
332 // Write the return length
333 char retLenBytes[IoTRMIUtil::RETURN_LEN];
334 IoTRMIUtil::intToByteArray(retObjLen, retLenBytes);
335 memcpy(retBytes + pos, retLenBytes, IoTRMIUtil::RETURN_LEN);
336 pos = pos + IoTRMIUtil::RETURN_LEN;
338 // Get array of bytes and put it in the array of array of bytes
// NOTE(review): runtime-sized array (compiler extension in C++).
339 char objBytes[retObjLen];
340 IoTRMIUtil::getObjectBytes(objBytes, retObj[i], retCls[i].c_str());
341 memcpy(retBytes + pos, objBytes, retObjLen);
342 pos = pos + retObjLen;
349 // Get return value for single values (non-structs)
// Decodes the payload of the received return packet (retValueBytes /
// retValueLen) into retObj via IoTRMIUtil::getParamObject, then frees the
// packet buffer.
350 void* IoTRMIComm::getReturnValue(string retType, void* retObj) {
352 // Receive return value and return it to caller
// retValMutex guards the shared retValueBytes / retValueLen for the rest of the call.
353 lock_guard<mutex> guard(retValMutex);
354 // Copy just the actual return value bytes
355 int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
356 int retActualLen = retValueLen - headerLen;
357 //char *retActualBytes = new char[retActualLen];
// NOTE(review): runtime-sized array (compiler extension in C++); a retValueLen
// smaller than headerLen would make the size negative — confirm that a full
// header is always present.
358 char retActualBytes[retActualLen];
359 memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
360 //IoTRMIUtil::printBytes(retActualBytes, retActualLen, false);
361 retObj = IoTRMIUtil::getParamObject(retObj, retType.c_str(), retActualBytes, retActualLen);
362 // Delete received bytes object
// retValueBytes is not reset after the delete[] in the visible lines.
363 delete[] retValueBytes;
364 //delete[] retActualBytes;
370 // Get a set of return objects (struct)
// Strips the header from the received return packet and hands the payload to
// getReturnObjects() to fill retObj[], then frees the packet buffer.
371 void** IoTRMIComm::getStructObjects(string retType[], int numRet, void* retObj[]) {
373 // Critical section that is used by different objects
374 lock_guard<mutex> guard(retValMutex);
375 // Copy just the actual return value bytes
376 int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
377 int retActualLen = retValueLen - headerLen;
// NOTE(review): runtime-sized array (compiler extension in C++); the size is
// negative if retValueLen is smaller than headerLen — confirm.
378 char retActualBytes[retActualLen];
379 memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
380 // Decode the struct members from the payload
381 retObj = getReturnObjects(retActualBytes, retType, numRet, retObj);
382 // Delete received bytes object
383 delete[] retValueBytes;
389 // Find the bytes length of a method
// Sums the header length, every parameter's size and, for each parameter
// without a fixed size, one extra PARAM_LEN length field.
390 int IoTRMIComm::methodLength(string paramCls[], void* paramObj[], int numParam) {
392 // Get byte arrays and calculate method bytes length
393 // Start from the object Id + method Id + packet type...
394 int methodLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
395 for (int i = 0; i < numParam; i++) {
396 // Find the parameter length
397 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
398 if (paramLen == -1) { // Store the length of the field - indefinite length
// Size is taken from the object itself via getVarTypeSize().
399 paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
400 // Some space for param length, i.e. 32 bits for integer
401 methodLen = methodLen + IoTRMIUtil::PARAM_LEN;
403 // Calculate methodLen
404 methodLen = methodLen + paramLen;
410 // Convert method and its parameters into bytes
// Writes, in order, into the caller-supplied buffer `method`: object Id,
// method Id, packet type (METHOD_TYPE), then every parameter. Parameters
// without a fixed size are preceded by a PARAM_LEN-byte length field.
// The buffer is presumably sized by the caller with methodLength() — TODO confirm.
411 char* IoTRMIComm::methodToBytes(int objectId, int methId, string paramCls[],
412 void* paramObj[], char* method, int numParam) {
414 // Get object Id in bytes
415 char objId[IoTRMIUtil::OBJECT_ID_LEN];
416 IoTRMIUtil::intToByteArray(objectId, objId);
417 memcpy(method, objId, IoTRMIUtil::OBJECT_ID_LEN);
418 int pos = IoTRMIUtil::OBJECT_ID_LEN;
419 // Get method Id in bytes
420 char methodId[IoTRMIUtil::METHOD_ID_LEN];
421 IoTRMIUtil::intToByteArray(methId, methodId);
422 memcpy(method + pos, methodId, IoTRMIUtil::METHOD_ID_LEN);
423 pos = pos + IoTRMIUtil::METHOD_ID_LEN;
// Get packet type in bytes. The dedicated packetType buffer is now used; the
// methodId buffer was previously reused here and packetType was left unused.
424 char packetType[IoTRMIUtil::PACKET_TYPE_LEN];
425 IoTRMIUtil::intToByteArray(IoTRMIUtil::METHOD_TYPE, packetType);
426 memcpy(method + pos, packetType, IoTRMIUtil::PACKET_TYPE_LEN);
427 pos = pos + IoTRMIUtil::PACKET_TYPE_LEN;
428 // Get byte arrays and calculate method bytes length
429 for (int i = 0; i < numParam; i++) {
430 // Find the parameter length
431 int paramLen = rmiUtil->getTypeSize(paramCls[i]);
432 if (paramLen == -1) { // Store the length of the field - indefinite length
433 paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
434 // Write the parameter length
435 char prmLenBytes[IoTRMIUtil::PARAM_LEN];
436 IoTRMIUtil::intToByteArray(paramLen, prmLenBytes);
437 memcpy(method + pos, prmLenBytes, IoTRMIUtil::PARAM_LEN);
438 pos = pos + IoTRMIUtil::PARAM_LEN;
440 // Get array of bytes and put it in the array of array of bytes
// NOTE(review): runtime-sized array (compiler extension in C++).
441 char objBytes[paramLen];
442 IoTRMIUtil::getObjectBytes(objBytes, paramObj[i], paramCls[i].c_str());
443 memcpy(method + pos, objBytes, paramLen);
444 pos = pos + paramLen;
451 // Get return objects for structs
// Walks retBytes and converts each member into retObj[i] via
// IoTRMIUtil::getParamObject, using retCls[i] as the member's type name.
// NOTE(review): this excerpt omits several lines of the function (the
// declaration of `pos`, the condition guarding the length-field read, the
// advance of `pos` past each member, and the end of the function), so the
// exact control flow cannot be confirmed here.
452 void** IoTRMIComm::getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]) {
454 // Byte scanning position
456 for (int i = 0; i < numRet; i++) {
457 int retLen = rmiUtil->getTypeSize(retCls[i]);
458 // Get the 32-bit field in the byte array to get the actual
459 // length (this is a member with indefinite length)
461 char bytRetLen[IoTRMIUtil::RETURN_LEN];
462 memcpy(bytRetLen, retBytes + pos, IoTRMIUtil::RETURN_LEN);
463 pos = pos + IoTRMIUtil::RETURN_LEN;
464 int* retLenPtr = IoTRMIUtil::byteArrayToInt(&retLen, bytRetLen);
// NOTE(review): runtime-sized array (compiler extension in C++); retLen may
// come straight from the packet and is not range-checked in the visible lines.
467 char retObjBytes[retLen];
468 memcpy(retObjBytes, retBytes + pos, retLen);
470 retObj[i] = IoTRMIUtil::getParamObject(retObj[i], retCls[i].c_str(), retObjBytes, retLen);