#define LISTEN_PORT 2156
#define BACKLOG 10 //max pending connections
#define RECEIVE_BUFFER_SIZE 2048
-#define PRE_BUF_SIZE 2048
extern int classsize[];
* and accordingly calls other functions to process new requests */
void *dstmAccept(void *acceptfd)
{
- int val, retval, size;
+ int val, retval, size, sum;
unsigned int oid;
- char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
+ char *buffer;
+ char control,ctrl;
char *ptr;
void *srcObj;
objheader_t *h;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
- int i;
-
transinfo.objlocked = NULL;
transinfo.objnotfound = NULL;
transinfo.modptr = NULL;
/* Receive control messages from other machines */
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
- perror("Error: in receiving control from coordinator\n");
+ printf("%s() Error: Receiving control = %d at %s, %d\n", __func__, control, __FILE__, __LINE__);
pthread_exit(NULL);
}
pthread_exit(NULL);
}
break;
+ case TRANS_PREFETCH_RESPONSE:
+ if((val = getPrefetchResponse((int) acceptfd)) != 0) {
+ printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
+ }
+ break;
case START_REMOTE_THREAD:
retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
if (retval <= 0)
break;
case THREAD_NOTIFY_REQUEST:
- size = sizeof(unsigned int);
- bzero(&buffer, RECEIVE_BUFFER_SIZE);
- retval = recv((int)acceptfd, &buffer, size, 0);
- numoid = *((unsigned int *) &buffer);
+ retval = recv((int)acceptfd, &numoid, sizeof(unsigned int), 0);
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
- bzero(&buffer, RECEIVE_BUFFER_SIZE);
- retval = recv((int)acceptfd, &buffer, size, 0);
- if(retval <=0)
- perror("dstmAccept(): error receiving THREAD_NOTIFY_REQUEST");
- else if( retval != (2* sizeof(unsigned int) + (sizeof(unsigned int) + sizeof(unsigned short)) * numoid))
- printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_REQUEST %s, %d\n", retval,
- __FILE__, __LINE__);
- else {
- oidarry = calloc(numoid, sizeof(unsigned int));
- memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
- size = sizeof(unsigned int) * numoid;
- versionarry = calloc(numoid, sizeof(unsigned short));
- memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
- size += sizeof(unsigned short) * numoid;
- mid = *((unsigned int *)(buffer+size));
- size += sizeof(unsigned int);
- threadid = *((unsigned int *)(buffer+size));
- processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+ if((buffer = calloc(1,size)) == NULL) {
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
+ sum = 0;
+ do {
+ sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
+ } while(sum < size);
+
+ oidarry = calloc(numoid, sizeof(unsigned int));
+ memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
+ size = sizeof(unsigned int) * numoid;
+ versionarry = calloc(numoid, sizeof(unsigned short));
+ memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid);
+ size += sizeof(unsigned short) * numoid;
+ mid = *((unsigned int *)(buffer+size));
+ size += sizeof(unsigned int);
+ threadid = *((unsigned int *)(buffer+size));
+ processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+ free(buffer);
break;
case THREAD_NOTIFY_RESPONSE:
size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
- bzero(&buffer, RECEIVE_BUFFER_SIZE);
- retval = recv((int)acceptfd, &buffer, size, 0);
- if(retval <= 0)
- perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE");
- else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
- printf("dstmAccept(): incorrect msg size %d for THREAD_NOTIFY_RESPONSE msg %s, %d\n",
- retval, __FILE__, __LINE__);
- else {
- oid = *((unsigned int *)buffer);
- size = sizeof(unsigned int);
- version = *((unsigned short *)(buffer+size));
- size += sizeof(unsigned short);
- threadid = *((unsigned int *)(buffer+size));
- threadNotify(oid,version,threadid);
+ if((buffer = calloc(1,size)) == NULL) {
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
- break;
+ sum = 0;
+ do {
+ sum += recv((int)acceptfd, buffer+sum, size-sum, 0);
+ } while(sum < size);
+
+ oid = *((unsigned int *)buffer);
+ size = sizeof(unsigned int);
+ version = *((unsigned short *)(buffer+size));
+ size += sizeof(unsigned short);
+ threadid = *((unsigned int *)(buffer+size));
+ threadNotify(oid,version,threadid);
+ free(buffer);
+ break;
default:
printf("Error: dstmAccept() Unknown opcode %d at %s, %d\n", control, __FILE__, __LINE__);
}
* then use offset values to prefetch references to other objects */
int prefetchReq(int acceptfd) {
- int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
- int isArray = 0;
- unsigned int oid, index = 0;
- char *ptr, buffer[PRE_BUF_SIZE];
- void *mobj;
- unsigned int objoid;
- char control;
- objheader_t * header;
- int bytesRecvd;
+ int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
+ int length, sd;
+ char *recvbuffer, *sendbuffer, control;
+ unsigned int oid, mid;
+ unsigned short *offsetarry;
+ objheader_t *header;
+ struct sockaddr_in remoteAddr;
- /* Repeatedly recv one oid and offset pair sent for prefetch */
- while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) {
- count++;
- if(length == -1)
+ while((numbytes = recv((int)acceptfd, &length, sizeof(int), 0)) != 0) {
+ if(length == -1) { //-1 is special character to represent end of sending oids and offsets
break;
- index = 0;
- bytesRecvd = 0;
- do {
- bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd,
- sizeof(unsigned int) - bytesRecvd, 0);
- } while (bytesRecvd < sizeof(unsigned int));
- numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short);
- N = numoffset * sizeof(short);
- short offset[numoffset];
- ptr = (char *)&offset;
- sum = 0;
- /* Recv the offset values per oid */
- do {
- n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0);
- sum += n;
- } while(sum < N && n != 0);
+ } else {
+ numbytes = 0;
+ size = length - sizeof(int);
+ if((recvbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ while(numbytes < size) {
+ numbytes += recv((int)acceptfd, recvbuffer+numbytes, size-numbytes, 0);
+ }
- bzero(&buffer, PRE_BUF_SIZE);
- /* Process each oid */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found in buffer for later use */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- *((unsigned int *)(buffer+index)) = oid;
- index += sizeof(unsigned int);
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* send the oid, it's size, it's header and data */
- header = (objheader_t *)mobj;
- GETSIZE(size, header);
- size += sizeof(objheader_t);
- *(buffer + index) = OBJECT_FOUND;
- index += sizeof(char);
- *((unsigned int *)(buffer+index)) = oid;
- index += sizeof(unsigned int);
- *((int *)(buffer+index)) = size;
- index += sizeof(int);
- memcpy(buffer + index, header, size);
- index += size;
- /* Calculate the oid corresponding to the offset value */
- for(i = 0 ; i< numoffset ; i++) {
- /* Check for arrays */
- if(TYPE(header) > NUMCLASSES) {
- isArray = 1;
+ oid = *((unsigned int *) recvbuffer);
+ mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
+ size = size - (2 * sizeof(unsigned int));
+ numoffset = size / sizeof(short);
+ if((offsetarry = calloc(numoffset, sizeof(unsigned short))) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(recvbuffer);
+ return -1;
+ }
+ memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
+ free(recvbuffer);
+
+ /* Create socket to send information */
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("prefetchReq():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ close(sd);
+ return -1;
+ }
+
+ /*Process each oid */
+ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found in buffer for later use */
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
}
- if(isArray == 1) {
- int elementsize = classsize[TYPE(header)];
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i])));
- } else {
- objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i]));
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ } else { /* Object Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
}
- if((header = mhashSearch(objoid)) == NULL) {
- /* Obj not found, send oid */
- *(buffer + index) = OBJECT_NOT_FOUND;
- index += sizeof(char);
- *((unsigned int *)(buffer+index)) = objoid;
- index += sizeof(unsigned int);
- break;
- } else {/* Obj Found */
- /* send the oid, it's size, it's header and data */
- GETSIZE(size, header);
- size+=sizeof(objheader_t);
- *(buffer+index) = OBJECT_FOUND;
- index += sizeof(char);
- *((unsigned int *)(buffer+index)) = objoid;
- index += sizeof(unsigned int);
- *((int *)(buffer+index)) = size;
- index += sizeof(int);
- memcpy(buffer+index, header, size);
- index += size;
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ /* Calculate the oid corresponding to the offset value */
+ for(i = 0 ; i< numoffset ; i++) {
+ /* Check for arrays */
+ if(TYPE(header) > NUMCLASSES) {
+ isArray = 1;
+ }
+ if(isArray == 1) {
+ int elementsize = classsize[TYPE(header)];
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offsetarry[i])));
+ } else {
+ oid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offsetarry[i]));
+ }
+
+ if((header = mhashSearch(oid)) == NULL) {
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) sendbuffer) = size;
+ *((char *)(sendbuffer + sizeof(int))) = OBJECT_NOT_FOUND;
+ *((unsigned int *)(sendbuffer + sizeof(int) + sizeof(char))) = oid;
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ break;
+ } else {/* Obj Found */
+ int incr = 0;
+ GETSIZE(objsize, header);
+ size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
+ if((sendbuffer = calloc(1, size)) == NULL) {
+ printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ free(offsetarry);
+ close(sd);
+ return -1;
+ }
+ *((int *) (sendbuffer + incr)) = size;
+ incr += sizeof(int);
+ *((char *)(sendbuffer + incr)) = OBJECT_FOUND;
+ incr += sizeof(char);
+ *((unsigned int *)(sendbuffer+incr)) = oid;
+ incr += sizeof(unsigned int);
+ memcpy(sendbuffer + incr, header, objsize + sizeof(objheader_t));
+
+ control = TRANS_PREFETCH_RESPONSE;
+ if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
+ free(offsetarry);
+ printf("Error: %s() in sending prefetch response at %s, %d\n",
+ __func__, __FILE__, __LINE__);
+ close(sd);
+ return -1;
+ }
+ }
isArray = 0;
- continue;
}
+ free(offsetarry);
}
}
+ }
+ close(sd);
+ return 0;
+}
- /* Check for overflow in the buffer */
- if (index >= PRE_BUF_SIZE) {
- printf("Error: Buffer array overflow %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- /* Send Prefetch response control message only once*/
- if(count == 1){
- control = TRANS_PREFETCH_RESPONSE;
- if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
- perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
- }
+int sendPrefetchResponse(int sd, char *control, char *sendbuffer, int *size) {
+ int numbytes = 0;
- //Send buffer size
- if((numbytes = send(acceptfd, &index, sizeof(unsigned int), MSG_NOSIGNAL)) < sizeof(unsigned int)) {
- perror("Error: in sending PREFETCH RESPONSE to Coordinator\n");
- return 1;
- }
+ if((numbytes = send(sd, control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) {
+ printf("%s() Error: in sending PREFETCH RESPONSE to Coordinator at %s, %d\n", __func__, __FILE__, __LINE__);
+ free(sendbuffer);
+ return -1;
+ }
- /* Send the entire buffer with its size and oids found and not found */
- if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index)) {
- perror("Error: sending oids found\n");
- return 1;
- }
+ /* Send the buffer with its size */
+ if((numbytes = send(sd, sendbuffer, *(size), MSG_NOSIGNAL)) < *(size)) {
+ printf("%s() Error: in sending oid found at %s, %d size sent = %d, actual size = %d\n",
+ __func__, __FILE__, __LINE__, numbytes, *(size));
+ free(sendbuffer);
+ return -1;
}
+
+ free(sendbuffer);
return 0;
}
int sd;
struct sockaddr_in remoteAddr;
int bytesSent;
- int status, size;
+ int size;
int i = 0;
while(i < numoid) {
if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("Error: processReqNotify():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
+ close(sd);
+ return;
} else {
//Send Update notification
msg[0] = THREAD_NOTIFY_RESPONSE;
bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
if (bytesSent < 0){
perror("processReqNotify():send()");
- status = -1;
+ close(sd);
+ return;
} else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
printf("Error: processReqNotify(): error, sent %d bytes %s, %d\n",
bytesSent, __FILE__, __LINE__);
- status = -1;
+ close(sd);
+ return;
} else {
- status = 0;
+ close(sd);
+ return;
}
}
#endif
#define LISTEN_PORT 2156
-#define RECEIVE_BUFFER_SIZE 2048
#define NUM_THREADS 10
#define PREFETCH_CACHE_SIZE 1048576 //1MB
#define CONFIG_FILENAME "dstm.conf"
int qnodesize;
int len = 0;
int i, rc;
-
+
/* Allocate for the queue node*/
char *node;
if(ntuples > 0) {
struct timeval tp;
if(oid == 0) {
+ printf("Error: %s, %d oid is NULL \n", __FILE__, __LINE__);
return NULL;
}
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+
#ifdef COMPILER
return &objheader[1];
#else
#endif
} else {
/*If object not found in prefetch cache then block until object appears in the prefetch cache */
+ /*
pthread_mutex_lock(&pflookup.lock);
while(!found) {
rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
- /* Check Prefetch cache again */
+ // Check Prefetch cache again
if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
found = 1;
GETSIZE(size,tmp);
break;
}
}
+ */
/* Get the object from the remote location */
if((machinenumber = lhashSearch(oid)) == 0) {
printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
+
#ifdef COMPILER
return &objcopy[1];
#else
plistnode_t *pile, *pile_ptr;
int i, j, rc, val;
int pilecount, offset, threadnum = 0, trecvcount = 0;
- char buffer[RECEIVE_BUFFER_SIZE],control;
+ char control;
char transid[TID_LEN];
trans_req_data_t *tosend;
trans_commit_data_t transinfo;
static int newtid = 0;
char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */
char localstat = 0;
+ thread_data_array_t *thread_data_array;
+ local_thread_data_array_t *ltdata;
+ do {
+ trecvcount = 0;
+ threadnum = 0;
+ treplyretry = 0;
+ thread_data_array = NULL;
+ ltdata = NULL;
+ /* Look through all the objects in the transaction record and make piles
+ * for each machine involved in the transaction*/
+ pile_ptr = pile = createPiles(record);
- /* Look through all the objects in the transaction record and make piles
- * for each machine involved in the transaction*/
- pile_ptr = pile = createPiles(record);
-
- /* Create the packet to be sent in TRANS_REQUEST */
-
- /* Count the number of participants */
- pilecount = pCount(pile);
-
- /* Create a list of machine ids(Participants) involved in transaction */
- if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- pListMid(pile, listmid);
-
+ /* Create the packet to be sent in TRANS_REQUEST */
- /* Initialize thread variables,
- * Spawn a thread for each Participant involved in a transaction */
- pthread_t thread[pilecount];
- pthread_attr_t attr;
- pthread_cond_t tcond;
- pthread_mutex_t tlock;
- pthread_mutex_t tlshrd;
+ /* Count the number of participants */
+ pilecount = pCount(pile);
- thread_data_array_t *thread_data_array;
- if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- return 1;
- }
+ /* Create a list of machine ids(Participants) involved in transaction */
+ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ pListMid(pile, listmid);
- local_thread_data_array_t *ltdata;
- if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
- printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- free(thread_data_array);
- return 1;
- }
- thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
+ /* Initialize thread variables,
+ * Spawn a thread for each Participant involved in a transaction */
+ pthread_t thread[pilecount];
+ pthread_attr_t attr;
+ pthread_cond_t tcond;
+ pthread_mutex_t tlock;
+ pthread_mutex_t tlshrd;
- /* Initialize and set thread detach attribute */
- pthread_attr_init(&attr);
- pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
- pthread_mutex_init(&tlock, NULL);
- pthread_cond_init(&tcond, NULL);
+ if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ return 1;
+ }
- /* Process each machine pile */
- while(pile != NULL) {
- //Create transaction id
- newtid++;
- if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+ if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
free(thread_data_array);
- free(ltdata);
return 1;
}
- tosend->f.control = TRANS_REQUEST;
- sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
- tosend->f.mcount = pilecount;
- tosend->f.numread = pile->numread;
- tosend->f.nummod = pile->nummod;
- tosend->f.numcreated = pile->numcreated;
- tosend->f.sum_bytes = pile->sum_bytes;
- tosend->listmid = listmid;
- tosend->objread = pile->objread;
- tosend->oidmod = pile->oidmod;
- tosend->oidcreated = pile->oidcreated;
- thread_data_array[threadnum].thread_id = threadnum;
- thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].buffer = tosend;
- thread_data_array[threadnum].recvmsg = rcvd_control_msg;
- thread_data_array[threadnum].threshold = &tcond;
- thread_data_array[threadnum].lock = &tlock;
- thread_data_array[threadnum].count = &trecvcount;
- thread_data_array[threadnum].replyctrl = &treplyctrl;
- thread_data_array[threadnum].replyretry = &treplyretry;
- thread_data_array[threadnum].rec = record;
- /* If local do not create any extra connection */
- if(pile->mid != myIpAddr) { /* Not local */
- do {
- rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
- } while(rc!=0);
- if(rc) {
- perror("Error in pthread create\n");
+
+ thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */
+
+ /* Initialize and set thread detach attribute */
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ pthread_mutex_init(&tlock, NULL);
+ pthread_cond_init(&tcond, NULL);
+
+ /* Process each machine pile */
+ while(pile != NULL) {
+ //Create transaction id
+ newtid++;
+ if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
return 1;
}
- } else { /*Local*/
- ltdata->tdata = &thread_data_array[threadnum];
- ltdata->transinfo = &transinfo;
- do {
- val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
- } while(val!=0);
- if(val) {
- perror("Error in pthread create\n");
+ tosend->f.control = TRANS_REQUEST;
+ sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid);
+ tosend->f.mcount = pilecount;
+ tosend->f.numread = pile->numread;
+ tosend->f.nummod = pile->nummod;
+ tosend->f.numcreated = pile->numcreated;
+ tosend->f.sum_bytes = pile->sum_bytes;
+ tosend->listmid = listmid;
+ tosend->objread = pile->objread;
+ tosend->oidmod = pile->oidmod;
+ tosend->oidcreated = pile->oidcreated;
+ thread_data_array[threadnum].thread_id = threadnum;
+ thread_data_array[threadnum].mid = pile->mid;
+ thread_data_array[threadnum].buffer = tosend;
+ thread_data_array[threadnum].recvmsg = rcvd_control_msg;
+ thread_data_array[threadnum].threshold = &tcond;
+ thread_data_array[threadnum].lock = &tlock;
+ thread_data_array[threadnum].count = &trecvcount;
+ thread_data_array[threadnum].replyctrl = &treplyctrl;
+ thread_data_array[threadnum].replyretry = &treplyretry;
+ thread_data_array[threadnum].rec = record;
+ /* If local do not create any extra connection */
+ if(pile->mid != myIpAddr) { /* Not local */
+ do {
+ rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]);
+ } while(rc!=0);
+ if(rc) {
+ perror("Error in pthread create\n");
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
+ free(thread_data_array);
+ free(ltdata);
+ return 1;
+ }
+ } else { /*Local*/
+ ltdata->tdata = &thread_data_array[threadnum];
+ ltdata->transinfo = &transinfo;
+ do {
+ val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata);
+ } while(val!=0);
+ if(val) {
+ perror("Error in pthread create\n");
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ pDelete(pile_ptr);
+ free(listmid);
+ for (i = 0; i < threadnum; i++)
+ free(thread_data_array[i].buffer);
+ free(thread_data_array);
+ free(ltdata);
+ return 1;
+ }
+ }
+
+ threadnum++;
+ pile = pile->next;
+ }
+ /* Free attribute and wait for the other threads */
+ pthread_attr_destroy(&attr);
+
+ for (i = 0; i < threadnum; i++) {
+ rc = pthread_join(thread[i], NULL);
+ if(rc)
+ {
+ printf("Error: return code from pthread_join() is %d\n", rc);
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
- for (i = 0; i < threadnum; i++)
- free(thread_data_array[i].buffer);
- free(thread_data_array);
- free(ltdata);
+ for (j = i; j < threadnum; j++) {
+ free(thread_data_array[j].buffer);
+ }
return 1;
}
+ free(thread_data_array[i].buffer);
}
- threadnum++;
- pile = pile->next;
- }
-
- /* Free attribute and wait for the other threads */
- pthread_attr_destroy(&attr);
+ /* Free resources */
+ pthread_cond_destroy(&tcond);
+ pthread_mutex_destroy(&tlock);
+ free(listmid);
+ pDelete(pile_ptr);
- for (i = 0; i < threadnum; i++) {
- rc = pthread_join(thread[i], NULL);
- if(rc)
- {
- printf("Error: return code from pthread_join() is %d\n", rc);
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- pDelete(pile_ptr);
- free(listmid);
- for (j = i; j < threadnum; j++) {
- free(thread_data_array[j].buffer);
- }
- return 1;
+ /* wait a random amount of time before retrying to commit transaction*/
+ if(treplyretry == 1) {
+ free(thread_data_array);
+ free(ltdata);
+ randomdelay();
}
- free(thread_data_array[i].buffer);
- }
- /* Free resources */
- pthread_cond_destroy(&tcond);
- pthread_mutex_destroy(&tlock);
- free(listmid);
- pDelete(pile_ptr);
-
+ /* Retry trans commit procedure during soft_abort case */
+ } while (treplyretry == 1);
+
if(treplyctrl == TRANS_ABORT) {
/* Free Resources */
return 0;
} else {
//TODO Add other cases
- printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n");
+ printf("Error: in %s() THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n", __func__);
exit(-1);
}
struct sockaddr_in serv_addr;
thread_data_array_t *tdata;
objheader_t *headeraddr;
- char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+ char control, recvcontrol;
char machineip[16], retval;
tdata = (thread_data_array_t *) threadarg;
return NULL;
}
objcopy = objstrAlloc(record->cache, size);
+ int sum = 0;
+ while (sum < size) {
+ sum += read(sd, (char *)objcopy+sum, size-sum);
+ }
+ /*
if((val = read(sd, (char *)objcopy, size)) <= 0) {
perror("No objects are read from the remote participant\n");
return NULL;
}
+ */
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, oid, objcopy);
break;
prefetchpile_t *foundLocal(prefetchqelem_t *node) {
int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val;
unsigned int *oid;
- unsigned int objoid;
- int isArray = 0;
+ int isArray;
char *ptr, *tmp;
objheader_t *objheader;
short *endoffsets, *arryfields;
for(i = 1; i<ntuples; i++) {
numoffset[i] = endoffsets[i] - endoffsets[i-1];
}
+
for(i = 0; i < ntuples; i++) {
if(oid[i] == 0){
if(i == 0) {
}
continue;
}
+
/* If object found locally */
if((objheader = (objheader_t*) mhashSearch(oid[i])) != NULL) {
+ isArray = 0;
tmp = (char *) objheader;
int orgnumoffset = numoffset[i];
if(i == 0) {
}
for(j = 0; j<orgnumoffset; j++) {
+ unsigned int objoid = 0;
/* Check for arrays */
if(TYPE(objheader) > NUMCLASSES) {
isArray = 1;
flag = 1;
checkPreCache(node, numoffset, oid[i], i);
break;
- }
+ }
tmp = (char *) objheader;
isArray = 0;
}
while(tmp != NULL) {
off = 0;
count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
- len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short));
+ len = sizeof(int) + sizeof(unsigned int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short));
char oidnoffset[len];
bzero(oidnoffset, len);
*((unsigned int*)oidnoffset) = len;
off = sizeof(int);
*((unsigned int *)((char *)oidnoffset + off)) = tmp->oid;
off += sizeof(unsigned int);
+ *((unsigned int *)((char *)oidnoffset + off)) = myIpAddr; //Recently added as of 03/03/2008 at 6:00pm
+ off += sizeof(unsigned int);
for(i = 0; i < tmp->numoffset; i++) {
*((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i];
off+=sizeof(unsigned short);
return;
}
- /* Get Response from the remote machine */
- getPrefetchResponse(count,sd);
close(sd);
return;
}
-void getPrefetchResponse(int count, int sd) {
- int i = 0, val, n, N, sum, index, objsize;
- unsigned int bufsize,oid;
- char *buffer;
- char control;
- char *ptr;
+int getPrefetchResponse(int sd) {
+ int numbytes = 0, length = 0, size = 0;
+ char *recvbuffer, control;
+ unsigned int oid;
void *modptr, *oldptr;
- /* Read prefetch response from the Remote machine */
- if((val = read(sd, &control, sizeof(char))) <= 0) {
- perror("No control response for Prefetch request sent\n");
- return;
- }
+ if((numbytes = recv((int)sd, &length, sizeof(int), 0)) <= 0) {
+ printf("%s() Error: in receiving length at %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ } else {
+ numbytes = 0;
+ size = length - sizeof(int);
+ if((recvbuffer = calloc(1, size)) == NULL) {
+ printf("%s() Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ while(numbytes < size) {
+ numbytes += recv((int)sd, recvbuffer+numbytes, size-numbytes, 0);
+ }
- if(control == TRANS_PREFETCH_RESPONSE) {
- /*For each oid and offset tuple sent as prefetch request to remote machine*/
- while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) {
- if((buffer = calloc(1, bufsize)) == NULL) {
- printf("Calloc Error in %s() at %s, %d\n", __func__, __FILE__, __LINE__);
- return;
+ control = *((char *) recvbuffer);
+ if(control == OBJECT_FOUND) {
+ numbytes = 0;
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ size = size - (sizeof(char) + sizeof(unsigned int));
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((modptr = objstrAlloc(prefetchcache, size)) == NULL) {
+ printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ free(recvbuffer);
+ return -1;
}
- sum = 0;
- index = 0;
- ptr = buffer;
- /* Keep receiving the buffer containing oid info */
- do {
- n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0);
- sum +=n;
- } while(sum < bufsize && n != 0);
-
- /* Decode the contents of the buffer */
- while(index < bufsize ) {
- if(buffer[index] == OBJECT_FOUND) {
- /* Increment it to get the object */
- index += sizeof(char);
- oid = *((unsigned int *)(buffer+index));
- index += sizeof(unsigned int);
- /* For each object found add to Prefetch Cache */
- objsize = *((int *)(buffer+index));
- index += sizeof(int);
- pthread_mutex_lock(&prefetchcache_mutex);
- if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) {
- printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&prefetchcache_mutex);
- free(buffer);
- return;
- }
- pthread_mutex_unlock(&prefetchcache_mutex);
- memcpy(modptr, buffer+index, objsize);
- index += objsize;
- /* Insert the oid and its address into the prefetch hash lookup table */
- /* Do a version comparison if the oid exists */
- if((oldptr = prehashSearch(oid)) != NULL) {
- /* If older version then update with new object ptr */
- if(((objheader_t *)oldptr)->version < ((objheader_t *)modptr)->version) {
- prehashRemove(oid);
- prehashInsert(oid, modptr);
- } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) {
- /* Add the new object ptr to hash table */
- prehashRemove(oid);
- prehashInsert(oid, modptr);
- } else { /* Do nothing: TODO modptr should be reference counted */
- ;
- }
- } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
- prehashInsert(oid, modptr);
- }
- /* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
- /* Broadcast signal on prefetch cache condition variable */
- pthread_cond_broadcast(&pflookup.cond);
- /* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
- } else if(buffer[index] == OBJECT_NOT_FOUND) {
- /* Increment it to get the object */
- /* TODO: For each object not found query DHT for new location and retrieve the object */
- index += sizeof(char);
- oid = *((unsigned int *)(buffer + index));
- index += sizeof(unsigned int);
- /* Throw an error */
- printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
- exit(-1);
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(modptr, recvbuffer + sizeof(char) + sizeof(unsigned int), size);
+
+ /* Insert the oid and its address into the prefetch hash lookup table */
+ /* Do a version comparison if the oid exists */
+ if((oldptr = prehashSearch(oid)) != NULL) {
+ /* If older version then update with new object ptr */
+ if(((objheader_t *)oldptr)->version <= ((objheader_t *)modptr)->version) {
+ prehashRemove(oid);
+ prehashInsert(oid, modptr);
} else {
- printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__);
- free(buffer);
- return;
+ /* TODO modptr should be reference counted */
}
+ } else {/* Else add the object ptr to hash table*/
+ prehashInsert(oid, modptr);
}
- free(buffer);
+ /* Lock the Prefetch Cache look up table*/
+ pthread_mutex_lock(&pflookup.lock);
+ /* Broadcast signal on prefetch cache condition variable */
+ pthread_cond_broadcast(&pflookup.cond);
+ /* Unlock the Prefetch Cache look up table*/
+ pthread_mutex_unlock(&pflookup.lock);
+ } else if(control == OBJECT_NOT_FOUND) {
+ oid = *((unsigned int *)(recvbuffer + sizeof(char)));
+ /* TODO: For each object not found query DHT for new location and retrieve the object */
+ /* Throw an error */
+ printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
+ free(recvbuffer);
+ exit(-1);
+ } else {
+ printf("Error: in decoding the control value %d, %s, %d\n",control, __FILE__, __LINE__);
}
- } else
- printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
- return;
+ free(recvbuffer);
+ }
+
+ return 0;
}
unsigned short getObjType(unsigned int oid)