#define MSG_NOSIGNAL 0
#endif
+/***********************************************************
+ * Macros
+ **********************************************************/
#define GET_NTUPLES(x) ((int *)(x + sizeof(prefetchqelem_t)))
#define GET_PTR_OID(x) ((unsigned int *)(x + sizeof(prefetchqelem_t) + sizeof(int)))
#define GET_PTR_EOFF(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int))))
#define GET_PTR_ARRYFLD(x,n) ((short *)(x + sizeof(prefetchqelem_t) + sizeof(int) + (n*sizeof(unsigned int)) + (n*sizeof(short))))
-
-
-//Coordinator Messages
+/*****************************************
+ * Coordinator Messages
+ ***************************************/
#define READ_REQUEST 1
#define READ_MULT_REQUEST 2
#define MOVE_REQUEST 3
#define TRANS_PREFETCH 8
#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9
-//Participant Messages
+/*********************************
+ * Participant Messages
+ *******************************/
#define OBJECT_FOUND 10
#define OBJECT_NOT_FOUND 11
#define OBJECTS_FOUND 12
#define THREAD_NOTIFY_RESPONSE 25
#define TRANS_UNSUCESSFUL 26
-//Control bits for status of objects in Machine pile
-#define OBJ_LOCKED_BUT_VERSION_MATCH 14
-#define OBJ_UNLOCK_BUT_VERSION_MATCH 15
-#define VERSION_NO_MATCH 16
-
//Max number of objects
#define MAX_OBJECTS 20
+//Max remote-machine connections
+#define NUM_MACHINES 2
+#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
+//Transaction id per machine
+#define TID_LEN 20
#include <stdlib.h>
#include "threadnotify.h"
-#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
-#define TID_LEN 20
//bit designations for status field of objheader
#define DIRTY 0x01
#define NEW 0x02
struct ___Object___ * revertlist;
#endif
} transrecord_t;
+
// Structure is a shared structure that keeps track of responses from the participants
typedef struct thread_response {
char rcv_status;
//Structure to store mid and socketid information
typedef struct midSocketInfo {
- unsigned int mid;
+ unsigned int mid; /* To communicate with mid use sockid in this data structure*/
int sockid;
} midSocketInfo_t;
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
+void send_data(int fd, void *buf, int buflen);
+void recv_data(int fd, void *buf, int buflen);
/* Prototypes for object header */
unsigned int getNewOID(void);
void randomdelay();
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *, unsigned int);
-objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
+objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid header
int transCommit(transrecord_t *record); //return 0 if successful
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
void decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
-void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-void *handleLocalReq(void *);
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);// returns object header from main object store after object is copied into it from remote machine
+void *handleLocalReq(void *);//handles Local requests
int transComProcess(local_thread_data_array_t *);
int transAbortProcess(local_thread_data_array_t *);
void transAbort(transrecord_t *trans);
void *transPrefetch(void *);
void *mcqProcess(void *);
void checkPrefetchTuples(prefetchqelem_t *);
-prefetchpile_t *foundLocal(prefetchqelem_t *);
-prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);
+prefetchpile_t *foundLocal(prefetchqelem_t *);// returns node with prefetch elements(oids, offsets)
+prefetchpile_t *makePreGroups(prefetchqelem_t *, int *);// returns node with prefetch elements(oids, offsets)
void checkPreCache(prefetchqelem_t *, int *, unsigned int, int);
int transPrefetchProcess(transrecord_t *, int **, short);
void sendPrefetchReq(prefetchpile_t*, int);
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
+/**********************************************************
+ * Global variables to map socketid and remote mid
+ * to resuse sockets
+ **************************************************/
+midSocketInfo_t sockArray[NUM_MACHINES];
+int sockCount; //number of connections with all remote machines(one socket per mc)
+int sockIdFound; //track if socket file descriptor is already established
+pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
return 1; //failure
+ //Initialize mid to socketid mapping array
+ int t;
+ sockCount = 0;
+ for(t = 0; t < NUM_MACHINES; t++) {
+ sockArray[t].mid = 0;
+ sockArray[t].sockid = 0;
+ }
+
return 0;
}
* and accordingly calls other functions to process new requests */
void *dstmAccept(void *acceptfd)
{
- int val, retval, size, sum;
+ int val, retval, size, sum, sockid;
unsigned int oid;
char *buffer;
char control,ctrl;
transinfo.numnotfound = 0;
/* Receive control messages from other machines */
- if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
- printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
- pthread_exit(NULL);
- }
-
+ recv_data((int)acceptfd, &control, sizeof(char));
+
switch(control) {
case READ_REQUEST:
/* Read oid requested and search if available */
- if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
- perror("Error: receiving 0x0 object from cooridnator\n");
- pthread_exit(NULL);
- }
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
if((srcObj = mhashSearch(oid)) == NULL) {
printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
pthread_exit(NULL);
h = (objheader_t *) srcObj;
GETSIZE(size, h);
size += sizeof(objheader_t);
+ sockid = (int) acceptfd;
if (h == NULL) {
ctrl = OBJECT_NOT_FOUND;
- if(send((int)acceptfd, &ctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending control msg to coordinator\n");
- pthread_exit(NULL);
- }
+ send_data(sockid, &ctrl, sizeof(char));
} else {
/* Type */
char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
*((int *)&msg[1])=size;
- if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
- perror("Error sending size of object to coordinator\n");
- pthread_exit(NULL);
- }
- if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
- perror("Error in sending object\n");
- pthread_exit(NULL);
- }
+ send_data(sockid, &msg, sizeof(msg));
+ send_data(sockid, h, size);
}
break;
do {
if((val = prefetchReq((int)acceptfd)) != 0) {
printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
- }
-
- if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) < 0) {
- printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
- pthread_exit(NULL);
- } else if(retval == 0) {
- printf("%s() Error: socket closed at the requesting side\n");
- pthread_exit(NULL);
+ break;
}
-
+ recv_data((int)acceptfd, &control, sizeof(char));
} while (control == TRANS_PREFETCH);
-
break;
case TRANS_PREFETCH_RESPONSE:
- if((val = getPrefetchResponse((int) acceptfd)) != 0) {
- printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
- }
+ //do {
+ if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+ printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ //} while (control == TRANS_PREFETCH_RESPONSE);
break;
case START_REMOTE_THREAD:
- retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
- if (retval <= 0)
- perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
- else if (retval != sizeof(unsigned int))
- printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD %s, %d\n",
- retval, __FILE__, __LINE__);
- else
- {
- objType = getObjType(oid);
- startDSMthread(oid, objType);
- }
+ recv_data((int)acceptfd, &oid, sizeof(unsigned int));
+ objType = getObjType(oid);
+ startDSMthread(oid, objType);
break;
case THREAD_NOTIFY_REQUEST:
- retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
+ recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
pthread_exit(NULL);
}
- sum = 0;
- do {
- sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
- } while(sum < size);
+
+ recv_data((int)acceptfd, buffer, size);
oidarry = calloc(numoid, sizeof(unsigned int));
memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
pthread_exit(NULL);
}
- sum = 0;
- do {
- sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
- } while(sum < size);
+ recv_data((int)acceptfd, buffer, size);
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
unsigned int *oidmod, oid;
fixed_data_t fixed;
objheader_t *headaddr;
- int sum = 0, i, N, n, val;
+ int sum, i, size, n, val;
oidmod = NULL;
/* Read fixed_data_t data structure */
- N = sizeof(fixed) - 1;
+ size = sizeof(fixed) - 1;
ptr = (char *)&fixed;;
fixed.control = TRANS_REQUEST;
- do {
- n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
+ recv_data((int)acceptfd, ptr+1, size);
/* Read list of mids */
int mcount = fixed.mcount;
- N = mcount * sizeof(unsigned int);
+ size = mcount * sizeof(unsigned int);
unsigned int listmid[mcount];
ptr = (char *) listmid;
- sum = 0;
- do {
- n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
-
+ recv_data((int)acceptfd, ptr, size);
+
/* Read oid and version tuples for those objects that are not modified in the transaction */
int numread = fixed.numread;
- N = numread * (sizeof(unsigned int) + sizeof(unsigned short));
- char objread[N];
+ size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+ char objread[size];
if(numread != 0) { //If pile contains more than one object to be read,
// keep reading all objects
- sum = 0;
- do {
- n = recv((int)acceptfd, (void *) objread+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
+ recv_data((int)acceptfd, objread, size);
}
/* Read modified objects */
printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
- sum = 0;
- do { // Recv the objs that are modified by the Coordinator
- n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
- sum += n;
- } while (sum < fixed.sum_bytes && n != 0);
+ size = fixed.sum_bytes;
+ recv_data((int)acceptfd, modptr, size);
}
/* Create an array of oids for modified objects */
return 1;
}
- do {
- retval = recv((int)acceptfd, &control, sizeof(char), 0);
- } while(retval < sizeof(char));
-
+ recv_data((int)acceptfd, &control, sizeof(char));
+
/* Process the new control message */
switch(control) {
case TRANS_ABORT:
/* Send ack to Coordinator */
sendctrl = TRANS_UNSUCESSFUL;
- if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error: In sending ACK to coordinator\n");
- if (transinfo->objlocked != NULL) {
- free(transinfo->objlocked);
- }
- if (transinfo->objnotfound != NULL) {
- free(transinfo->objnotfound);
- }
-
- return 1;
- }
+ send_data((int)acceptfd, &sendctrl, sizeof(char));
break;
case TRANS_COMMIT:
}
free(oidlocked);
}
- if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending control to the Coordinator\n");
- return 0;
- }
+ send_data(acceptfd, &control, sizeof(char));
return control;
}
} else {/* If Obj is not locked then lock object */
}
/* Send TRANS_DISAGREE to Coordinator */
- if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending control to the Coordinator\n");
- return 0;
- }
-
+ send_data(acceptfd, &control, sizeof(char));
return control;
}
}
if(*(v_matchnolock) == fixed->numread + fixed->nummod) {
control = TRANS_AGREE;
/* Send control message */
- if((val = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending control to Coordinator\n");
- return 0;
- }
+ send_data(acceptfd, &control, sizeof(char));
}
/* Condition to send TRANS_SOFT_ABORT */
if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
control = TRANS_SOFT_ABORT;
/* Send control message */
- if((val = send(acceptfd, &control, sizeof(char),MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error in sending TRANS_SOFT_ABORT control\n");
- return 0;
- }
-
+ send_data(acceptfd, &control, sizeof(char));
+
/* Send number of oids not found and the missing oids if objects are missing in the machine */
if(*(objnotfound) != 0) {
int msg[1];
msg[0] = *(objnotfound);
- if((val = send(acceptfd, msg, sizeof(int) ,MSG_NOSIGNAL)) < sizeof(int)) {
- perror("Error in sending objects that are not found\n");
- return 0;
- }
+ send_data(acceptfd, &msg, sizeof(int));
int size = sizeof(unsigned int)* *(objnotfound);
- if((val = send(acceptfd, oidnotfound, size ,MSG_NOSIGNAL)) < size) {
- perror("Error in sending objects that are not found\n");
- return 0;
- }
+ send_data(acceptfd, oidnotfound, size);
}
}
/* Send ack to coordinator */
control = TRANS_SUCESSFUL;
- if(send((int)acceptfd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ACK to coordinator\n");
- return 1;
- }
-
+ send_data((int)acceptfd, &control, sizeof(char));
return 0;
}
int prefetchReq(int acceptfd) {
int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
- int length, sd;
+ int length, sd = -1;
char *recvbuffer, *sendbuffer, control;
unsigned int oid, mid;
short *offsetarry;
objheader_t *header;
struct sockaddr_in remoteAddr;
- while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
- if(length == -1) { //-1 is special character to represent end of sending oids and offsets
- break;
- } else {
- numbytes = 0;
+ do {
+ recv_data((int)acceptfd, &length, sizeof(int));
+ if(length != -1) {
size = length - sizeof(int);
if((recvbuffer = calloc(1, size)) == NULL) {
printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
return -1;
}
- while(numbytes < size) {
- numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
- }
-
+ recv_data((int)acceptfd, recvbuffer, size);
oid = *((unsigned int *) recvbuffer);
mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
size = size - (2 * sizeof(unsigned int));
}
memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
free(recvbuffer);
-
- /* Create socket to send information */
- if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
- perror("prefetchReq():socket()");
- return -1;
+#if 0
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 0;
+ pthread_mutex_unlock(&sockLock);
+ for(i = 0; i < NUM_MACHINES; i++) {
+ if(sockArray[i].mid == mid) {
+ sd = sockArray[i].sockid;
+ pthread_mutex_lock(&sockLock);
+ sockIdFound = 1;
+ pthread_mutex_unlock(&sockLock);
+ break;
+ }
}
- bzero(&remoteAddr, sizeof(remoteAddr));
- remoteAddr.sin_family = AF_INET;
- remoteAddr.sin_port = htons(LISTEN_PORT);
- remoteAddr.sin_addr.s_addr = htonl(mid);
-
- if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
- printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
- inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- close(sd);
- return -1;
+
+ if(sockIdFound == 0) {
+ if(sockCount < NUM_MACHINES) {
+
+#endif
+ /* Create socket to send information */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("prefetchReq():socket()");
+ return -1;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return -1;
+ }
+
+#if 0
+ sockArray[sockCount].mid = mid;
+ sockArray[sockCount].sockid = sd;
+ pthread_mutex_lock(&sockLock);
+ sockCount++;
+ pthread_mutex_unlock(&sockLock);
+ } else {
+ //TODO Fix for connecting to more than 2 machines && close socket
+ printf("%s(): Error: Currently works for only 2 machines\n", __func__);
+ return -1;
+ }
}
+#endif
/*Process each oid */
if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
close(sd);
return -1;
}
+
/* Calculate the oid corresponding to the offset value */
for(i = 0 ; i< numoffset ; i++) {
/* Check for arrays */
}
free(offsetarry);
}
+ close(sd);
}
- }
- close(sd);
+ } while (length != -1);
return 0;
}
int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
int numbytes = 0;
- if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
- free(sendbuffer);
- return -1;
- }
-
+ send_data(sd, control, sizeof(char));
/* Send the buffer with its size */
- if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) {
- printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
- __func__, __FILE__, __LINE__, numbytes, *(size));
- free(sendbuffer);
- return -1;
- }
-
+ int length = *(size);
+ send_data(sd, sendbuffer, length);
free(sendbuffer);
return 0;
}
*((unsigned short *)(&msg[1]+size)) = newversion;
size += sizeof(unsigned short);
*((unsigned int *)(&msg[1]+size)) = threadid;
- bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
- if (bytesSent < 0){
- perror("processReqNotify():send()");
- close(sd);
- return;
- } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
- printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n",
- bytesSent, __FILE__, __LINE__);
- close(sd);
- return;
- } else {
- close(sd);
- return;
- }
-
+ size = 1+ 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sd, msg, size);
}
close(sd);
}
free(oidarry);
free(versionarry);
}
-
#define LISTEN_PORT 2156
#define NUM_THREADS 1
#define PREFETCH_CACHE_SIZE 1048576 //1MB
-#define NUM_MACHINES 2
#define CONFIG_FILENAME "dstm.conf"
/* Global Variables */
unsigned int oidMin;
unsigned int oidMax;
-/* Global variables to track mapping of socketid and remote mid */
+/************************************************************
+ * Global variables to map socketid and remote mid to
+ * reuse sockets
+ ***********************************************************/
midSocketInfo_t midSocketArray[NUM_MACHINES];
-int sockCount = 0;
-int sockIdFound;
+int sockCount; //number of connections with all remote machines(one socket per mc)
+int sockIdFound; //track if socket file descriptor is already established
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
+/*******************************
+ * Send and Recv function calls
+ *******************************/
+void send_data(int fd , void *buf, int buflen) {
+ char *buffer = (char *)(buf);
+ int size = buflen;
+ int numbytes;
+ while (size > 0) {
+ numbytes = send(fd, buffer, size, MSG_NOSIGNAL);
+ if (numbytes == -1) {
+ perror("send");
+ printf("error: at %s, %d\n", __FILE__, __LINE__);
+ exit(-1);
+ }
+ buffer += numbytes;
+ size -= numbytes;
+ }
+}
+
+void recv_data(int fd , void *buf, int buflen) {
+ char *buffer = (char *)(buf);
+ int size = buflen;
+ int numbytes;
+ while (size > 0) {
+ numbytes = recv(fd, buffer, size, 0);
+ if (numbytes == -1) {
+ perror("recv");
+ printf("error: at %s, %d\n", __FILE__, __LINE__);
+ exit(-1);
+ }
+ buffer += numbytes;
+ size -= numbytes;
+ }
+}
+
void printhex(unsigned char *ptr, int numBytes)
{
int i;
pthread_detach(tPrefetch);
//Initialize mid to socketid mapping array
+ sockCount = 0;
for(t = 0; t < NUM_MACHINES; t++) {
midSocketArray[t].mid = 0;
midSocketArray[t].sockid = 0;
}
/* Send bytes of data with TRANS_REQUEST control message */
- if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
- perror("Error sending fixed bytes for thread\n");
- close(sd);
- pthread_exit(NULL);
- }
-
+ send_data(sd, &(tdata->buffer->f), sizeof(fixed_data_t));
+
/* Send list of machines involved in the transaction */
{
int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
- if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
- perror("Error sending list of machines for thread\n");
- close(sd);
- pthread_exit(NULL);
- }
+ send_data(sd, tdata->buffer->listmid, size);
}
/* Send oids and version number tuples for objects that are read */
{
int size=(sizeof(unsigned int)+sizeof(unsigned short))*tdata->buffer->f.numread;
-
- if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
- perror("Error sending tuples for thread\n");
- close(sd);
- pthread_exit(NULL);
- }
+ send_data(sd, tdata->buffer->objread, size);
}
/* Send objects that are modified */
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
- if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
- perror("Error sending obj modified for thread\n");
- close(sd);
- pthread_exit(NULL);
- }
+ send_data(sd, headeraddr, size);
}
/* Read control message from Participant */
- if((n = read(sd, &control, sizeof(char))) <= 0) {
- perror("Error in reading control message from Participant\n");
- close(sd);
- pthread_exit(NULL);
- }
-
+ recv_data(sd, &control, sizeof(char));
recvcontrol = control;
/* Update common data structure and increment count */
pthread_exit(NULL);
}
- do {
- retval = recv((int)sd, &control, sizeof(char), 0);
- } while (retval < sizeof(char));
+ recv_data((int)sd, &control, sizeof(char));
if(control == TRANS_UNSUCESSFUL) {
//printf("DEBUG-> TRANS_ABORTED\n");
* It returns a char that is only needed to check the correctness of execution of this function inside
* transRequest()*/
char sendResponse(thread_data_array_t *tdata, int sd) {
- int n, N, sum, oidcount = 0, control;
+ int n, size, sum, oidcount = 0, control;
char *ptr, retval = 0;
unsigned int *oidnotfound;
control = *(tdata->replyctrl);
- if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("Error sending ctrl message for participant\n");
- return 0;
- }
+ send_data(sd, &control, sizeof(char));
- //FIXME read missing objects
+ //TODO read missing objects to be used during object migration
/* If the decided response is due to a soft abort and missing objects at the Participant's side */
/*
if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
// Read list of objects missing
- if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
- N = oidcount * sizeof(unsigned int);
+ recv_data(sd, &oidcount, sizeof(int));
+ //if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
+ if(oidcount != 0) {
+ size = oidcount * sizeof(unsigned int);
if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 0;
}
ptr = (char *) oidnotfound;
- do {
- n = read(sd, ptr+sum, N-sum);
- sum += n;
- } while(sum < N && n !=0);
+ recv_data(sd, ptr, size);
}
retval = TRANS_SOFT_ABORT;
}
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
- if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
- perror("getRemoteObj(): error sending message\n");
- return NULL;
- }
+ send_data(sd, readrequest, sizeof(readrequest));
/* Read response from the Participant */
- if((val = read(sd, &control, sizeof(char))) <= 0) {
- printf("getRemoteObj(): error no response, %d\n", val);
- return NULL;
- }
+ recv_data(sd, &control, sizeof(char));
switch(control) {
case OBJECT_NOT_FOUND:
return NULL;
case OBJECT_FOUND:
/* Read object if found into local cache */
- if((val = read(sd, &size, sizeof(int))) <= 0) {
- perror("getRemoteObj(): error in reading size\n");
- return NULL;
- }
+ recv_data(sd, &size, sizeof(int));
objcopy = objstrAlloc(record->cache, size);
- int sum = 0;
- while (sum < size) {
- sum += read(sd, (char *)objcopy+sum, size-sum);
- }
+ recv_data(sd, objcopy, size);
+
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, oid, objcopy);
break;
/* Send TRANS_PREFETCH control message */
control = TRANS_PREFETCH;
- if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
- perror("sendPrefetchReq() Sending TRANS_PREFETCH");
- return;
- }
+ send_data(sd, &control, sizeof(char));
/* Send Oids and offsets in pairs */
tmp = mcpilenode->objpiles;
*((short*)(oidnoffset + off)) = tmp->offset[i];
off+=sizeof(short);
}
- if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) {
- perror("Sending oids and offsets");
- return;
- }
-
+ send_data(sd, oidnoffset, len);
tmp = tmp->next;
}
/* Send a special char -1 to represent the end of sending oids + offset pair to remote machine */
endpair = -1;
- if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) {
- perror("Error sending endpair\n");
- return;
- }
-
+ send_data(sd, &endpair, sizeof(int));
+
return;
}
unsigned int oid;
void *modptr, *oldptr;
- if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
- printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
+ recv_data((int)sd, &length, sizeof(int));
+ size = length - sizeof(int);
+ if((recvbuffer = calloc(1, size)) == NULL) {
+ printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
return -1;
- } else {
+ }
+ recv_data((int)sd, recvbuffer, size);
+
+ control = *((char *) recvbuffer);
+ if(control == OBJECT_FOUND) {
numbytes = 0;
- size = length - sizeof(int);
- if((recvbuffer = calloc(1, size)) == NULL) {
- printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ size = size - (sizeof(char) + sizeof(unsigned int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+ printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ free(recvbuffer);
return -1;
}
- while(numbytes < size) {
- numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
- }
-
- control = *((char *) recvbuffer);
- if(control == OBJECT_FOUND) {
- numbytes = 0;
- oid = *((unsigned int *)(recvbuffer + sizeof(char)));
- size = size - (sizeof(char) + sizeof(unsigned int));
- pthread_mutex_lock(&prefetchcache_mutex);
- if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
- printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- free(recvbuffer);
- return -1;
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
-
- /* Insert the oid and its address into the prefetch hash lookup table */
- /* Do a version comparison if the oid exists */
- if((oldptr = prehashSearch(oid)) != NULL) {
- /* If older version then update with new object ptr */
- if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
- prehashRemove(oid);
- prehashInsert(oid, modptr);
- } else {
- /* TODO modptr should be reference counted */
- }
- } else {/* Else add the object ptr to hash table*/
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+
+ /* Insert the oid and its address into the prefetch hash lookup table */
+ /* Do a version comparison if the oid exists */
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ /* If older version then update with new object ptr */
+ if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+ prehashRemove(oid);
prehashInsert(oid, modptr);
+ } else {
+ /* TODO modptr should be reference counted */
}
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
- /* Broadcast signal on prefetch cache condition variable */
- pthread_cond_broadcast(&pflookup.cond);
- /* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
- } else if(control == OBJECT_NOT_FOUND) {
- oid = *((unsigned int *)(recvbuffer + sizeof(char)));
- /* TODO: For each object not found query DHT for new location and retrieve the object */
- /* Throw an error */
- printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
- free(recvbuffer);
- exit(-1);
- } else {
- printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
+ } else {/* Else add the object ptr to hash table*/
+ prehashInsert(oid, modptr);
}
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
+ /* Broadcast signal on prefetch cache condition variable */
+ pthread_cond_broadcast(&pflookup.cond);
+ /* Unlock the Prefetch Cache look up table*/
+ pthread_mutex_unlock(&pflookup.lock);
+ } else if(control == OBJECT_NOT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ /* TODO: For each object not found query DHT for new location and retrieve the object */
+ /* Throw an error */
+ printf("OBJECT %x NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n", oid);
free(recvbuffer);
+ exit(-1);
+ } else {
+ printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
}
+ free(recvbuffer);
+
return 0;
}
{
msg[0] = START_REMOTE_THREAD;
memcpy(&msg[1], &oid, sizeof(unsigned int));
-
- bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
- if (bytesSent < 0)
- {
- perror("startRemoteThread():send()");
- status = -1;
- }
- else if (bytesSent != 1 + sizeof(unsigned int))
- {
- printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
- status = -1;
- }
- else
- {
- status = 0;
- }
+ send_data(sock, msg, 1 + sizeof(unsigned int));
}
close(sock);
*((unsigned int *)(&msg[1] + size)) = threadid;
pthread_mutex_lock(&(ndata->threadnotify));
- bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0);
- if (bytesSent < 0){
- perror("reqNotify():send()");
- status = -1;
- } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){
- printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__);
- status = -1;
- } else {
- status = 0;
- }
-
+ size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
+ send_data(sock, msg, size);
pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify));
pthread_mutex_unlock(&(ndata->threadnotify));
}
size+= sizeof(unsigned short);
*((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
- bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0);
- if (bytesSent < 0){
- perror("notifyAll():send()");
- status = -1;
- } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
- printf("notifyAll(): error, sent %d bytes %s, %d\n",
- bytesSent, __FILE__, __LINE__);
- status = -1;
- } else {
- status = 0;
- }
+ size = 1 + 2*sizeof(unsigned int) + sizeof(unsigned short);
+ send_data(sock, msg, size);
}
//close socket
close(sock);