#define DEBUG_ACK
static const int SOCKET_BUFF_SIZE = 64000;
+// Before, it was too short as we were just using 1 byte to receive the length
+// Now, we allocate 4 bytes (a size of integer) to receive the message length
+static const int MSG_LEN_SIZE = 4;
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <unistd.h>
+#include <mutex>
+
+#include "IoTRMIUtil.hpp"
// Duplicated from winsock2.h
#define SD_RECEIVE 0x00
#define SD_SEND 0x01
#define SD_BOTH 0x02
+mutex sendBytesMutex;
+mutex recvBytesMutex;
+mutex sendAckMutex;
+mutex recvAckMutex;
class IoTSocket {
public:
IoTSocket(int iPort, bool* pResult);
~IoTSocket();
- bool close(); // Close the socket
- bool sendBytes(char* pVals, int _iLen); // Send a set of bytes
- char* receiveBytes(char* pVals, int* len); // Receive a set of bytes
+ bool close(); // Close the socket
+ bool sendBytes(char* pVals, int _iLen); // Send a set of bytes
+ char* receiveBytes(char* pVals, int* len); // Receive a set of bytes
protected:
- int m_iPort; // Port I'm listening on
- int m_iSock; // Socket connection
- struct sockaddr_in m_addrRemote; // Connector's address information
- double* m_pBuffer; // Reuse the same memory for buffer
+ int m_iPort; // Port I'm listening on
+ int m_iSock; // Socket connection
+ struct sockaddr_in m_addrRemote; // Connector's address information
+ double* m_pBuffer; // Reuse the same memory for buffer
private:
- bool receiveAck();
- bool sendAck();
+ bool receiveAck();
+ bool sendAck();
};
// Send bytes over the wire
-bool IoTSocket::sendBytes(char* pVals, int _iLen) {
+bool IoTSocket::sendBytes(char* pVals, int iLen) {
+
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendBytesMutex);
int i = 0;
- int size[1];
- int iLen = _iLen;
- size[0] = iLen;
+ char size[MSG_LEN_SIZE];
+ // Convert int to byte array and fix endianness
+ IoTRMIUtil::intToByteArray(iLen, size);
- if (send(m_iSock, size, 1, 0) == -1) {
+ if (send(m_iSock, size, MSG_LEN_SIZE, 0) == -1) {
perror("IoTSocket: Send size error!");
return false;
}
perror("IoTSocket: Send bytes error!");
return false;
}
+
#ifdef DEBUG_ACK
if (!receiveAck())
return false;
// Generate an array of char on the heap and return it
char* IoTSocket::receiveBytes(char* pVals, int* len)
{
- int i = 0;
- int j = 0;
- char* pTemp = NULL;
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(recvBytesMutex);
+
+ int i = 0;
+ int j = 0;
+ char* pTemp = NULL;
int iTotalBytes = 0;
int iNumBytes = 0;
- bool bEnd = false;
+ bool bEnd = false;
int iTotal = 0;
int iResult = 0;
- char size[1];
+ char size[MSG_LEN_SIZE];
+
while ((iTotal < 1) && (iResult != -1)) {
- iResult = recv(m_iSock, size, 1, 0);
+ iResult = recv(m_iSock, size, MSG_LEN_SIZE, 0);
iTotal += iResult;
}
+
if (iResult == -1) {
perror("IoTSocket: Receive size error!");
return NULL;
}
- int iLen = (int) size[0];
+ // Convert byte to int array based on correct endianness
+ int iLen = 0;
+ IoTRMIUtil::byteArrayToInt(&iLen, size);
+
// To be returned from this method...
*len = iLen;
pVals = new char[iLen];
if (iTotalBytes == iLen)
bEnd = true;
}
+
#ifdef DEBUG_ACK
if (!sendAck())
return NULL;
if (!receiveAck())
return NULL;
#endif
- cout << "Socket 6!" << endl;
return pVals;
}
// Receive a short ack from the client
bool IoTSocket::receiveAck()
{
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(recvAckMutex);
char temp[1];
int iTotal = 0;
int iResult = 0;
// Send a short ack to the client
bool IoTSocket::sendAck()
{
+ // Critical section that is used by different objects
+ lock_guard<mutex> guard(sendAckMutex);
char temp[1];
temp[0] = 42;