using namespace std;
-Node::Node(void* val) {
+Node::Node(char* val, int len) {
value = val;
+ length = len;
next = NULL;
}
}
-void* Node::getValue() {
+char* Node::getValue() {
return value;
}
+int Node::getLength() {
+
+ return length;
+}
+
+
Node* Node::getNext() {
return next;
ConcurrentLinkedListQueue::~ConcurrentLinkedListQueue() {
- void* val = NULL;
+ char* val = NULL;
do { // Dequeue and free everything up
val = dequeue();
} while(val != NULL);
}
-void ConcurrentLinkedListQueue::enqueue(void* value) {
+void ConcurrentLinkedListQueue::enqueue(char* value, int length) {
- lock_guard<mutex> guard(mtx);
+ lock_guard<mutex> guard(queueMutex);
if (tail == NULL && head == NULL) { // first element
- tail = new Node(value);
+ tail = new Node(value, length);
head = tail; // Both tail and head point to the first element
} else { // Next elements
- Node* newEl = new Node(value);
+ Node* newEl = new Node(value, length);
tail->setNext(newEl);
tail = newEl;
}
// Return element and remove from list
-void* ConcurrentLinkedListQueue::dequeue() {
+char* ConcurrentLinkedListQueue::dequeue() {
- lock_guard<mutex> guard(mtx);
+ lock_guard<mutex> guard(queueMutex);
if (tail == NULL && head == NULL) { // empty
return NULL;
} else {
tail = NULL;
} else
head = head->getNext();
- void* retVal = retEl->getValue();
+ char* retVal = retEl->getValue();
// Prepare retEl for deletion
retEl->setNext(NULL);
delete retEl;
}
}
+// Return element, length, and remove it from list
+char* ConcurrentLinkedListQueue::deQAndGetLength(int* length) {
-void enQ(ConcurrentLinkedListQueue* methodQueue, char** test) {
-
- /*char* test = new char[3];
- test[0] = 'a';
- test[1] = 'b';
- test[2] = 'c';*/
- void* ptr = test;
- for(int i=0; i<10; i++ ) {
- cout << "Enqueuing: " << test << " address: " << ptr << endl;
- methodQueue->enqueue(ptr);
- }
-}
-
-
-void deQ(ConcurrentLinkedListQueue* methodQueue) {
-
- for(int i=0; i<12; i++) {
- void* result = methodQueue->dequeue();
- if (result != NULL) {
- cout << "Dequeue result: " << result << endl;
- cout << "Dequeue result: " << *((char**) result) << endl;
- } else {
- cout << "Result is NULL!!! End of queue!" << endl;
- }
+ lock_guard<mutex> guard(queueMutex);
+ if (tail == NULL && head == NULL) { // empty
+ *length = 0;
+ return 0;
+ } else {
+ Node* retEl = head;
+ if (head->getNext() == NULL) {
+ head = NULL;
+ tail = NULL;
+ } else
+ head = head->getNext();
+ char* retVal = retEl->getValue();
+ *length = retEl->getLength();
+ // Prepare retEl for deletion
+ retEl->setNext(NULL);
+ delete retEl;
+ // Return just the value
+ //cout << "Print bytes inside dequeue: ";
+ //IoTRMIUtil::printBytes(*((char**) retVal), *length, false);
+ //cout << "Dequeuing: " << *((char**) retVal) << endl;
+ //cout << "Dequeuing address: " << std::ref(retVal) << endl;
+ return retVal;
}
}
-int main(int argc, char *argv[])
-{
- ConcurrentLinkedListQueue* methodQueue = new ConcurrentLinkedListQueue();
- /*cout << "Dequeue result: " << methodQueue->dequeue() << endl;
- string str = "this is a test string";
- const char* test = str.c_str();
- const char** test2 = &test;
- cout << "Initial result: " << test << endl;
- cout << "Initial result 2: " << *test2 << endl;
- void* ptr = &test;
- cout << "Pointer: " << ptr << endl;
- methodQueue->enqueue(ptr);
- methodQueue->enqueue(ptr);
- void* result = methodQueue->dequeue();
- cout << "Result: " << result << endl;
- cout << "Dequeue result: " << *((const char**) result) << endl;
- void* result2 = methodQueue->dequeue();
- cout << "Dequeue result: " << *((const char**) result2) << endl;
- void* result3 = methodQueue->dequeue();
- cout << "Dequeue result: " << result3 << endl;*/
- //thread t1,t2;
-
- //t1 = thread(enQ, methodQueue);
- //t2 = thread(deQ, methodQueue);
-
- //t1.join();
- //t2.join();
-
- char* test = new char[3];
- test[0] = 'a';
- test[1] = 'b';
- test[2] = 'c';
- void* ptr = &test;
- methodQueue->enqueue(ptr);
- void* result = methodQueue->dequeue();
- cout << "Dequeue result: " << *((char**) result) << endl;
-
- thread t1,t2;
-
- t1 = thread(enQ, methodQueue, &test);
- t2 = thread(deQ, methodQueue);
-
- t1.join();
- t2.join();
-
- return 0;
-}
#include <iostream>
#include <mutex>
+#include "IoTRMIUtil.hpp"
+
/** Class ConcurrentLinkedListQueue is a queue that can handle
* concurrent requests and packets for IoT communication via socket.
* <p>
- * It stores object through a void pointer.
+ * It stores object through a char pointer.
*
* @author Rahmadi Trimananda <rtrimana @ uci.edu>
* @version 1.0
using namespace std;
-mutex mtx;
+mutex queueMutex;
class Node {
private:
Node* next;
- void* value;
+ char* value;
+ int length;
public:
- Node(void* val);
+ Node(char* val, int len);
~Node();
- void* getValue();
+ char* getValue();
+ int getLength();
Node* getNext();
void setNext(Node* nxt);
public:
ConcurrentLinkedListQueue();
~ConcurrentLinkedListQueue();
- void enqueue(void* value); // Enqueue to tail
- void* dequeue(); // Dequeue from tail
+ void enqueue(char* value, int length); // Enqueue to tail
+ char* dequeue(); // Dequeue from tail
+ char* deQAndGetLength(int* length); // Dequeue from tail and return length
};
#endif
void** getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
private:
- map<string,int> mapSign2MethodId;
IoTRMIUtil *rmiUtil;
IoTSocketClient *rmiClient;
--- /dev/null
+/** Class IoTRMIComm combines the functionalities
+ * of IoTRMICall and IoTRMIObject to create a single
+ * communication class with two sockets serving one
+ * directional traffic for each.
+ *
+ * @author Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version 1.0
+ * @since 2017-01-28
+ */
+#ifndef _IOTRMICOMM_HPP__
+#define _IOTRMICOMM_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTSocketServer.hpp"
+#include "IoTSocketClient.hpp"
+#include "ConcurrentLinkedListQueue.cpp"
+
+using namespace std;
+
+std::atomic<bool> didGetMethodBytes(false);
+std::atomic<bool> didGetReturnBytes(false);
+
+mutex regSkelMutex;
+mutex regStubMutex;
+mutex retValMutex;
+mutex remoteCallMutex;
+mutex sendReturnObjMutex;
+
+class IoTRMIComm {
+ public:
+ IoTRMIComm();
+ ~IoTRMIComm();
+ // Public methods
+ virtual void sendReturnObj(void* retObj, string type, char* methodBytes) = 0;
+ virtual void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) = 0;
+ int returnLength(void* retObj[], string retCls[], int numRet);
+ char* returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet);
+ char* getMethodBytes();
+ int getMethodLength();
+ int getObjectIdFromMethod();
+ static int getObjectId(char* packetBytes);
+ static int getMethodId(char* packetBytes);
+ static int getPacketType(char* packetBytes);
+ void** getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes);
+ void registerSkeleton(int objectId, bool* methodReceived);
+ void registerStub(int objectId, int methodId, bool* retValueReceived);
+ int getObjectIdCounter();
+ void setObjectIdCounter(int objIdCounter);
+ void decrementObjectIdCounter();
+
+ int methodLength(string paramCls[], void* paramObj[], int numParam);
+ char* methodToBytes(int objectId, int methId, string paramCls[], void* paramObj[],
+ char* method, int numParam);
+ virtual void remoteCall(int objectId, int methodId, string paramCls[],
+ void* paramObj[], int numParam) = 0;
+ void* getReturnValue(string retType, void* retObj);
+
+ void** getStructObjects(string retType[], int numRet, void* retObj[]);
+ void** getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]);
+
+ protected:
+ IoTRMIUtil *rmiUtil;
+ char* methodBytes;
+ int methodLen;
+ char* retValueBytes;
+ int retValueLen;
+ ConcurrentLinkedListQueue methodQueue;
+ ConcurrentLinkedListQueue returnQueue;
+ map<int,bool*> mapSkeletonId;
+ map<string,bool*> mapStubId;
+ int objectIdCounter = std::numeric_limits<int>::max();
+
+ private:
+ // Private methods
+ void wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm);
+ void wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm);
+};
+
+
+// Constructor
+IoTRMIComm::IoTRMIComm() {
+
+ rmiUtil = new IoTRMIUtil();
+ methodBytes = NULL;
+ retValueBytes = NULL;
+ methodLen = 0;
+ retValueLen = 0;
+ thread th1 (&IoTRMIComm::wakeUpThreadOnMethodCall, this, this);
+ th1.detach();
+ thread th2 (&IoTRMIComm::wakeUpThreadOnReturnValue, this, this);
+ th2.detach();
+
+}
+
+
+// Destructor
+IoTRMIComm::~IoTRMIComm() {
+
+ // Clean up
+ if (rmiUtil != NULL) {
+ delete rmiUtil;
+ rmiUtil = NULL;
+ }
+}
+
+
+void IoTRMIComm::wakeUpThreadOnMethodCall(IoTRMIComm* rmiComm) {
+
+ int methLen = 0;
+ //cout << "Starting wakeUpThreadOnMethodCall()" << endl;
+ while(true) {
+ // Convert back to char*
+ char* queueHead = rmiComm->methodQueue.deQAndGetLength(&methLen);
+ if (queueHead != NULL) {
+ rmiComm->methodBytes = queueHead;
+ rmiComm->methodLen = methLen;
+ //IoTRMIUtil::printBytes(rmiComm->methodBytes, rmiComm->methodLen, false);
+ int currObjId = rmiComm->getObjectId(rmiComm->methodBytes);
+ auto search = rmiComm->mapSkeletonId.find(currObjId);
+ bool* methRecv = search->second;
+ didGetMethodBytes.exchange(false);
+ *methRecv = true;
+ while(!didGetMethodBytes);
+ }
+ }
+}
+
+
+void IoTRMIComm::wakeUpThreadOnReturnValue(IoTRMIComm* rmiComm) {
+
+ int retLen = 0;
+ //cout << "Starting wakeUpThreadOnReturnValue()" << endl;
+ while(true) {
+ // Convert back to char*
+ char* queueHead = rmiComm->returnQueue.deQAndGetLength(&retLen);
+ if (queueHead != NULL) {
+ rmiComm->retValueBytes = queueHead;
+ rmiComm->retValueLen = retLen;
+ //IoTRMIUtil::printBytes(rmiComm->retValueBytes, rmiComm->retValueLen, false);
+ int objectId = rmiComm->getObjectId(rmiComm->retValueBytes);
+ int methodId = rmiComm->getMethodId(rmiComm->retValueBytes);
+ string strKey = to_string(objectId) + "-" + to_string(methodId);
+ auto search = rmiComm->mapStubId.find(strKey);
+ bool* retRecv = search->second;
+ didGetReturnBytes.exchange(false);
+ *retRecv = true;
+ while(!didGetReturnBytes);
+ }
+ }
+}
+
+
+// registerSkeleton() registers the skeleton to be woken up
+void IoTRMIComm::registerSkeleton(int objectId, bool* methodReceived) {
+
+ lock_guard<mutex> guard(regSkelMutex);
+ mapSkeletonId.insert(make_pair(objectId, methodReceived));
+}
+
+
+// registerStub() registers the skeleton to be woken up
+void IoTRMIComm::registerStub(int objectId, int methodId, bool* retValueReceived) {
+
+ lock_guard<mutex> guard(regStubMutex);
+ string strKey = to_string(objectId) + "-" + to_string(methodId);
+ mapStubId.insert(make_pair(strKey, retValueReceived));
+}
+
+
+// getObjectIdCounter() gets object Id counter
+int IoTRMIComm::getObjectIdCounter() {
+
+ return objectIdCounter;
+}
+
+
+// setObjectIdCounter() sets object Id counter
+void IoTRMIComm::setObjectIdCounter(int objIdCounter) {
+
+ objectIdCounter = objIdCounter;
+}
+
+
+// decrementObjectIdCounter() gets object Id counter
+void IoTRMIComm::decrementObjectIdCounter() {
+
+ objectIdCounter--;
+}
+
+
+// Get method bytes from the socket
+char* IoTRMIComm::getMethodBytes() {
+
+ // Get method bytes
+ return methodBytes;
+}
+
+
+// Get method length from the socket
+int IoTRMIComm::getMethodLength() {
+
+ // Get method bytes
+ return methodLen;
+}
+
+
+// Get object Id from bytes
+int IoTRMIComm::getObjectIdFromMethod() {
+
+ char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
+ memcpy(objectIdBytes, methodBytes, IoTRMIUtil::OBJECT_ID_LEN);
+ // Get method signature
+ int objectId = 0;
+ IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
+
+ return objectId;
+}
+
+
+// Get object Id from bytes (static version)
+int IoTRMIComm::getObjectId(char* packetBytes) {
+
+ char objectIdBytes[IoTRMIUtil::OBJECT_ID_LEN];
+ memcpy(objectIdBytes, packetBytes, IoTRMIUtil::OBJECT_ID_LEN);
+ // Get method signature
+ int objectId = 0;
+ IoTRMIUtil::byteArrayToInt(&objectId, objectIdBytes);
+
+ return objectId;
+}
+
+
+// Get methodId from bytes (static version)
+int IoTRMIComm::getMethodId(char* packetBytes) {
+
+ // Get method Id
+ char methodIdBytes[IoTRMIUtil::METHOD_ID_LEN];
+ int offset = IoTRMIUtil::OBJECT_ID_LEN;
+ memcpy(methodIdBytes, packetBytes + offset, IoTRMIUtil::METHOD_ID_LEN);
+ // Get method signature
+ int methodId = 0;
+ IoTRMIUtil::byteArrayToInt(&methodId, methodIdBytes);
+
+ return methodId;
+}
+
+
+// Get methodId from bytes (static version)
+int IoTRMIComm::getPacketType(char* packetBytes) {
+
+ // Get method Id
+ char packetTypeBytes[IoTRMIUtil::METHOD_ID_LEN];
+ int offset = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+ memcpy(packetTypeBytes, packetBytes + offset, IoTRMIUtil::PACKET_TYPE_LEN);
+ // Get method signature
+ int packetType = 0;
+ IoTRMIUtil::byteArrayToInt(&packetType, packetTypeBytes);
+
+ return packetType;
+}
+
+
+// Get method parameters and return an array of parameter objects
+//
+// For primitive objects:
+// | 32-bit method ID | m-bit actual data (fixed length) |
+//
+// For string, arrays, and non-primitive objects:
+// | 32-bit method ID | 32-bit length | n-bit actual data | ...
+void** IoTRMIComm::getMethodParams(string paramCls[], int numParam, void* paramObj[], char* methodBytes) {
+
+ // Byte scanning position
+ int pos = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+ for (int i = 0; i < numParam; i++) {
+ int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+ // Get the 32-bit field in the byte array to get the actual
+ // length (this is a param with indefinite length)
+ if (paramLen == -1) {
+ char bytPrmLen[IoTRMIUtil::PARAM_LEN];
+ memcpy(bytPrmLen, methodBytes + pos, IoTRMIUtil::PARAM_LEN);
+ pos = pos + IoTRMIUtil::PARAM_LEN;
+ int* prmLenPtr = IoTRMIUtil::byteArrayToInt(¶mLen, bytPrmLen);
+ paramLen = *prmLenPtr;
+ }
+ char paramBytes[paramLen];
+ memcpy(paramBytes, methodBytes + pos, paramLen);
+ pos = pos + paramLen;
+ paramObj[i] = IoTRMIUtil::getParamObject(paramObj[i], paramCls[i].c_str(), paramBytes, paramLen);
+ }
+
+ return paramObj;
+}
+
+
+// Find the bytes length of a return object (struct that has more than 1 member)
+int IoTRMIComm::returnLength(void* retObj[], string retCls[], int numRet) {
+
+ // Get byte arrays and calculate return bytes length
+ int returnLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+ for (int i = 0; i < numRet; i++) {
+ // Find the return length
+ int retObjLen = rmiUtil->getTypeSize(retCls[i]);
+ if (retObjLen == -1) { // Store the length of the field - indefinite length
+ retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
+ // Some space for return length, i.e. 32 bits for integer
+ returnLen = returnLen + IoTRMIUtil::RETURN_LEN;
+ }
+ // Calculate returnLen
+ returnLen = returnLen + retObjLen;
+ }
+
+ return returnLen;
+}
+
+
+// Convert return object (struct members) into bytes
+char* IoTRMIComm::returnToBytes(void* retObj[], string retCls[], char* retBytes, int numRet) {
+
+ int pos = 0;
+ // Get byte arrays and calculate return bytes length
+ for (int i = 0; i < numRet; i++) {
+ // Find the return length
+ int retObjLen = rmiUtil->getTypeSize(retCls[i]);
+ if (retObjLen == -1) { // Store the length of the field - indefinite length
+ retObjLen = rmiUtil->getVarTypeSize(retCls[i], retObj[i]);
+ // Write the return length
+ char retLenBytes[IoTRMIUtil::RETURN_LEN];
+ IoTRMIUtil::intToByteArray(retObjLen, retLenBytes);
+ memcpy(retBytes + pos, retLenBytes, IoTRMIUtil::RETURN_LEN);
+ pos = pos + IoTRMIUtil::RETURN_LEN;
+ }
+ // Get array of bytes and put it in the array of array of bytes
+ char objBytes[retObjLen];
+ IoTRMIUtil::getObjectBytes(objBytes, retObj[i], retCls[i].c_str());
+ memcpy(retBytes + pos, objBytes, retObjLen);
+ pos = pos + retObjLen;
+ }
+
+ return retBytes;
+}
+
+
+// Get return value for single values (non-structs)
+void* IoTRMIComm::getReturnValue(string retType, void* retObj) {
+
+ // Receive return value and return it to caller
+ lock_guard<mutex> guard(retValMutex);
+ // Copy just the actual return value bytes
+ int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+ int retActualLen = retValueLen - headerLen;
+ //char *retActualBytes = new char[retActualLen];
+ char retActualBytes[retActualLen];
+ memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
+ //IoTRMIUtil::printBytes(retActualBytes, retActualLen, false);
+ retObj = IoTRMIUtil::getParamObject(retObj, retType.c_str(), retActualBytes, retActualLen);
+ // Delete received bytes object
+ delete[] retValueBytes;
+ //delete[] retActualBytes;
+
+ return retObj;
+}
+
+
+// Get a set of return objects (struct)
+void** IoTRMIComm::getStructObjects(string retType[], int numRet, void* retObj[]) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(retValMutex);
+ // Copy just the actual return value bytes
+ int headerLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+ int retActualLen = retValueLen - headerLen;
+ char retActualBytes[retActualLen];
+ memcpy(retActualBytes, retValueBytes + headerLen, retActualLen);
+ // Return size of array of struct
+ retObj = getReturnObjects(retActualBytes, retType, numRet, retObj);
+ // Delete received bytes object
+ delete[] retValueBytes;
+
+ return retObj;
+}
+
+
+// Find the bytes length of a method
+int IoTRMIComm::methodLength(string paramCls[], void* paramObj[], int numParam) {
+
+ // Get byte arrays and calculate method bytes length
+ // Start from the object Id + method Id...
+ int methodLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN + IoTRMIUtil::PACKET_TYPE_LEN;
+ for (int i = 0; i < numParam; i++) {
+ // Find the parameter length
+ int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+ if (paramLen == -1) { // Store the length of the field - indefinite length
+ paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
+ // Some space for param length, i.e. 32 bits for integer
+ methodLen = methodLen + IoTRMIUtil::PARAM_LEN;
+ }
+ // Calculate methodLen
+ methodLen = methodLen + paramLen;
+ }
+ return methodLen;
+}
+
+
+// Convert method and its parameters into bytes
+char* IoTRMIComm::methodToBytes(int objectId, int methId, string paramCls[],
+ void* paramObj[], char* method, int numParam) {
+
+ // Get object Id in bytes
+ char objId[IoTRMIUtil::OBJECT_ID_LEN];
+ IoTRMIUtil::intToByteArray(objectId, objId);
+ memcpy(method, objId, IoTRMIUtil::OBJECT_ID_LEN);
+ int pos = IoTRMIUtil::OBJECT_ID_LEN;
+ // Get method Id in bytes
+ char methodId[IoTRMIUtil::METHOD_ID_LEN];
+ IoTRMIUtil::intToByteArray(methId, methodId);
+ memcpy(method + pos, methodId, IoTRMIUtil::METHOD_ID_LEN);
+ pos = pos + IoTRMIUtil::METHOD_ID_LEN;
+ char packetType[IoTRMIUtil::PACKET_TYPE_LEN];
+ IoTRMIUtil::intToByteArray(IoTRMIUtil::METHOD_TYPE, methodId);
+ memcpy(method + pos, methodId, IoTRMIUtil::PACKET_TYPE_LEN);
+ pos = pos + IoTRMIUtil::PACKET_TYPE_LEN;
+ // Get byte arrays and calculate method bytes length
+ for (int i = 0; i < numParam; i++) {
+ // Find the parameter length
+ int paramLen = rmiUtil->getTypeSize(paramCls[i]);
+ if (paramLen == -1) { // Store the length of the field - indefinite length
+ paramLen = rmiUtil->getVarTypeSize(paramCls[i], paramObj[i]);
+ // Write the parameter length
+ char prmLenBytes[IoTRMIUtil::PARAM_LEN];
+ IoTRMIUtil::intToByteArray(paramLen, prmLenBytes);
+ memcpy(method + pos, prmLenBytes, IoTRMIUtil::PARAM_LEN);
+ pos = pos + IoTRMIUtil::PARAM_LEN;
+ }
+ // Get array of bytes and put it in the array of array of bytes
+ char objBytes[paramLen];
+ IoTRMIUtil::getObjectBytes(objBytes, paramObj[i], paramCls[i].c_str());
+ memcpy(method + pos, objBytes, paramLen);
+ pos = pos + paramLen;
+ }
+
+ return method;
+}
+
+
+// Get return objects for structs
+void** IoTRMIComm::getReturnObjects(char* retBytes, string retCls[], int numRet, void* retObj[]) {
+
+ // Byte scanning position
+ int pos = 0;
+ for (int i = 0; i < numRet; i++) {
+ int retLen = rmiUtil->getTypeSize(retCls[i]);
+ // Get the 32-bit field in the byte array to get the actual
+ // length (this is a param with indefinite length)
+ if (retLen == -1) {
+ char bytRetLen[IoTRMIUtil::RETURN_LEN];
+ memcpy(bytRetLen, retBytes + pos, IoTRMIUtil::RETURN_LEN);
+ pos = pos + IoTRMIUtil::RETURN_LEN;
+ int* retLenPtr = IoTRMIUtil::byteArrayToInt(&retLen, bytRetLen);
+ retLen = *retLenPtr;
+ }
+ char retObjBytes[retLen];
+ memcpy(retObjBytes, retBytes + pos, retLen);
+ pos = pos + retLen;
+ retObj[i] = IoTRMIUtil::getParamObject(retObj[i], retCls[i].c_str(), retObjBytes, retLen);
+ }
+
+ return retObj;
+}
+#endif
+
+
--- /dev/null
+/** Class IoTRMICommClient implements the client side
+ * of IoTRMIComm class.
+ *
+ * @author Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version 1.0
+ * @since 2017-01-28
+ */
+#ifndef _IOTRMICOMMCLIENT_HPP__
+#define _IOTRMICOMMCLIENT_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTRMIComm.hpp"
+
+using namespace std;
+
+mutex clientRemoteCallMutex;
+mutex clientSendReturnObjMutex;
+
+class IoTRMICommClient : public IoTRMIComm {
+ public:
+ IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult);
+ ~IoTRMICommClient();
+ // Public methods
+ void sendReturnObj(void* retObj, string type, char* methodBytes);
+ void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes);
+ void remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam);
+ //void waitForPackets();
+ //void waitForPackets(IoTRMICommClient* rmiComm);
+
+ private:
+ IoTSocketClient *rmiClientSend;
+ IoTSocketClient *rmiClientRecv;
+
+ // Private methods
+ void waitForPackets(IoTRMICommClient* rmiComm);
+};
+
+
+// Constructor
+IoTRMICommClient::IoTRMICommClient(int _portSend, int _portRecv, const char* _address, int _rev, bool* _bResult) : IoTRMIComm() {
+
+ rmiClientRecv = new IoTSocketClient(_portSend, _address, _rev, _bResult);
+ rmiClientSend = new IoTSocketClient(_portRecv, _address, _rev, _bResult);
+ thread th1 (&IoTRMICommClient::waitForPackets, this, this);
+ th1.detach();
+
+}
+
+
+// Destructor
+IoTRMICommClient::~IoTRMICommClient() {
+
+ // Clean up
+ if (rmiClientRecv != NULL) {
+ delete rmiClientRecv;
+ rmiClientRecv = NULL;
+ }
+ if (rmiClientSend != NULL) {
+ delete rmiClientSend;
+ rmiClientSend = NULL;
+ }
+}
+
+
+void IoTRMICommClient::waitForPackets(IoTRMICommClient* rmiComm) {
+
+ char* packetBytes = NULL;
+ int packetLen = 0;
+ while(true) {
+ fflush(NULL);
+ packetBytes = rmiClientRecv->receiveBytes(packetBytes, &packetLen);
+ fflush(NULL);
+ if (packetBytes != NULL) { // If there is method bytes
+ //IoTRMIUtil::printBytes(packetBytes, packetLen, false);
+ //packetBytesPtr = &packetBytes;
+ int packetType = getPacketType(packetBytes);
+ if (packetType == IoTRMIUtil::METHOD_TYPE) {
+ rmiComm->methodQueue.enqueue(packetBytes, packetLen);
+ } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) {
+ rmiComm->returnQueue.enqueue(packetBytes, packetLen);
+ } else {
+ // TODO: We need to log error message when we come to running this using IoTSlave
+ // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel)
+ cerr << "IoTRMICommClient: Packet type is unknown: " << packetType << endl;
+ exit(1);
+ }
+ }
+ packetBytes = NULL;
+ packetLen = 0;
+ }
+}
+
+
+// Send return values in bytes to the caller
+void IoTRMICommClient::sendReturnObj(void* retObj, string type, char* methodBytes) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendReturnObjMutex);
+ // Find the length of return object in bytes
+ int retLen = rmiUtil->getTypeSize(type);
+ if (retLen == -1) {
+ retLen = rmiUtil->getVarTypeSize(type, retObj);
+ }
+ // Copy the header and object bytes
+ int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+ int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+ char retAllObjBytes[headerLen+retLen];
+ // Copy object and method Id first
+ memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+ // Copy objectId + methodId + packet type in bytes
+ char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+ IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+ memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+ // Copy object into byte array
+ char retObjBytes[retLen];
+ IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str());
+ memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+ fflush(NULL);
+ rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen);
+ fflush(NULL);
+}
+
+
+// Send return values in bytes to the caller (for more than one object - struct)
+void IoTRMICommClient::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendReturnObjMutex);
+ // Find the length of return object in bytes
+ int retLen = returnLength(retObj, type, numRet);
+ // Copy the header and object bytes
+ int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+ int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+ char retAllObjBytes[headerLen+retLen];
+ // Copy object and method Id first
+ memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+ // Copy objectId + methodId + packet type in bytes
+ char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+ IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+ memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+ // Copy object into byte array
+ char retObjBytes[retLen];
+ returnToBytes(retObj, type, retObjBytes, numRet);
+ memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+ fflush(NULL);
+ rmiClientSend->sendBytes(retAllObjBytes, headerLen+retLen);
+ fflush(NULL);
+}
+
+
+// Calls a method remotely by passing in parameters and getting a return object
+void IoTRMICommClient::remoteCall(int objectId, int methodId, string paramCls[],
+ void* paramObj[], int numParam) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(remoteCallMutex);
+ // Send input parameters
+ int len = methodLength(paramCls, paramObj, numParam);
+ char method[len];
+ methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam);
+ // Send bytes
+ fflush(NULL);
+ rmiClientSend->sendBytes(method, len);
+ fflush(NULL);
+
+}
+#endif
+
+
--- /dev/null
+/** Class IoTRMICommServer implements the server side
+ * of IoTRMIComm class.
+ *
+ * @author Rahmadi Trimananda <rtrimana @ uci.edu>
+ * @version 1.0
+ * @since 2017-01-28
+ */
+#ifndef _IOTRMICOMMSERVER_HPP__
+#define _IOTRMICOMMSERVER_HPP__
+
+#include <iostream>
+#include <string>
+#include <atomic>
+#include <limits>
+#include <thread>
+#include <mutex>
+
+#include "IoTRMIComm.hpp"
+
+using namespace std;
+
+
+class IoTRMICommServer : public IoTRMIComm {
+ public:
+ IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult);
+ ~IoTRMICommServer();
+ // Public methods
+ void sendReturnObj(void* retObj, string type, char* methodBytes);
+ void sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes);
+ void remoteCall(int objectId, int methodId, string paramCls[], void* paramObj[], int numParam);
+
+ private:
+ IoTSocketServer *rmiServerSend;
+ IoTSocketServer *rmiServerRecv;
+
+ // Private methods
+ void waitForConnectionOnServerRecv();
+ void waitForConnectionOnServerSend();
+ void waitForPackets(IoTRMICommServer* rmiComm);
+};
+
+
+// Constructor
+IoTRMICommServer::IoTRMICommServer(int _portSend, int _portRecv, bool* _bResult) : IoTRMIComm() {
+
+ rmiServerSend = new IoTSocketServer(_portSend, _bResult);
+ rmiServerRecv = new IoTSocketServer(_portRecv, _bResult);
+ thread th1 (&IoTRMICommServer::waitForConnectionOnServerSend, this);
+ thread th2 (&IoTRMICommServer::waitForConnectionOnServerRecv, this);
+ th1.join();
+ th2.join();
+ thread th3 (&IoTRMICommServer::waitForPackets, this, this);
+ th3.detach();
+}
+
+
+// Destructor
+IoTRMICommServer::~IoTRMICommServer() {
+
+ // Clean up
+ if (rmiServerSend != NULL) {
+ delete rmiServerSend;
+ rmiServerSend = NULL;
+ }
+ if (rmiServerRecv != NULL) {
+ delete rmiServerRecv;
+ rmiServerRecv = NULL;
+ }
+}
+
+
+void IoTRMICommServer::waitForConnectionOnServerRecv() {
+
+ cout << "Wait on connection ServerRecv!" << endl;
+ rmiServerRecv->connect();
+ cout << "Connected on connection ServerRecv!" << endl;
+}
+
+
+void IoTRMICommServer::waitForConnectionOnServerSend() {
+
+ cout << "Wait on connection ServerSend!" << endl;
+ rmiServerSend->connect();
+ cout << "Connected on connection ServerSend!" << endl;
+}
+
+
+void IoTRMICommServer::waitForPackets(IoTRMICommServer* rmiComm) {
+
+ char* packetBytes = NULL;
+ int packetLen = 0;
+ //cout << "Starting waitForPacketsOnServer()" << endl;
+ while(true) {
+ fflush(NULL);
+ packetBytes = rmiComm->rmiServerRecv->receiveBytes(packetBytes, &packetLen);
+ fflush(NULL);
+ if (packetBytes != NULL) { // If there is method bytes
+ //IoTRMIUtil::printBytes(packetBytes, packetLen, false);
+ int packetType = IoTRMIComm::getPacketType(packetBytes);
+ if (packetType == IoTRMIUtil::METHOD_TYPE) {
+ rmiComm->methodQueue.enqueue(packetBytes, packetLen);
+ } else if (packetType == IoTRMIUtil::RET_VAL_TYPE) {
+ rmiComm->returnQueue.enqueue(packetBytes, packetLen);
+ } else {
+ // TODO: We need to log error message when we come to running this using IoTSlave
+ // TODO: Beware that using "cout" in the process will kill it (as IoTSlave is loaded in Sentinel)
+ cerr << "IoTRMICommServer: Packet type is unknown: " << packetType << endl;
+ exit(1);
+ }
+ }
+ packetBytes = NULL;
+ packetLen = 0;
+ }
+}
+
+
+// Send return values in bytes to the caller
+void IoTRMICommServer::sendReturnObj(void* retObj, string type, char* methodBytes) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendReturnObjMutex);
+ // Find the length of return object in bytes
+ int retLen = rmiUtil->getTypeSize(type);
+ if (retLen == -1) {
+ retLen = rmiUtil->getVarTypeSize(type, retObj);
+ }
+ // Copy the header and object bytes
+ int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+ int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+ char retAllObjBytes[headerLen+retLen];
+ // Copy object and method Id first
+ memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+ // Copy objectId + methodId + packet type in bytes
+ char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+ IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+ memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+ // Copy object into byte array
+ char retObjBytes[retLen];
+ IoTRMIUtil::getObjectBytes(retObjBytes, retObj, type.c_str());
+ memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+ fflush(NULL);
+ IoTRMIUtil::printBytes(retAllObjBytes, headerLen+retLen, false);
+ rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen);
+ fflush(NULL);
+}
+
+
+// Send return values in bytes to the caller (for more than one object - struct)
+void IoTRMICommServer::sendReturnObj(void* retObj[], string type[], int numRet, char* methodBytes) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendReturnObjMutex);
+ // Find the length of return object in bytes
+ int retLen = returnLength(retObj, type, numRet);
+ // Copy the header and object bytes
+ int objAndMethIdLen = IoTRMIUtil::OBJECT_ID_LEN + IoTRMIUtil::METHOD_ID_LEN;
+ int headerLen = objAndMethIdLen + IoTRMIUtil::PACKET_TYPE_LEN;
+ char retAllObjBytes[headerLen+retLen];
+ // Copy object and method Id first
+ memcpy(retAllObjBytes, methodBytes, objAndMethIdLen);
+ // Copy objectId + methodId + packet type in bytes
+ char packType[IoTRMIUtil::PACKET_TYPE_LEN];
+ IoTRMIUtil::intToByteArray(IoTRMIUtil::RET_VAL_TYPE, packType);
+ memcpy(retAllObjBytes + objAndMethIdLen, packType, IoTRMIUtil::PACKET_TYPE_LEN);
+ // Copy object into byte array
+ char retObjBytes[retLen];
+ returnToBytes(retObj, type, retObjBytes, numRet);
+ memcpy(retAllObjBytes + headerLen, retObjBytes, retLen);
+ fflush(NULL);
+ rmiServerSend->sendBytes(retAllObjBytes, headerLen+retLen);
+ fflush(NULL);
+}
+
+
+// Calls a method remotely by passing in parameters and getting a return object
+void IoTRMICommServer::remoteCall(int objectId, int methodId, string paramCls[],
+ void* paramObj[], int numParam) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(remoteCallMutex);
+ // Send input parameters
+ int len = methodLength(paramCls, paramObj, numParam);
+ char method[len];
+ methodToBytes(objectId, methodId, paramCls, paramObj, method, numParam);
+ // Send bytes
+ fflush(NULL);
+ rmiServerSend->sendBytes(method, len);
+ fflush(NULL);
+
+}
+#endif
+
+
#include <cstdlib>
#include <memory>
#include <typeinfo>
+#include <map>
#include <iostream>
#include <string>
// Constants
const static int OBJECT_ID_LEN = 4; // 4 bytes = 32 bits
const static int METHOD_ID_LEN = 4; // 4 bytes = 32 bits
+ const static int PACKET_TYPE_LEN = 4;// 4 bytes = 32 bits
const static int PARAM_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the param)
const static int RETURN_LEN = 4; // 4 bytes = 32 bits (4-byte field that stores the length of the return object)
const static int CHAR_LEN = 2; // 2 bytes (we follow Java convention)
const static int BYTE_LEN = 1; // 1 byte
const static int BOOL_LEN = 1; // 1 byte
+ const static int METHOD_TYPE = 1; // Packet type of method
+ const static int RET_VAL_TYPE = -1; // Packet type of return value
+
+ // Static containers
+ static map<int,void*>* mapStub; // Map object to its stub ID
+ static map<void*,void*>* mapSkel; // Map object to its skeleton
+ static map<void*,int>* mapSkelId; // Map object to its skeleton ID
private:
map<string,string> mapPrimitives;
map<string,string> mapNonPrimitives;
};
+map<int,void*>* IoTRMIUtil::mapStub = new map<int,void*>();
+map<void*,void*>* IoTRMIUtil::mapSkel = new map<void*,void*>();
+map<void*,int>* IoTRMIUtil::mapSkelId = new map<void*,int>();
+
// Constructor
IoTRMIUtil::IoTRMIUtil() {
#define SD_SEND 0x01
#define SD_BOTH 0x02
+mutex sendBytesMutex;
+mutex recvBytesMutex;
+mutex sendAckMutex;
+mutex recvAckMutex;
class IoTSocket {
public:
// Send bytes over the wire
bool IoTSocket::sendBytes(char* pVals, int iLen) {
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendBytesMutex);
+
int i = 0;
char size[MSG_LEN_SIZE];
// Convert int to byte array and fix endianness
return false;
}
+ IoTRMIUtil::printBytes(size, 4, false);
+
if (send(m_iSock, (char *) pVals, iLen, 0) == -1) {
perror("IoTSocket: Send bytes error!");
return false;
}
+
#ifdef DEBUG_ACK
if (!receiveAck())
return false;
// Generate an array of char on the heap and return it
char* IoTSocket::receiveBytes(char* pVals, int* len)
{
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(recvBytesMutex);
+
int i = 0;
int j = 0;
char* pTemp = NULL;
if (iTotalBytes == iLen)
bEnd = true;
}
+
#ifdef DEBUG_ACK
if (!sendAck())
return NULL;
// Receive a short ack from the client
bool IoTSocket::receiveAck()
{
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(recvAckMutex);
char temp[1];
int iTotal = 0;
int iResult = 0;
// Send a short ack to the client
bool IoTSocket::sendAck()
{
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendAckMutex);
char temp[1];
temp[0] = 42;
void CallBack::needCallback(TestClassComplete* tc) {
- cout << "Short from TestClass: " << tc->getShort(1234);
+ cout << endl << "Short from TestClass: " << tc->getShort(1234) << endl << endl;
}
#endif
}
-void TestClass::registerCallback(CallBackInterfaceWithCallBack* _cb) {
-
- cbvec.push_back(_cb);
- cout << "Registering callback object!" << endl;
-}
-
-
void TestClass::registerCallbackArray(vector<CallBackInterfaceWithCallBack*> _cb) {
for (CallBackInterfaceWithCallBack* cb : _cb) {
}
+void TestClass::registerCallback(CallBackInterfaceWithCallBack* _cb) {
+
+ cbvec.push_back(_cb);
+ cout << "Registering callback object!" << endl;
+}
+
+
int TestClass::callBack() {
int sum = 0;
for (CallBackInterfaceWithCallBack* cb : cbvec) {
- //cb->needCallback(this);
+ //cout << "Sum: " << sum << endl;
sum = sum + cb->printInt();
+ cb->needCallback(this);
+ //cb->needCallback(this);
+ TestClass* tc = new TestClass();
+ cb->needCallback(tc);
+ //cout << "Sum after: " << sum << endl;
}
+ cout << "About to return sum: " << sum << endl;
return sum;
}
using namespace std;
-TestClassComplete_Stub::TestClassComplete_Stub(int _port, const char* _skeletonAddress, string _callbackAddress, int _rev, bool* _bResult, vector<int> _ports) {
- callbackAddress = _callbackAddress;
- ports = _ports;
- rmiCall = new IoTRMICall(_port, _skeletonAddress, _rev, _bResult);
- set0Allowed.insert(-9998);
- //thread th1 (&TestClassComplete_Stub::___initCallBack, this);
- //th1.detach();
- ___regCB();
+TestClassComplete_Stub::TestClassComplete_Stub(int _portSend, int _portRecv, const char* _skeletonAddress, int _rev, bool* _bResult) {
+ rmiComm = new IoTRMICommClient(_portSend, _portRecv, _skeletonAddress, _rev, _bResult);
+ rmiComm->registerStub(objectId, 0, &retValueReceived0);
+ rmiComm->registerStub(objectId, 2, &retValueReceived2);
+ IoTRMIUtil::mapStub->insert(make_pair(objectId, this));
}
-TestClassComplete_Stub::TestClassComplete_Stub(IoTRMICall* _rmiCall, string _callbackAddress, int _objIdCnt, vector<int> _ports) {
- callbackAddress = _callbackAddress;
- rmiCall = _rmiCall;
- objIdCnt = _objIdCnt;
- set0Allowed.insert(-9998);
- //thread th1 (&TestClassComplete_Stub::___initCallBack, this);
- //th1.detach();
- ___regCB();
+TestClassComplete_Stub::TestClassComplete_Stub(IoTRMIComm* _rmiComm, int _objectId) {
+ rmiComm = _rmiComm;
+ objectId = _objectId;
+ rmiComm->registerStub(objectId, 0, &retValueReceived0);
+ rmiComm->registerStub(objectId, 2, &retValueReceived2);
}
TestClassComplete_Stub::~TestClassComplete_Stub() {
- if (rmiCall != NULL) {
- delete rmiCall;
- rmiCall = NULL;
- }
- if (rmiObj != NULL) {
- delete rmiObj;
- rmiObj = NULL;
+ if (rmiComm != NULL) {
+ delete rmiComm;
+ rmiComm = NULL;
}
for(CallBackInterface* cb : vecCallbackObj) {
delete cb;
}
}
+mutex mtxMethodExec1; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
void TestClassComplete_Stub::registerCallback(CallBackInterface* _cb) {
- //CallBackInterface_CallbackSkeleton* skel0 = new CallBackInterface_CallbackSkeleton(_cb, callbackAddress, objIdCnt++);
- CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, callbackAddress, objIdCnt++);
- vecCallbackObj.push_back(skel0);
- int ___paramCB0 = 1;
+ lock_guard<mutex> guard(mtxMethodExec1);
+ int objIdSent = 0;
+ auto it = IoTRMIUtil::mapSkel->find(_cb);
+ if (it == IoTRMIUtil::mapSkel->end()) { // Not in the map, so new object
+ objIdSent = rmiComm->getObjectIdCounter();
+ rmiComm->decrementObjectIdCounter();
+ CallBackInterface_Skeleton* skel0 = new CallBackInterface_Skeleton(_cb, rmiComm, objIdSent);
+ vecCallbackObj.push_back(skel0);
+ IoTRMIUtil::mapSkel->insert(make_pair(_cb, skel0));
+ IoTRMIUtil::mapSkelId->insert(make_pair(_cb, objIdSent));
+ cout << "Create new skeleton for TestClass! ID=" << objIdSent << endl;
+ //thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0));
+ thread th0 (&CallBackInterface_Skeleton::___waitRequestInvokeMethod, std::ref(skel0), std::ref(skel0));
+ th0.detach();
+ //while(!didAlreadyInitWaitInvoke);
+ while(!skel0->didInitWaitInvoke());
+ } else {
+ auto itId = IoTRMIUtil::mapSkelId->find(_cb);
+ objIdSent = itId->second;
+ cout << "Skeleton exists for TestClass! ID=" << objIdSent << endl;
+ }
+
+ //int ___paramCB0 = 1;
+ int ___paramCB0 = objIdSent;
int methodId = 1;
string retType = "void";
int numParam = 1;
string paramCls[] = { "int" };
void* paramObj[] = { &___paramCB0 };
void* retObj = NULL;
- rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
-}
-
-void TestClassComplete_Stub::___regCB() {
- int numParam = 3;
- int methodId = -9999;
- string retType = "void";
- string paramCls[] = { "int*", "String", "int" };
- int rev = 0;
- void* paramObj[] = { &ports, &callbackAddress, &rev };
- void* retObj = NULL;
- rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+ rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
}
-short TestClassComplete_Stub::getShort(short in) {
+mutex mtxMethodExec0; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
+short TestClassComplete_Stub::getShort(short in) {
+ lock_guard<mutex> guard(mtxMethodExec0);
+ cout << "getShort() is called!!!" << endl << endl;
int methodId = 0;
string retType = "short";
int numParam = 1;
void* paramObj[] = { &in };
short retVal = 0;
void* retObj = &retVal;
- rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+ cout << "Calling remote call!" << endl;
+ rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
+ cout << "Finished calling remote call!" << endl;
+ // Waiting for return value
+ while(!retValueReceived0);
+ rmiComm->getReturnValue(retType, retObj);
+ //retValueReceived0.exchange(false);
+ retValueReceived0 = false;
+ didGetReturnBytes.exchange(true);
+ cout << "Getting return value for getShort(): " << retVal << endl;
+
return retVal;
}
-int TestClassComplete_Stub::callBack() {
+mutex mtxMethodExec2; // TODO: We probably need to correlate this always with class name, e.g. methodExecCallBackInterfaceWithCallBack
+int TestClassComplete_Stub::callBack() {
+ lock_guard<mutex> guard(mtxMethodExec2);
int methodId = 2;
string retType = "int";
int numParam = 0;
void* paramObj[] = { };
int retVal = 0;
void* retObj = &retVal;
- rmiCall->remoteCall(objectId, methodId, retType, paramCls, paramObj, numParam, retObj);
+ rmiComm->remoteCall(objectId, methodId, paramCls, paramObj, numParam);
+ // Waiting for return value
+ while(!retValueReceived2);
+ rmiComm->getReturnValue(retType, retObj);
+ //retValueReceived2.exchange(false);
+ retValueReceived2 = false;
+ didGetReturnBytes.exchange(true);
+
+ cout << "Getting return value for callback(): " << retVal << endl;
+
return retVal;
}
int main(int argc, char *argv[])
{
- int port = 5010;
+ int portSend = 5000;
+ int portRecv = 6000;
const char* address = "localhost";
//const char* address = "192.168.2.191"; // RPi2
//const char* skeletonAddress = "128.195.136.170"; // dc-9.calit2.uci.edu
//const char* callbackAddress = "192.168.2.191"; // RPi2
int rev = 0;
bool bResult = false;
- vector<int> ports;
- ports.push_back(12345);
- ports.push_back(22346);
+ //vector<int> ports;
+ //ports.push_back(12345);
+ //ports.push_back(22346);
//ports.push_back(32344);
//ports.push_back(43212);
- TestClassComplete *tcStub = new TestClassComplete_Stub(port, skeletonAddress, callbackAddress, rev, &bResult, ports);
+ TestClassComplete *tcStub = new TestClassComplete_Stub(portSend, portRecv, skeletonAddress, rev, &bResult);
+ //cout << "Getting return value from getShort(): " << tcStub->getShort(1234) << endl;
+ //cout << "Getting return value from getShort(): " << tcStub->getShort(4321) << endl;
+ //cout << "Getting return value from getShort(): " << tcStub->getShort(5678) << endl;
cout << "==== CALLBACK ====" << endl;
CallBackInterface *cbSingle = new CallBack(2354);
tcStub->registerCallback(cbSingle);
+ //tcStub->registerCallback(cbSingle);
+ CallBackInterface *cbSingle1 = new CallBack(2646);
+ tcStub->registerCallback(cbSingle1);
+ CallBackInterface *cbSingle2 = new CallBack(2000);
+ tcStub->registerCallback(cbSingle2);
cout << "Return value from callback: " << tcStub->callBack() << endl;
+ //cout << "Return value from callback: " << tcStub->callBack() << endl;
+
+ // TODO: we need this while loop at the end to keep the threads running
+ while(true);
return 0;
}
using namespace std;
-TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, string _callbackAddress, int _port) {
+TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _portSend, int _portRecv) {
bool _bResult = false;
mainObj = _mainObj;
- callbackAddress = _callbackAddress;
- rmiObj = new IoTRMIObject(_port, &_bResult);
- set0Allowed.insert(-9999);
- ___waitRequestInvokeMethod();
+ rmiComm = new IoTRMICommServer(_portSend, _portRecv, &_bResult);
+ IoTRMIUtil::mapSkel->insert(make_pair(_mainObj, this));
+ IoTRMIUtil::mapSkelId->insert(make_pair(_mainObj, objectId));
+ rmiComm->registerSkeleton(objectId, &methodReceived);
+ thread th1 (&TestClassInterface_Skeleton::___waitRequestInvokeMethod, this, this);
+// th1.detach();
+ th1.join();
}
-TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, int _objIdCnt, string _callbackAddress) {
+TestClassInterface_Skeleton::TestClassInterface_Skeleton(TestClassInterface *_mainObj, IoTRMIComm *_rmiComm, int _objectId) {
bool _bResult = false;
mainObj = _mainObj;
- objIdCnt = _objIdCnt;
- callbackAddress = _callbackAddress;
- set0Allowed.insert(-9999);
- ___waitRequestInvokeMethod();
+ rmiComm = _rmiComm;
+ objectId = _objectId;
+ rmiComm->registerSkeleton(objectId, &methodReceived);
}
TestClassInterface_Skeleton::~TestClassInterface_Skeleton() {
- if (rmiObj != NULL) {
- delete rmiObj;
- rmiObj = NULL;
- }
- if (rmiCall != NULL) {
- delete rmiCall;
- rmiCall = NULL;
+ if (rmiComm != NULL) {
+ delete rmiComm;
+ rmiComm = NULL;
}
for(CallBackInterfaceWithCallBack* cb : vecCallbackObj) {
delete cb;
}
}
+bool TestClassInterface_Skeleton::didInitWaitInvoke() {
+
+ return didAlreadyInitWaitInvoke;
+}
+
short TestClassInterface_Skeleton::getShort(short in) {
return mainObj->getShort(in);
}
mainObj->registerCallback(_cb);
}
-void TestClassInterface_Skeleton::___regCB() {
- int numParam = 3;
- vector<int> param1;
- string param2 = "";
- int param3 = 0;
- string paramCls[] = { "int*", "String", "int" };
- void* paramObj[] = { ¶m1, ¶m2, ¶m3 };
- rmiObj->getMethodParams(paramCls, numParam, paramObj);
- bool bResult = false;
- rmiCall = new IoTRMICall(param1[1], param2.c_str(), param3, &bResult);
-}
-
int TestClassInterface_Skeleton::callBack() {
return mainObj->callBack();
}
-void TestClassInterface_Skeleton::___getShort() {
+void TestClassInterface_Skeleton::___getShort(TestClassInterface_Skeleton* skel) {
+ char* localMethodBytes = new char[methodLen];
+ memcpy(localMethodBytes, skel->methodBytes, methodLen);
+ //cout << "Bytes inside getShort: " << endl;
+ //IoTRMIUtil::printBytes(localMethodBytes, methodLen, false);
+ didGetMethodBytes.exchange(true);
string paramCls[] = { "short" };
int numParam = 1;
short in;
void* paramObj[] = { &in };
- rmiObj->getMethodParams(paramCls, numParam, paramObj);
+ skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
short retVal = getShort(in);
+ cout << "Getting return value getShort(): " << retVal << endl;
void* retObj = &retVal;
- rmiObj->sendReturnObj(retObj, "short");
+ skel->rmiComm->sendReturnObj(retObj, "short", localMethodBytes);
+ cout << "Sent return value for getShort()" << endl;
+ delete[] localMethodBytes;
}
-void TestClassInterface_Skeleton::___registerCallback() {
+void TestClassInterface_Skeleton::___registerCallback(TestClassInterface_Skeleton* skel) {
+ char* localMethodBytes = new char[methodLen];
+ memcpy(localMethodBytes, skel->methodBytes, methodLen);
+ didGetMethodBytes.exchange(true);
string paramCls[] = { "int" };
int numParam = 1;
int numStubs0 = 0;
void* paramObj[] = { &numStubs0 };
- rmiObj->getMethodParams(paramCls, numParam, paramObj);
- //CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_CallbackStub(rmiCall, callbackAddress, objIdCnt, ports);
- CallBackInterfaceWithCallBack* stub0 = new CallBackInterfaceWithCallBack_Stub(rmiCall, callbackAddress, objIdCnt, ports);
- vecCallbackObj.push_back(stub0);
- objIdCnt++;
- registerCallback(stub0);
+ skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
+ // Choosing the right stub
+ int objIdRecv = numStubs0;
+ CallBackInterfaceWithCallBack* stub0 = NULL;
+ auto it = IoTRMIUtil::mapStub->find(objIdRecv);
+ if (it == IoTRMIUtil::mapStub->end()) { // Not in the map, so new object
+ stub0 = new CallBackInterfaceWithCallBack_Stub(rmiComm, objIdRecv);
+ IoTRMIUtil::mapStub->insert(make_pair(objIdRecv, stub0));
+ cout << "Create new stub for Callback! ID=" << objIdRecv << endl;
+ rmiComm->setObjectIdCounter(objIdRecv);
+ rmiComm->decrementObjectIdCounter();
+ } else {
+ stub0 = (CallBackInterfaceWithCallBack_Stub*) it->second;
+ cout << "Stub exists for Callback! ID=" << objIdRecv << endl;
+ }
+ skel->vecCallbackObj.push_back(stub0);
+ skel->registerCallback(stub0);
+ delete[] localMethodBytes;
}
-void TestClassInterface_Skeleton::___callBack() {
+void TestClassInterface_Skeleton::___callBack(TestClassInterface_Skeleton* skel) {
+ char* localMethodBytes = new char[methodLen];
+ memcpy(localMethodBytes, skel->methodBytes, methodLen);
+ didGetMethodBytes.exchange(true);
string paramCls[] = { };
int numParam = 0;
void* paramObj[] = { };
- rmiObj->getMethodParams(paramCls, numParam, paramObj);
+ skel->rmiComm->getMethodParams(paramCls, numParam, paramObj, localMethodBytes);
int retVal = callBack();
void* retObj = &retVal;
- rmiObj->sendReturnObj(retObj, "int");
+ skel->rmiComm->sendReturnObj(retObj, "int", localMethodBytes);
+ delete[] localMethodBytes;
}
-void TestClassInterface_Skeleton::___waitRequestInvokeMethod() {
+void TestClassInterface_Skeleton::___waitRequestInvokeMethod(TestClassInterface_Skeleton* skel) {
+ cout << "Running loop!" << endl;
+ //didAlreadyInitWaitInvoke.exchange(true);
+ skel->didAlreadyInitWaitInvoke = true;
while (true) {
- rmiObj->getMethodBytes();
- int _objectId = rmiObj->getObjectId();
- int methodId = rmiObj->getMethodId();
- if (_objectId == object0Id) {
- if (set0Allowed.find(methodId) == set0Allowed.end()) {
+ if (!methodReceived)
+ continue;
+ skel->methodBytes = skel->rmiComm->getMethodBytes();
+ skel->methodLen = skel->rmiComm->getMethodLength();
+ cout << endl;
+ // TODO: Get method length as well!!!
+ //methodReceived.exchange(false);
+ methodReceived = false;
+ int _objectId = skel->rmiComm->getObjectId(skel->methodBytes);
+ int methodId = skel->rmiComm->getMethodId(skel->methodBytes);
+ if (_objectId == objectId) {
+ if (skel->set0Allowed.find(methodId) == skel->set0Allowed.end()) {
cerr << "Object with object Id: " << _objectId << " is not allowed to access method: " << methodId << endl;
return;
}
}
- else {
- cerr << "Object Id: " << _objectId << " not recognized!" << endl;
- return;
- }
+ else
+ continue;
switch (methodId) {
- case 0: ___getShort(); break;
- case 1: ___registerCallback(); break;
- case 2: ___callBack(); break;
- case -9999: ___regCB(); break;
+ case 0: { thread th0 (&TestClassInterface_Skeleton::___getShort, std::ref(skel), skel); th0.detach(); break; }
+ //___getShort(skel); break;
+ case 1: { thread th1 (&TestClassInterface_Skeleton::___registerCallback, std::ref(skel), skel); th1.detach(); break; }
+ //___registerCallback(skel); break;
+ case 2: { thread th2 (&TestClassInterface_Skeleton::___callBack, std::ref(skel), skel); th2.detach(); break; }
+ //___callBack(skel); break;
default:
cerr << "Method Id " << methodId << " not recognized!" << endl;
- throw exception();
+ //throw exception();
+ return;
}
+ cout << "Out of switch statement!" << endl;
}
}
+
int main(int argc, char *argv[])
{
// First argument is port number
cout << argv3 << endl;
cout << argv4 << endl;*/
- int port = 5010;
+ int portSend = 5000;
+ int portRecv = 6000;
//TestClassInterface *tc = new TestClass(argv2, argv3, argv4);
TestClassInterface *tc = new TestClass(123, 2.345, "test");
- TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, "localhost", port);
+ TestClassInterface_Skeleton *tcSkel = new TestClassInterface_Skeleton(tc, portSend, portRecv);
//delete tc;
//delete tcSkel;
System.out.println("Registered callback!");
System.out.println("Return value from callback 1: " + tcstub.callBack() + "\n\n");
- //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n");
- //System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
- //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n");
- //System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n");
+ System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)4576) + "\n\n");
+ System.out.println("Return value from callback 2: " + tcstub.callBack() + "\n\n");
+ System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1233) + "\n\n");
+ System.out.println("\n\nCalling short one more time value: " + tcstub.getShort((short)1321) + "\n\n");
while(true) {}
}
}