From: adash <adash>
Date: Fri, 21 Mar 2008 21:20:16 +0000 (+0000)
Subject: Added send_data() and recv_data() methods for send() and recv()
X-Git-Tag: preEdgeChange~221
X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=b6e00731883c92ffe4c126b9e14877ca76320ef6;p=IRC.git

Added send_data() and recv_data() methods for send() and recv()
---

diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h
index 09684f9c..6c893cfc 100644
--- a/Robust/src/Runtime/DSTM/interface/dstm.h
+++ b/Robust/src/Runtime/DSTM/interface/dstm.h
@@ -5,13 +5,16 @@
 #define MSG_NOSIGNAL 0
 #endif
 
+/***********************************************************
+ *       Macros
+ **********************************************************/
 #define GET_NTUPLES(x) 	((int *)(x + sizeof(prefetchqelem_t)))
 #define GET_PTR_OID(x) 	((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
 #define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
 #define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-
-
-//Coordinator Messages
+/*****************************************
+ *  Coordinator Messages
+ ***************************************/
 #define READ_REQUEST 		1
 #define READ_MULT_REQUEST 	2
 #define MOVE_REQUEST 		3
@@ -22,7 +25,9 @@
 #define TRANS_PREFETCH		8
 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING	9
 
-//Participant Messages
+/*********************************
+ * Participant Messages
+ *******************************/
 #define OBJECT_FOUND			10
 #define OBJECT_NOT_FOUND		11
 #define OBJECTS_FOUND 			12
@@ -38,13 +43,13 @@
 #define THREAD_NOTIFY_RESPONSE		25
 #define TRANS_UNSUCESSFUL		26
 
-//Control bits for status of objects in Machine pile
-#define OBJ_LOCKED_BUT_VERSION_MATCH	14
-#define OBJ_UNLOCK_BUT_VERSION_MATCH	15
-#define VERSION_NO_MATCH		16
-
 //Max number of objects 
 #define MAX_OBJECTS  20
+//Max remote-machine connections
+#define NUM_MACHINES 2
+#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
+//Transaction id per machine
+#define TID_LEN 20
 
 
 #include <stdlib.h>
@@ -57,8 +62,6 @@
 #include "threadnotify.h"
 
 
-#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
-#define TID_LEN 20
 //bit designations for status field of objheader
 #define DIRTY 0x01
 #define NEW   0x02
@@ -127,6 +130,7 @@ typedef struct transrecord {
   struct ___Object___ * revertlist;
 #endif
 } transrecord_t;
+
 // Structure is a shared structure that keeps track of responses from the participants
 typedef struct thread_response {
   char rcv_status;
@@ -188,12 +192,14 @@ typedef struct local_thread_data_array {
 
 //Structure to store mid and socketid information
 typedef struct midSocketInfo {
-	unsigned int mid;
+	unsigned int mid;		/* To communicate with mid use sockid in this data structure*/
 	int sockid;
 } midSocketInfo_t;
 
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
+void send_data(int fd, void *buf, int buflen);
+void recv_data(int fd, void *buf, int buflen);
 
 /* Prototypes for object header */
 unsigned int getNewOID(void);
@@ -229,13 +235,13 @@ void mapObjMethod(unsigned short);
 void randomdelay();
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *, unsigned int);
-objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
+objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header
 int transCommit(transrecord_t *record); //return 0 if successful
 void *transRequest(void *);	//the C routine that the thread will execute when TRANS_REQUEST begins
 void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
 char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-void *handleLocalReq(void *);
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);// returns object header from main object store after object is copied into it from remote machine
+void *handleLocalReq(void *);//handles Local requests 
 int transComProcess(local_thread_data_array_t *);
 int transAbortProcess(local_thread_data_array_t *);
 void transAbort(transrecord_t *trans);
@@ -244,8 +250,8 @@ void prefetch(int, unsigned int *, unsigned short *, short*);
 void *transPrefetch(void *);
 void *mcqProcess(void *);
 void checkPrefetchTuples(prefetchqelem_t *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);
-prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
+prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets)
 void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
 int transPrefetchProcess(transrecord_t *, int **, short);
 void sendPrefetchReq(prefetchpile_t*, int);
diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c
index e76ab19f..672a955c 100644
--- a/Robust/src/Runtime/DSTM/interface/dstmserver.c
+++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c
@@ -27,6 +27,14 @@ extern int classsize[];
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
+/**********************************************************
+ * Global variables to map socketid and remote mid
+ * to  resuse sockets
+ **************************************************/
+midSocketInfo_t sockArray[NUM_MACHINES];
+int sockCount; //number of connections with all remote machines(one socket per mc)
+int sockIdFound; //track if socket file descriptor is already established
+pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
 
 /* This function initializes the main objects store and creates the 
  * global machine and location lookup table */
@@ -47,6 +55,14 @@ int dstmInit(void)
 	if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
 		return 1; //failure
 	
+	//Initialize mid to socketid mapping array
+	int t;
+	sockCount = 0;
+	for(t = 0; t < NUM_MACHINES; t++) {
+		sockArray[t].mid = 0;
+		sockArray[t].sockid = 0;
+	}
+
 	return 0;
 }
 
@@ -112,7 +128,7 @@ void *dstmListen()
  * and accordingly calls other functions to process new requests */
 void *dstmAccept(void *acceptfd)
 {
-	int val, retval, size, sum;
+	int val, retval, size, sum, sockid;
 	unsigned int oid;
 	char *buffer;
 	char control,ctrl;
@@ -130,18 +146,12 @@ void *dstmAccept(void *acceptfd)
 	transinfo.numnotfound = 0;
 
 	/* Receive control messages from other machines */
-	if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
-		printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
-		pthread_exit(NULL);
-	}
-	
+	recv_data((int)acceptfd, &control, sizeof(char));
+
 	switch(control) {
 		case READ_REQUEST:
 			/* Read oid requested and search if available */
-			if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
-				perror("Error: receiving 0x0 object from cooridnator\n");
-				pthread_exit(NULL);
-			}
+			recv_data((int)acceptfd, &oid, sizeof(unsigned int));
 			if((srcObj = mhashSearch(oid)) == NULL) {
 				printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
 				pthread_exit(NULL);
@@ -149,25 +159,17 @@ void *dstmAccept(void *acceptfd)
 			h = (objheader_t *) srcObj;
 			GETSIZE(size, h);
 			size += sizeof(objheader_t);
+			sockid = (int) acceptfd;
 
 			if (h == NULL) {
 				ctrl = OBJECT_NOT_FOUND;
-				if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-					perror("Error sending control msg to coordinator\n");
-					pthread_exit(NULL);
-				}
+				send_data(sockid, &ctrl, sizeof(char));
 			} else {
 				/* Type */
 				char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
 				*((int *)&msg[1])=size;
-				if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
-					perror("Error sending size of object to coordinator\n");
-					pthread_exit(NULL);
-				}
-				if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
-					perror("Error in sending object\n");
-					pthread_exit(NULL);
-				}
+				send_data(sockid, &msg, sizeof(msg));
+				send_data(sockid, h, size);
 			}
 			break;
 		
@@ -191,51 +193,34 @@ void *dstmAccept(void *acceptfd)
 			do {
 				if((val = prefetchReq((int)acceptfd)) != 0) {
 					printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
-					pthread_exit(NULL);
-				}
-
-				if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) < 0) {
-					printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
-					pthread_exit(NULL);
-				} else if(retval == 0) {
-					printf("%s() Error: socket closed at the requesting side\n");
-					pthread_exit(NULL);
+					break;
 				}
-
+				recv_data((int)acceptfd, &control, sizeof(char));
 			} while (control == TRANS_PREFETCH);
-
 			break;
 		case TRANS_PREFETCH_RESPONSE:
-			if((val = getPrefetchResponse((int) acceptfd)) != 0) {
-				printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
-				pthread_exit(NULL);
-			}
+			//do {
+				if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+					printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+					pthread_exit(NULL);
+				}
+			//} while (control == TRANS_PREFETCH_RESPONSE);
 			break;
 		case START_REMOTE_THREAD:
-			retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
-			if (retval <= 0)
-				perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
-			else if (retval != sizeof(unsigned int))
-				printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
-					retval, __FILE__, __LINE__);
-			else
-			{
-				objType = getObjType(oid);
-				startDSMthread(oid, objType);
-			}
+			recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+			objType = getObjType(oid);
+			startDSMthread(oid, objType);
 			break;
 
 		case THREAD_NOTIFY_REQUEST:
-			retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
+			recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
 			size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
 			if((buffer = calloc(1,size)) == NULL) {
 				printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
 				pthread_exit(NULL);
 			}
-			sum = 0;
-			do {
-				sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
-			} while(sum < size);
+
+			recv_data((int)acceptfd, buffer, size);
 
 			oidarry = calloc(numoid, sizeof(unsigned int)); 
 			memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
@@ -258,10 +243,7 @@ void *dstmAccept(void *acceptfd)
 				pthread_exit(NULL);
 			}
 
-			sum = 0;
-			do {
-				sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
-			} while(sum < size);
+			recv_data((int)acceptfd, buffer, size);
 
 			oid = *((unsigned int *)buffer);
 			size = sizeof(unsigned int);
@@ -290,41 +272,30 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
 	unsigned int *oidmod, oid;
 	fixed_data_t fixed;
 	objheader_t *headaddr;
-	int sum = 0, i, N, n, val;
+	int sum, i, size, n, val;
 
 	oidmod = NULL;
 
 	/* Read fixed_data_t data structure */ 
-	N = sizeof(fixed) - 1;
+	size = sizeof(fixed) - 1;
 	ptr = (char *)&fixed;;
 	fixed.control = TRANS_REQUEST;
-	do {
-		n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
-		sum += n;
-	} while(sum < N && n != 0); 
+	recv_data((int)acceptfd, ptr+1, size);
 
 	/* Read list of mids */
 	int mcount = fixed.mcount;
-	N = mcount * sizeof(unsigned int);
+	size = mcount * sizeof(unsigned int);
 	unsigned int listmid[mcount];
 	ptr = (char *) listmid;
-	sum = 0;
-	do {
-		n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
-		sum += n;
-	} while(sum < N && n != 0);
-
+	recv_data((int)acceptfd, ptr, size);
+	
 	/* Read oid and version tuples for those objects that are not modified in the transaction */
 	int numread = fixed.numread;
-	N = numread * (sizeof(unsigned int) + sizeof(unsigned short));
-	char objread[N];
+	size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+	char objread[size];
 	if(numread != 0) { //If pile contains more than one object to be read, 
 			  // keep reading all objects
-		sum = 0;
-		do {
-			n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0);
-			sum += n;
-		} while(sum < N && n != 0);
+		recv_data((int)acceptfd, objread, size);	
 	}
 	
 	/* Read modified objects */
@@ -333,11 +304,8 @@ int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
 			printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
 			return 1;
 		}
-		sum = 0;
-		do { // Recv the objs that are modified by the Coordinator
-			n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
-			sum += n;
-		} while (sum < fixed.sum_bytes && n != 0);
+		size = fixed.sum_bytes;
+		recv_data((int)acceptfd, modptr, size);	
 	}
 
 	/* Create an array of oids for modified objects */
@@ -391,10 +359,8 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 		return 1;
 	}
 
-	do {
-		retval = recv((int)acceptfd, &control, sizeof(char), 0);
-	} while(retval < sizeof(char));
-
+	recv_data((int)acceptfd, &control, sizeof(char));
+	
 	/* Process the new control message */
 	switch(control) {
 		case TRANS_ABORT:
@@ -411,17 +377,7 @@ int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
 
 			/* Send ack to Coordinator */
 			sendctrl = TRANS_UNSUCESSFUL;
-			if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-				perror("Error: In sending ACK to coordinator\n");
-				if (transinfo->objlocked != NULL) {
-					free(transinfo->objlocked);
-				}
-				if (transinfo->objnotfound != NULL) {
-					free(transinfo->objnotfound);
-				}
-
-				return 1;
-			}
+			send_data((int)acceptfd, &sendctrl, sizeof(char));
 			break;
 
 		case TRANS_COMMIT:
@@ -522,10 +478,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 						}
 						free(oidlocked);
 					}
-					if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-						perror("Error in sending control to the Coordinator\n");
-						return 0;
-					}
+					send_data(acceptfd, &control, sizeof(char));
 					return control;
 				}
 			} else {/* If Obj is not locked then lock object */
@@ -550,11 +503,7 @@ char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigne
 					}
 
 					/* Send TRANS_DISAGREE to Coordinator */
-					if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-						perror("Error in sending control to the Coordinator\n");
-						return 0;
-					}
-					
+					send_data(acceptfd, &control, sizeof(char));
 					return control;
 				}
 			}
@@ -583,34 +532,22 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
 	if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
 		control = TRANS_AGREE;
 		/* Send control message */
-		if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-			perror("Error in sending control to Coordinator\n");
-			return 0;
-		}
+		send_data(acceptfd, &control, sizeof(char));
 	}
 	/* Condition to send TRANS_SOFT_ABORT */
 	if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
 		control = TRANS_SOFT_ABORT;
 
 		/* Send control message */
-		if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
-			perror("Error in sending TRANS_SOFT_ABORT control\n");
-			return 0;
-		}
-
+		send_data(acceptfd, &control, sizeof(char));
+	
 		/* Send number of oids not found and the missing oids if objects are missing in the machine */
 		if(*(objnotfound) != 0) { 
 			int msg[1];
 			msg[0] = *(objnotfound);
-			if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
-				perror("Error in sending objects that are not found\n");
-				return 0;
-			}
+			send_data(acceptfd, &msg, sizeof(int));
 			int size = sizeof(unsigned int)* *(objnotfound);
-			if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
-				perror("Error in sending objects that are not found\n");
-				return 0;
-			}
+			send_data(acceptfd, oidnotfound, size);
 		}
 	}
 
@@ -668,11 +605,7 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 
 	/* Send ack to coordinator */
 	control = TRANS_SUCESSFUL;
-	if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-		perror("Error sending ACK to coordinator\n");
-		return 1;
-	}
-	
+	send_data((int)acceptfd, &control, sizeof(char));
 	return 0;
 }
 
@@ -683,27 +616,22 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 
 int prefetchReq(int acceptfd) {
 	int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
-	int length, sd;
+	int length, sd = -1;
 	char *recvbuffer, *sendbuffer, control;
 	unsigned int oid, mid;
 	short *offsetarry;
 	objheader_t *header;
 	struct sockaddr_in remoteAddr;
 
-	while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
-		if(length == -1) { //-1 is special character to represent end of sending oids and offsets
-			break;
-		} else {
-			numbytes = 0;
+	do {
+		recv_data((int)acceptfd, &length, sizeof(int));
+		if(length != -1) {
 			size = length - sizeof(int);
 			if((recvbuffer = calloc(1, size)) == NULL) {
 				printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
 				return -1;
 			}
-			while(numbytes < size) {
-				numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
-			}
-			
+			recv_data((int)acceptfd, recvbuffer, size);
 			oid = *((unsigned int *) recvbuffer);
 			mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
 			size = size - (2 * sizeof(unsigned int));
@@ -715,23 +643,54 @@ int prefetchReq(int acceptfd) {
 			}
 			memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
 			free(recvbuffer);
-
-			/* Create socket to send information */
-			if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-				perror("prefetchReq():socket()");
-				return -1;
+#if 0
+			pthread_mutex_lock(&sockLock);
+			sockIdFound = 0;
+			pthread_mutex_unlock(&sockLock);
+			for(i = 0; i < NUM_MACHINES; i++) {
+				if(sockArray[i].mid == mid) {
+					sd = sockArray[i].sockid;
+					pthread_mutex_lock(&sockLock);
+					sockIdFound = 1;
+					pthread_mutex_unlock(&sockLock);
+					break;
+				}
 			}
-			bzero(&remoteAddr, sizeof(remoteAddr));
-			remoteAddr.sin_family = AF_INET;
-			remoteAddr.sin_port = htons(LISTEN_PORT);
-			remoteAddr.sin_addr.s_addr = htonl(mid);
-
-			if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-				printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
-						inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-				close(sd);
-				return -1;
+
+			if(sockIdFound == 0) {
+				if(sockCount < NUM_MACHINES) {
+
+#endif
+					/* Create socket to send information */
+					if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+						perror("prefetchReq():socket()");
+						return -1;
+					}
+					bzero(&remoteAddr, sizeof(remoteAddr));
+					remoteAddr.sin_family = AF_INET;
+					remoteAddr.sin_port = htons(LISTEN_PORT);
+					remoteAddr.sin_addr.s_addr = htonl(mid);
+
+					if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+						printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+								inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+						close(sd);
+						return -1;
+					}
+
+#if 0
+					sockArray[sockCount].mid = mid;
+					sockArray[sockCount].sockid = sd;
+					pthread_mutex_lock(&sockLock);
+					sockCount++;
+					pthread_mutex_unlock(&sockLock);
+				} else {
+					//TODO Fix for connecting to more than 2 machines && close socket
+					printf("%s(): Error: Currently works for only 2 machines\n", __func__);
+					return -1;
+				}
 			}
+#endif
 
 			/*Process each oid */
 			if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
@@ -781,6 +740,7 @@ int prefetchReq(int acceptfd) {
 					close(sd);
 					return -1;
 				}
+
 				/* Calculate the oid corresponding to the offset value */
 				for(i = 0 ; i< numoffset ; i++) {
 					/* Check for arrays  */
@@ -852,29 +812,19 @@ int prefetchReq(int acceptfd) {
 				}
 				free(offsetarry);
 			}
+			close(sd);
 		}
-	}
-	close(sd);
+	} while (length != -1);
 	return 0;
 }
 
 int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
 	int numbytes = 0;
 
-	if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
-		printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
-		free(sendbuffer);
-		return -1;
-	}
-
+	send_data(sd, control, sizeof(char));
 	/* Send the buffer with its size */
-	if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) {
-		printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
-				__func__, __FILE__, __LINE__, numbytes, *(size));
-		free(sendbuffer);
-		return -1;
-	}
-
+	int length = *(size);
+	send_data(sd, sendbuffer, length);
 	free(sendbuffer);
 	return 0;
 }
@@ -933,21 +883,8 @@ checkversion:
 						*((unsigned short *)(&msg[1]+size)) = newversion;
 						size += sizeof(unsigned short);
 						*((unsigned int *)(&msg[1]+size)) = threadid;
-						bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
-						if (bytesSent < 0){
-							perror("processReqNotify():send()");
-							close(sd);
-							return;
-						} else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
-							printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n", 
-									bytesSent, __FILE__, __LINE__);
-							close(sd);
-							return;
-						} else {
-							close(sd);
-							return;
-						}
-
+						size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+						send_data(sd, msg, size);
 					}
 					close(sd);
 				}
@@ -961,4 +898,3 @@ checkversion:
 	free(oidarry);
 	free(versionarry);
 }
-
diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c
index e9d2d90b..485e7121 100644
--- a/Robust/src/Runtime/DSTM/interface/trans.c
+++ b/Robust/src/Runtime/DSTM/interface/trans.c
@@ -25,7 +25,6 @@
 #define LISTEN_PORT 2156
 #define NUM_THREADS 1
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
-#define NUM_MACHINES 2
 #define CONFIG_FILENAME "dstm.conf"
 
 /* Global Variables */
@@ -49,14 +48,52 @@ unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
 
-/* Global variables to track mapping of socketid and remote mid */
+/************************************************************
+ * Global variables to map socketid and remote mid to
+ * reuse sockets
+ ***********************************************************/
 midSocketInfo_t midSocketArray[NUM_MACHINES];
-int sockCount = 0;
-int sockIdFound;
+int sockCount; 		//number of connections with all remote machines(one socket per mc)
+int sockIdFound;	//track if socket file descriptor is already established 
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
 
+/*******************************
+ * Send and Recv function calls 
+ *******************************/
+void send_data(int fd , void *buf, int buflen) {
+	char *buffer = (char *)(buf); 
+	int size = buflen;
+	int numbytes; 
+	while (size > 0) {
+		numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
+		if (numbytes == -1) {
+			perror("send");
+			printf("error: at %s, %d\n", __FILE__, __LINE__);
+			exit(-1);
+		}
+		buffer += numbytes;
+		size -= numbytes;
+	}
+}
+
+void recv_data(int fd , void *buf, int buflen) {
+	char *buffer = (char *)(buf); 
+	int size = buflen;
+	int numbytes; 
+	while (size > 0) {
+		numbytes = recv(fd, buffer, size, 0);
+		if (numbytes == -1) {
+			perror("recv");
+			printf("error: at %s, %d\n", __FILE__, __LINE__);
+			exit(-1);
+		}
+		buffer += numbytes;
+		size -= numbytes;
+	}
+}
+
 void printhex(unsigned char *ptr, int numBytes)
 {
 	int i;
@@ -208,6 +245,7 @@ void transInit() {
 	pthread_detach(tPrefetch);
 
 	//Initialize mid to socketid mapping array
+	sockCount = 0;
 	for(t = 0; t < NUM_MACHINES; t++) {
 		midSocketArray[t].mid = 0;
 		midSocketArray[t].sockid = 0;
@@ -685,31 +723,18 @@ void *transRequest(void *threadarg) {
 	}
 
 	/* Send bytes of data with TRANS_REQUEST control message */
-	if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
-		perror("Error sending fixed bytes for thread\n");
-		close(sd);
-		pthread_exit(NULL);
-	}
-
+	send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
+	
 	/* Send list of machines involved in the transaction */
 	{
 		int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
-		if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
-			perror("Error sending list of machines for thread\n");
-			close(sd);
-			pthread_exit(NULL);
-		}
+		send_data(sd, tdata->buffer->listmid, size);
 	}
 
 	/* Send oids and version number tuples for objects that are read */
 	{
 		int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
-		
-		if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
-			perror("Error sending tuples for thread\n");
-			close(sd);
-			pthread_exit(NULL);
-		}
+		send_data(sd, tdata->buffer->objread, size);
 	}
 
 	/* Send objects that are modified */
@@ -718,20 +743,11 @@ void *transRequest(void *threadarg) {
 		headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
 		GETSIZE(size,headeraddr);
 		size+=sizeof(objheader_t);
-		if (send(sd, headeraddr, size, MSG_NOSIGNAL)  < size) {
-			perror("Error sending obj modified for thread\n");
-			close(sd);
-			pthread_exit(NULL);
-		}
+		send_data(sd, headeraddr, size);
 	}
 
 	/* Read control message from Participant */
-	if((n = read(sd, &control, sizeof(char))) <= 0) {
-		perror("Error in reading control message from Participant\n");
-		close(sd);
-		pthread_exit(NULL);
-	}
-
+	recv_data(sd, &control, sizeof(char));
 	recvcontrol = control;
 
 	/* Update common data structure and increment count */
@@ -760,9 +776,7 @@ void *transRequest(void *threadarg) {
 		pthread_exit(NULL);
 	}
 
-	do {
-	   retval = recv((int)sd, &control, sizeof(char), 0);
-	} while (retval < sizeof(char));
+	recv_data((int)sd, &control, sizeof(char)); 
 
 	if(control == TRANS_UNSUCESSFUL) {
 		//printf("DEBUG-> TRANS_ABORTED\n");
@@ -830,32 +844,28 @@ void decideResponse(thread_data_array_t *tdata) {
  * It returns a char that is only needed to check the correctness of execution of this function inside
  * transRequest()*/
 char sendResponse(thread_data_array_t *tdata, int sd) {
-	int n, N, sum, oidcount = 0, control;
+	int n, size, sum, oidcount = 0, control;
 	char *ptr, retval = 0;
 	unsigned int *oidnotfound;
 
 	control = *(tdata->replyctrl);
-	if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-		perror("Error sending ctrl message for participant\n");
-		return 0;
-	}
+	send_data(sd, &control, sizeof(char));
 
-	//FIXME read missing objects 
+	//TODO read missing objects to be used during object migration
 	/* If the decided response is due to a soft abort and missing objects at the Participant's side */
 	/*
 	if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
 		// Read list of objects missing  
-		if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
-			N = oidcount * sizeof(unsigned int);
+		recv_data(sd, &oidcount, sizeof(int));
+		//if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
+		if(oidcount != 0) {
+			size = oidcount * sizeof(unsigned int);
 			if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
 				printf("Calloc error %s, %d\n", __FILE__, __LINE__);
 				return 0;
 			}
 			ptr = (char *) oidnotfound;
-			do {
-				n = read(sd, ptr+sum, N-sum);
-				sum += n;
-			} while(sum < N && n !=0);
+			recv_data(sd, ptr, size);
 		}
 		retval =  TRANS_SOFT_ABORT;
 	}
@@ -905,31 +915,20 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
 	char readrequest[sizeof(char)+sizeof(unsigned int)];
 	readrequest[0] = READ_REQUEST;
 	*((unsigned int *)(&readrequest[1])) = oid;
-	if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
-		perror("getRemoteObj(): error sending message\n");
-		return NULL;
-	}
+	send_data(sd, readrequest, sizeof(readrequest));
 
 	/* Read response from the Participant */
-	if((val = read(sd, &control, sizeof(char))) <= 0) {
-		printf("getRemoteObj(): error no response, %d\n", val);
-		return NULL;
-	}
+	recv_data(sd, &control, sizeof(char));
 
 	switch(control) {
 		case OBJECT_NOT_FOUND:
 			return NULL;
 		case OBJECT_FOUND:
 			/* Read object if found into local cache */
-			if((val = read(sd, &size, sizeof(int))) <= 0) {
-				perror("getRemoteObj(): error in reading size\n");
-				return NULL;
-			}
+			recv_data(sd, &size, sizeof(int));
 			objcopy = objstrAlloc(record->cache, size);
-			int sum = 0;
-			while (sum < size) {
-				sum += read(sd, (char *)objcopy+sum, size-sum);
-			}
+			recv_data(sd, objcopy, size);
+			
 			/* Insert into cache's lookup table */
 			chashInsert(record->lookupTable, oid, objcopy); 
 			break;
@@ -1599,10 +1598,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
 
 	/* Send TRANS_PREFETCH control message */
 	control = TRANS_PREFETCH;
-	if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
-		perror("sendPrefetchReq() Sending TRANS_PREFETCH");
-		return;
-	}
+	send_data(sd, &control, sizeof(char));
 
 	/* Send Oids and offsets in pairs */
 	tmp = mcpilenode->objpiles;
@@ -1622,21 +1618,14 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
 			*((short*)(oidnoffset + off)) = tmp->offset[i];
 			off+=sizeof(short);
 		}
-		if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) {
-			perror("Sending oids and offsets");
-			return;
-		}
-		
+		send_data(sd, oidnoffset, len);
 		tmp = tmp->next;
 	}
 
 	/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
 	endpair = -1;
-	if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
-		perror("Error sending endpair\n");
-		return;
-	}
-
+	send_data(sd, &endpair, sizeof(int));
+	
 	return;
 }
 
@@ -1646,67 +1635,61 @@ int getPrefetchResponse(int sd) {
 	unsigned int oid;
 	void *modptr, *oldptr;
 
-	if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
-		printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
+	recv_data((int)sd, &length, sizeof(int)); 
+	size = length - sizeof(int);
+	if((recvbuffer = calloc(1, size)) == NULL) {
+		printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
 		return -1;
-	} else {
+	}
+	recv_data((int)sd, recvbuffer, size);
+
+	control = *((char *) recvbuffer);
+	if(control == OBJECT_FOUND) {
 		numbytes = 0;
-		size = length - sizeof(int);
-		if((recvbuffer = calloc(1, size)) == NULL) {
-			printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+		oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+		size = size - (sizeof(char) + sizeof(unsigned int));
+		pthread_mutex_lock(&prefetchcache_mutex);
+		if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+			printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+			pthread_mutex_unlock(&prefetchcache_mutex);
+			free(recvbuffer);
 			return -1;
 		}
-		while(numbytes < size) {
-			numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
-		}
-		
-		control = *((char *) recvbuffer);
-		if(control == OBJECT_FOUND) {
-			numbytes = 0;
-			oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-			size = size - (sizeof(char) + sizeof(unsigned int));
-			pthread_mutex_lock(&prefetchcache_mutex);
-			if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
-				printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
-				pthread_mutex_unlock(&prefetchcache_mutex);
-				free(recvbuffer);
-				return -1;
-			}
-			pthread_mutex_unlock(&prefetchcache_mutex);
-			memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
-			/* Insert the oid and its address into the prefetch hash lookup table */
-			/* Do a version comparison if the oid exists */
-			if((oldptr = prehashSearch(oid)) != NULL) {
-				/* If older version then update with new object ptr */
-				if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
-					prehashRemove(oid);
-					prehashInsert(oid, modptr);
-				} else {
-					/* TODO modptr should be reference counted */
-				}
-			} else {/* Else add the object ptr to hash table*/
+		pthread_mutex_unlock(&prefetchcache_mutex);
+		memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+
+		/* Insert the oid and its address into the prefetch hash lookup table */
+		/* Do a version comparison if the oid exists */
+		if((oldptr = prehashSearch(oid)) != NULL) {
+			/* If older version then update with new object ptr */
+			if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+				prehashRemove(oid);
 				prehashInsert(oid, modptr);
+			} else {
+				/* TODO modptr should be reference counted */
 			}
-			/* Lock the Prefetch Cache look up table*/
-			pthread_mutex_lock(&pflookup.lock);
-			/* Broadcast signal on prefetch cache condition variable */ 
-			pthread_cond_broadcast(&pflookup.cond);
-			/* Unlock the Prefetch Cache look up table*/
-			pthread_mutex_unlock(&pflookup.lock);
-		} else if(control == OBJECT_NOT_FOUND) {
-			oid = *((unsigned int *)(recvbuffer + sizeof(char)));
-			/* TODO: For each object not found query DHT for new location and retrieve the object */
-			/* Throw an error */
-			printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
-			free(recvbuffer);
-			exit(-1);
-		} else {
-			printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
+		} else {/* Else add the object ptr to hash table*/
+			prehashInsert(oid, modptr);
 		}
+		/* Lock the Prefetch Cache look up table*/
+		pthread_mutex_lock(&pflookup.lock);
+		/* Broadcast signal on prefetch cache condition variable */ 
+		pthread_cond_broadcast(&pflookup.cond);
+		/* Unlock the Prefetch Cache look up table*/
+		pthread_mutex_unlock(&pflookup.lock);
+	} else if(control == OBJECT_NOT_FOUND) {
+		oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+		/* TODO: For each object not found query DHT for new location and retrieve the object */
+		/* Throw an error */
+		printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
 		free(recvbuffer);
+		exit(-1);
+	} else {
+		printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
 	}
 
+	free(recvbuffer);
+
 	return 0;
 }
 
@@ -1762,22 +1745,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid)
 	{
 		msg[0] = START_REMOTE_THREAD;
 		memcpy(&msg[1], &oid, sizeof(unsigned int));
-
-		bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
-		if (bytesSent < 0)
-		{
-			perror("startRemoteThread():send()");
-			status = -1;
-		}
-		else if (bytesSent != 1 + sizeof(unsigned int))
-		{
-			printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
-			status = -1;
-		}
-		else
-		{
-			status = 0;
-		}
+		send_data(sock, msg, 1 + sizeof(unsigned int));
 	}
 
 	close(sock);
@@ -1989,17 +1957,8 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n
 		*((unsigned int *)(&msg[1] + size)) = threadid;
 
 		pthread_mutex_lock(&(ndata->threadnotify));
-		bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0);
-		if (bytesSent < 0){
-			perror("reqNotify():send()");
-			status = -1;
-		} else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){
-			printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__);
-			status = -1;
-		} else {
-			status = 0;
-		}
-		
+		size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+		send_data(sock, msg, size);
 		pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
 		pthread_mutex_unlock(&(ndata->threadnotify));
 	}
@@ -2080,17 +2039,8 @@ int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
 			size+= sizeof(unsigned short);
 			*((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
 
-			bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0);
-			if (bytesSent < 0){
-				perror("notifyAll():send()");
-				status = -1;
-			} else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
-				printf("notifyAll(): error, sent %d bytes %s, %d\n", 
-						bytesSent, __FILE__, __LINE__);
-				status = -1;
-			} else {
-				status = 0;
-			}
+			size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+			send_data(sock, msg, size);
 		}
 		//close socket
 		close(sock);