From faaea186e1850c8c8f986e0fdaf6829ad4ae7b27 Mon Sep 17 00:00:00 2001 From: adash Date: Sat, 26 Jan 2008 01:48:15 +0000 Subject: [PATCH] fix memory corruption errors and replace mallocs with calloc. Current fix works fine for testcase Atomic2.java and Atomic3.java other minor fixes Remove prefetchpile.c file (was not used) --- .../src/Runtime/DSTM/interface/dstmserver.c | 258 +++++++++--------- .../src/Runtime/DSTM/interface/machinepile.c | 86 ++++-- .../src/Runtime/DSTM/interface/machinepile.h | 2 +- Robust/src/Runtime/DSTM/interface/mcpileq.c | 39 ++- Robust/src/Runtime/DSTM/interface/mcpileq.h | 2 +- Robust/src/Runtime/DSTM/interface/objstr.c | 6 +- Robust/src/Runtime/DSTM/interface/queue.c | 1 + Robust/src/Runtime/DSTM/interface/trans.c | 97 ++++--- 8 files changed, 281 insertions(+), 210 deletions(-) diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 5928799c..fd328e45 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -140,13 +140,14 @@ void *dstmAccept(void *acceptfd) switch(control) { case READ_REQUEST: + printf("DEBUG -> Recv READ_REQUEST\n"); /* Read oid requested and search if available */ if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) { perror("Error receiving object from cooridnator\n"); pthread_exit(NULL); } if((srcObj = mhashSearch(oid)) == NULL) { - printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__); + printf("Object 0x%x is not found in Main Object Store %s %d\n", oid, __FILE__, __LINE__); pthread_exit(NULL); } h = (objheader_t *) srcObj; @@ -217,36 +218,37 @@ void *dstmAccept(void *acceptfd) case THREAD_NOTIFY_REQUEST: size = sizeof(unsigned int); - retval = recv((int)acceptfd, ptr, size, 0); - numoid = *((unsigned int *) ptr); + retval = recv((int)acceptfd, &numoid, size, 0); size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int); - retval = recv((int)acceptfd, ptr, size, 0); + bzero(&buffer, RECEIVE_BUFFER_SIZE); + retval = recv((int)acceptfd, &buffer, size, 0); oidarry = calloc(numoid, sizeof(unsigned int)); - memcpy(oidarry, ptr, sizeof(unsigned int) * numoid); + memcpy(oidarry, buffer, sizeof(unsigned int) * numoid); size = sizeof(unsigned int) * numoid; versionarry = calloc(numoid, sizeof(unsigned short)); - memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid); + memcpy(versionarry, buffer+size, sizeof(unsigned short) * numoid); size += sizeof(unsigned short) * numoid; - mid = *((unsigned int *)(ptr+size)); + mid = *((unsigned int *)(buffer+size)); size += sizeof(unsigned int); - threadid = *((unsigned int *)(ptr+size)); + threadid = *((unsigned int *)(buffer+size)); processReqNotify(numoid, oidarry, versionarry, mid, threadid); break; case THREAD_NOTIFY_RESPONSE: size = sizeof(unsigned short) + 2 * sizeof(unsigned int); - retval = recv((int)acceptfd, ptr, size, 0); + bzero(&buffer, RECEIVE_BUFFER_SIZE); + retval = recv((int)acceptfd, &buffer, size, 0); if(retval <= 0) perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg"); else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short)) printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval); else { - oid = *((unsigned int *)ptr); + oid = *((unsigned int *)buffer); size = sizeof(unsigned int); - version = *((unsigned short *)(ptr+size)); + version = *((unsigned short *)(buffer+size)); size += sizeof(unsigned short); - threadid = *((unsigned int *)(ptr+size)); + threadid = *((unsigned int *)(buffer+size)); threadNotify(oid,version,threadid); } @@ -640,119 +642,123 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock * then use offset values to prefetch references to other objects */ int prefetchReq(int acceptfd) { - int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0; - int isArray = 0; - unsigned int oid, index = 0; - char *ptr, buffer[PRE_BUF_SIZE]; - void *mobj; - unsigned int objoid; - char control; - objheader_t * header; - int bytesRecvd; - - /* Repeatedly recv the oid and offset pairs sent for prefetch */ - while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { - count++; - if(length == -1) - break; - sum = 0; - index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the - // first 4 bytes are saved to send the - // size of the buffer (that is computed at the end of the loop) - bytesRecvd = 0; - do { - bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd, - sizeof(unsigned int) - bytesRecvd, 0); - } while (bytesRecvd < sizeof(unsigned int)); - numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); - N = numoffset * sizeof(short); - short offset[numoffset]; - ptr = (char *)&offset; - /* Recv the offset values per oid */ - do { - n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); - sum += n; - } while(sum < N && n != 0); - - /* Process each oid */ - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ - /* Save the oids not found in buffer for later use */ - *(buffer + index) = OBJECT_NOT_FOUND; - index += sizeof(char); - memcpy(buffer+index, &oid, sizeof(unsigned int)); - index += sizeof(unsigned int); - } else { /* If Obj found in machine (i.e. has not moved) */ - /* send the oid, it's size, it's header and data */ - header = mobj; - GETSIZE(size, header); - size += sizeof(objheader_t); - *(buffer + index) = OBJECT_FOUND; - index += sizeof(char); - memcpy(buffer+index, &oid, sizeof(unsigned int)); - index += sizeof(unsigned int); - memcpy(buffer+index, &size, sizeof(int)); - index += sizeof(int); - memcpy(buffer + index, header, size); - index += size; - /* Calculate the oid corresponding to the offset value */ - for(i = 0 ; i< numoffset ; i++) { - /* Check for arrays */ - if(TYPE(header) > NUMCLASSES) { - isArray = 1; - } - if(isArray == 1) { - int elementsize = classsize[TYPE(header)]; - objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i]))); - } else { - objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i])); - } - if((header = mhashSearch(objoid)) == NULL) { - /* Obj not found, send oid */ - *(buffer + index) = OBJECT_NOT_FOUND; - index += sizeof(char); - memcpy(buffer+index, &oid, sizeof(unsigned int)); - index += sizeof(unsigned int); - break; - } else {/* Obj Found */ - /* send the oid, it's size, it's header and data */ - GETSIZE(size, header); - size+=sizeof(objheader_t); - *(buffer + index) = OBJECT_FOUND; - index += sizeof(char); - memcpy(buffer+index, &oid, sizeof(unsigned int)); - index += sizeof(unsigned int); - memcpy(buffer+index, &size, sizeof(int)); - index += sizeof(int); - memcpy(buffer+index, header, size); - index += size; - isArray = 0; - continue; - } - } - } - /* Check for overflow in the buffer */ - if (index >= PRE_BUF_SIZE) { - printf("Char buffer is overflowing\n"); - return 1; - } - /* Send Prefetch response control message only once*/ - if(count == 1) { - control = TRANS_PREFETCH_RESPONSE; - if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { - perror("Error in sending PREFETCH RESPONSE to Coordinator\n"); - return 1; - } - } - - /* Add the buffer size into buffer as a parameter */ - *((unsigned int *)buffer)=index; - /* Send the entire buffer with its size and oids found and not found */ - if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) { - perror("Error sending oids found\n"); - return 1; - } - } - return 0; + int i, length, sum, n, numbytes, numoffset, N, size, count = 0; + int isArray = 0, bytesRecvd; + unsigned int oid, index = 0; + unsigned int objoid, myIpAddr; + char *ptr, control, buffer[PRE_BUF_SIZE]; + void *mobj; + objheader_t * header; + +#ifdef MAC + myIpAddr = getMyIpAddr("en1"); +#else + myIpAddr = getMyIpAddr("eth0"); +#endif + + /* Repeatedly recv the oid and offset pairs sent for prefetch */ + while(numbytes = recv((int)acceptfd, &length, sizeof(int), 0) != 0) { + count++; + if(length == -1) + break; + index = sizeof(unsigned int); // Index starts with sizeof unsigned int because the + // first 4 bytes are saved to send the + // size of the buffer (that is computed at the end of the loop) + bytesRecvd = 0; + do { + bytesRecvd += recv((int)acceptfd, (char *)&oid +bytesRecvd, + sizeof(unsigned int) - bytesRecvd, 0); + } while (bytesRecvd < sizeof(unsigned int)); + numoffset = (length - (sizeof(int) + sizeof(unsigned int)))/ sizeof(short); + N = numoffset * sizeof(short); + short offset[numoffset]; + ptr = (char *)&offset; + sum = 0; + /* Recv the offset values per oid */ + do { + n = recv((int)acceptfd, (void *)ptr+sum, N-sum, 0); + sum += n; + } while(sum < N && n != 0); + + /* Process each oid */ + if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + /* Save the oids not found in buffer for later use */ + *(buffer + index) = OBJECT_NOT_FOUND; + index += sizeof(char); + *((unsigned int *)(buffer+index)) = oid; + index += sizeof(unsigned int); + } else { /* If Obj found in machine (i.e. has not moved) */ + /* send the oid, it's size, it's header and data */ + header = (objheader_t *)mobj; + GETSIZE(size, header); + size += sizeof(objheader_t); + *(buffer + index) = OBJECT_FOUND; + index += sizeof(char); + *((unsigned int *)(buffer+index)) = oid; + index += sizeof(unsigned int); + *((int *)(buffer+index)) = size; + index += sizeof(int); + memcpy(buffer + index, header, size); + index += size; + /* Calculate the oid corresponding to the offset value */ + for(i = 0 ; i< numoffset ; i++) { + /* Check for arrays */ + if(TYPE(header) > NUMCLASSES) { + isArray = 1; + } + if(isArray == 1) { + int elementsize = classsize[TYPE(header)]; + objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*offset[i]))); + } else { + objoid = *((unsigned int *)(((char *)header) + sizeof(objheader_t) + offset[i])); + } + if((header = mhashSearch(objoid)) == NULL) { + /* Obj not found, send oid */ + *(buffer + index) = OBJECT_NOT_FOUND; + index += sizeof(char); + *((unsigned int *)(buffer+index)) = objoid; + index += sizeof(unsigned int); + break; + } else {/* Obj Found */ + /* send the oid, it's size, it's header and data */ + GETSIZE(size, header); + size+=sizeof(objheader_t); + *(buffer+index) = OBJECT_FOUND; + index += sizeof(char); + *((unsigned int *)(buffer+index)) = objoid; + index += sizeof(unsigned int); + *((int *)(buffer+index)) = size; + index += sizeof(int); + memcpy(buffer+index, header, size); + index += size; + isArray = 0; + continue; + } + } + } + /* Check for overflow in the buffer */ + if (index >= PRE_BUF_SIZE) { + printf("Char buffer is overflowing\n"); + return 1; + } + /* Send Prefetch response control message only once*/ + if(count == 1){ + control = TRANS_PREFETCH_RESPONSE; + if((numbytes = send(acceptfd, &control, sizeof(char), MSG_NOSIGNAL)) < sizeof(char)) { + perror("Error in sending PREFETCH RESPONSE to Coordinator\n"); + return 1; + } + } + + /* Add the buffer size into buffer as a parameter */ + *((unsigned int *)buffer)=index; + /* Send the entire buffer with its size and oids found and not found */ + if(send((int)acceptfd, &buffer, index, MSG_NOSIGNAL) < sizeof(index -1)) { + perror("Error sending oids found\n"); + return 1; + } + } + return 0; } void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) { @@ -818,7 +824,7 @@ checkversion: STATUS(header) &= ~(LOCK); } else { randomdelay(); - printf("DEBUG-> processReqNotify() Object is still locked\n"); + printf("processReqNotify() Object is still locked\n"); goto checkversion; } } diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.c b/Robust/src/Runtime/DSTM/interface/machinepile.c index ec5e9ba9..c27ea01b 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.c +++ b/Robust/src/Runtime/DSTM/interface/machinepile.c @@ -1,22 +1,30 @@ #include "machinepile.h" -int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t **head) { - prefetchpile_t *tmp = *head; +prefetchpile_t *insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefetchpile_t *head) { + prefetchpile_t *tmp = head; + prefetchpile_t *ptr; objpile_t *objnode; unsigned int *oidarray; - int ntuples; + short *offvalues; + int i; char found = 0; while (tmp != NULL) { if (tmp->mid == mid) { // Found a match with exsisting machine id if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return -1; + return NULL; + } + if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; } /* Fill objpiles DS */ objnode->oid = oid; objnode->numoffset = numoffset; - objnode->offset = offset; + for(i = 0; ioffset = offvalues; objnode->next = tmp->objpiles; tmp->objpiles = objnode; found = 1; @@ -24,26 +32,54 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet } tmp = tmp->next; } - if (!found) {// Not found => insert new mid DS - if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return -1; - } - tmp->mid = mid; - if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { - printf("Calloc error: %s %d\n", __FILE__, __LINE__); - return -1; + + tmp = head; + if(found != 1) { + if(tmp->mid == 0) {//First time + tmp->mid = mid; + if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + // Fill objpiles DS + objnode->oid = oid; + objnode->numoffset = numoffset; + for(i = 0; ioffset = offvalues; + objnode->next = NULL; + tmp->objpiles = objnode; + tmp->next = NULL; + } else { + if ((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + tmp->mid = mid; + if ((objnode = (objpile_t *) calloc(1, sizeof(objpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + if ((offvalues = (short *) calloc(numoffset, sizeof(short))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + // Fill objpiles DS + objnode->oid = oid; + objnode->numoffset = numoffset; + for(i = 0; ioffset = offvalues; + objnode->next = NULL; + tmp->objpiles = objnode; + tmp->next = head; + head = tmp; } - /* Fill objpiles DS */ - objnode->oid = oid; - objnode->numoffset = numoffset; - objnode->offset = offset; - objnode->next = tmp->objpiles; // i.e., objnode->next = NULL; - tmp->objpiles = objnode; - tmp->next = *head; - *head = tmp; } - return 0; + + return head; } - - diff --git a/Robust/src/Runtime/DSTM/interface/machinepile.h b/Robust/src/Runtime/DSTM/interface/machinepile.h index 7d98b214..8add41b7 100644 --- a/Robust/src/Runtime/DSTM/interface/machinepile.h +++ b/Robust/src/Runtime/DSTM/interface/machinepile.h @@ -5,6 +5,6 @@ #include #include -int insertPile(int, unsigned int, short, short *, prefetchpile_t **); +prefetchpile_t *insertPile(int, unsigned int, short, short *, prefetchpile_t *); #endif diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.c b/Robust/src/Runtime/DSTM/interface/mcpileq.c index d6da34f3..fea54094 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.c +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.c @@ -1,6 +1,6 @@ #include "mcpileq.h" -mcpileq_t mcqueue; +mcpileq_t mcqueue; //Global queue void mcpileqInit(void) { /* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */ @@ -14,17 +14,13 @@ void mcpileqInit(void) { } /* Insert to the rear of machine pile queue */ -void mcpileenqueue(prefetchpile_t *node) { - prefetchpile_t *tmp, *prev; +void mcpileenqueue(prefetchpile_t *node, prefetchpile_t *tail) { if(mcqueue.front == NULL && mcqueue.rear == NULL) { - mcqueue.front = mcqueue.rear = node; + mcqueue.front = node; + mcqueue.rear = tail; } else { - tmp = mcqueue.rear->next = node; - while(tmp != NULL) { - prev = tmp; - tmp = tmp->next; - } - mcqueue.rear = prev; + mcqueue.rear->next = node; + mcqueue.rear = tail; } } @@ -32,7 +28,7 @@ void mcpileenqueue(prefetchpile_t *node) { prefetchpile_t *mcpiledequeue(void) { prefetchpile_t *retnode; if(mcqueue.front == NULL) { - printf("Machune pile queue empty: Underfloe %s %d\n", __FILE__, __LINE__); + printf("Machine pile queue empty: Underflow %s %d\n", __FILE__, __LINE__); return NULL; } retnode = mcqueue.front; @@ -73,19 +69,16 @@ void mcdealloc(prefetchpile_t *node) { while (prefetchpile_ptr != NULL) { - objpile_ptr = prefetchpile_ptr->objpiles; - while (objpile_ptr != NULL) - { - if (objpile_ptr->numoffset > 0) - free(objpile_ptr->offset); - objpile_next_ptr = objpile_ptr->next; + prefetchpile_next_ptr = prefetchpile_ptr; + while(prefetchpile_ptr->objpiles != NULL) { + if(prefetchpile_ptr->objpiles->numoffset > 0) { + free(prefetchpile_ptr->objpiles->offset); + } + objpile_ptr = prefetchpile_ptr->objpiles; + prefetchpile_ptr->objpiles = objpile_ptr->next; free(objpile_ptr); - objpile_ptr = objpile_next_ptr; } - prefetchpile_next_ptr = prefetchpile_ptr->next; - free(prefetchpile_ptr); - prefetchpile_ptr = prefetchpile_next_ptr; + prefetchpile_ptr = prefetchpile_next_ptr->next; + free(prefetchpile_next_ptr); } } - - diff --git a/Robust/src/Runtime/DSTM/interface/mcpileq.h b/Robust/src/Runtime/DSTM/interface/mcpileq.h index 8c570d7f..26a3de2c 100644 --- a/Robust/src/Runtime/DSTM/interface/mcpileq.h +++ b/Robust/src/Runtime/DSTM/interface/mcpileq.h @@ -29,7 +29,7 @@ typedef struct mcpileq { }mcpileq_t; void mcpileqInit(void); -void mcpileenqueue(prefetchpile_t *); +void mcpileenqueue(prefetchpile_t *, prefetchpile_t *); prefetchpile_t *mcpiledequeue(void); void mcpiledelete(); void mcpiledisplay(); diff --git a/Robust/src/Runtime/DSTM/interface/objstr.c b/Robust/src/Runtime/DSTM/interface/objstr.c index 90657338..20004396 100644 --- a/Robust/src/Runtime/DSTM/interface/objstr.c +++ b/Robust/src/Runtime/DSTM/interface/objstr.c @@ -2,7 +2,7 @@ objstr_t *objstrCreate(unsigned int size) { - objstr_t *tmp = malloc(sizeof(objstr_t) + size); + objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size)); tmp->size = size; tmp->next = NULL; tmp->top = tmp + 1; //points to end of objstr_t structure! @@ -38,7 +38,7 @@ void *objstrAlloc(objstr_t *store, unsigned int size) { //end of list, all full if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects { - store->next = (objstr_t *)malloc(sizeof(objstr_t) + size); + store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size)); if (store->next == NULL) return NULL; store = store->next; @@ -46,7 +46,7 @@ void *objstrAlloc(objstr_t *store, unsigned int size) } else { - store->next = malloc(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE); + store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE)); if (store->next == NULL) return NULL; store = store->next; diff --git a/Robust/src/Runtime/DSTM/interface/queue.c b/Robust/src/Runtime/DSTM/interface/queue.c index 954a52b5..6837726b 100644 --- a/Robust/src/Runtime/DSTM/interface/queue.c +++ b/Robust/src/Runtime/DSTM/interface/queue.c @@ -57,6 +57,7 @@ prefetchqelem_t *pre_dequeue(void) { pqueue.front = pqueue.front->next; if (pqueue.front == NULL) pqueue.rear = NULL; + retnode->next = NULL; return retnode; } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 48dcd424..419ed321 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -84,6 +84,7 @@ inline int findmax(int *array, int arraylength) { void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { int qnodesize; int len = 0; + int i; /* Allocate for the queue node*/ char *node; @@ -102,6 +103,7 @@ void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short memcpy(node + len, endoffsets, ntuples*sizeof(short)); len += ntuples * sizeof(short); memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); + /* Lock and insert into primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); pre_enqueue((prefetchqelem_t *)node); @@ -189,6 +191,7 @@ void transInit() { do { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); + pthread_detach(tPrefetch); //Create and Initialize a pool of threads @@ -228,7 +231,7 @@ void randomdelay() /* This function initializes things required in the transaction start*/ transrecord_t *transStart() { - transrecord_t *tmp = malloc(sizeof(transrecord_t)); + transrecord_t *tmp = calloc(1, sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); #ifdef COMPILER @@ -325,8 +328,6 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) { /* Get the object from the remote location */ machinenumber = lhashSearch(oid); - char* ipaddr; - midtoIP(machinenumber, ipaddr); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__); @@ -458,7 +459,7 @@ int transCommit(transrecord_t *record) { pthread_mutex_t tlshrd; thread_data_array_t *thread_data_array; - if((thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount)) == NULL) { + if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { printf("Malloc error %s, %d\n", __FILE__, __LINE__); pthread_cond_destroy(&tcond); pthread_mutex_destroy(&tlock); @@ -718,7 +719,7 @@ void *transRequest(void *threadarg) { /* Close connection */ close(sd); - //pthread_exit(NULL); + pthread_exit(NULL); } /* This function decides the reponse that needs to be sent to @@ -812,7 +813,7 @@ char sendResponse(thread_data_array_t *tdata, int sd) { /* This function opens a connection, places an object read request to the * remote machine, reads the control message and object if available and * copies the object and its header to the local cache. - * TODO replace mnum and midtoIP() with MACHINE_IP address later */ + * */ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int sd, size, val; @@ -827,13 +828,14 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { perror("Error in socket\n"); return NULL; } + bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); midtoIP(mnum,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); + /* Open connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect\n"); @@ -999,7 +1001,7 @@ void *handleLocalReq(void *threadarg) { free(localtdata->transinfo->objnotfound); } - //pthread_exit(NULL); + pthread_exit(NULL); } /* This function completes the ABORT process if the transaction is aborting */ @@ -1020,12 +1022,12 @@ int transAbortProcess(local_thread_data_array_t *localtdata) { } printf("TRANS_ABORTED\n"); + return 0; } /*This function completes the COMMIT process is the transaction is commiting*/ int transComProcess(local_thread_data_array_t *localtdata) { - static int prevsize = 0, *prevptr; objheader_t *header, *tcptr; int i, nummod, tmpsize, numcreated, numlocked; unsigned int *oidmod, *oidcreated, *oidlocked; @@ -1160,9 +1162,8 @@ void checkPrefetchTuples(prefetchqelem_t *node) { } /* This function makes machine piles to be added into the machine pile queue for each prefetch call */ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { - char *ptr, *tmp; - int ntuples, slength, i, machinenum; - int maxoffset; + char *ptr; + int ntuples, i, machinenum, count=0; unsigned int *oid; short *endoffsets, *arryfields, *offset; prefetchpile_t *head = NULL; @@ -1174,6 +1175,11 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + if((head = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + /* Check for redundant tuples by comparing oids of each tuple */ for(i = 0; i < ntuples; i++) { if(oid[i] == 0) @@ -1184,9 +1190,18 @@ prefetchpile_t *makePreGroups(prefetchqelem_t *node, int *numoffset) { return NULL; } /* Insert into machine pile */ - offset = &arryfields[endoffsets[i-1]]; - insertPile(machinenum, oid[i], numoffset[i], offset, &head); + if(i == 0){ + offset = &arryfields[0]; + } else { + offset = &arryfields[endoffsets[i-1]]; + } + + if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){ + printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__); + return NULL; + } } + return head; } @@ -1205,6 +1220,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ int numoffset[ntuples];//Number of offsets for each tuple numoffset[0] = endoffsets[0]; @@ -1283,10 +1299,14 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { /* Look in Prefetch cache */ checkPreCache(node, numoffset, oid[i],i); } - } + /* Make machine groups */ - head = makePreGroups(node, numoffset); + if((head = makePreGroups(node, numoffset)) == NULL) { + printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__); + return NULL; + } + return head; } @@ -1360,6 +1380,7 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i void *transPrefetch(void *t) { prefetchqelem_t *qnode; prefetchpile_t *pilehead = NULL; + prefetchpile_t *ptr = NULL, *piletail = NULL; while(1) { /* lock mutex of primary prefetch queue */ @@ -1376,23 +1397,35 @@ void *transPrefetch(void *t) { pthread_exit(NULL); } pthread_mutex_unlock(&pqueue.qlock); + /* Reduce redundant prefetch requests */ checkPrefetchTuples(qnode); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ - pilehead = foundLocal(qnode); + if((pilehead = foundLocal(qnode)) == NULL) { + printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + + ptr = pilehead; + while(ptr != NULL) { + if(ptr->next == NULL) { + piletail = ptr; + } + ptr = ptr->next; + } /* Lock mutex of pool queue */ pthread_mutex_lock(&mcqueue.qlock); /* Update the pool queue with the new remote machine piles generated per prefetch call */ - mcpileenqueue(pilehead); + mcpileenqueue(pilehead, piletail); /* Broadcast signal on machine pile queue */ pthread_cond_broadcast(&mcqueue.qcond); /* Unlock mutex of machine pile queue */ pthread_mutex_unlock(&mcqueue.qlock); /* Deallocate the prefetch queue pile node */ predealloc(qnode); - + pthread_exit(NULL); } } @@ -1428,17 +1461,17 @@ void *mcqProcess(void *threadid) { /* Deallocate the machine queue pile node */ mcdealloc(mcpilenode); + pthread_exit(NULL); } } void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { - int sd, i, offset, off, len, endpair, count = 0; + int sd, i, off, len, endpair, count = 0; struct sockaddr_in serv_addr; struct hostent *server; char machineip[16], control; objpile_t *tmp; - /* Send Trans Prefetch Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for SEND_PREFETCH_REQUEST\n"); @@ -1470,16 +1503,17 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; while(tmp != NULL) { - off = offset = 0; + off = 0; count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); char oidnoffset[len]; + bzero(oidnoffset, len); memcpy(oidnoffset, &len, sizeof(int)); off = sizeof(int); memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); off += sizeof(unsigned int); for(i = 0; i < tmp->numoffset; i++) { - memcpy(oidnoffset + off, &tmp->offset[i], sizeof(short)); + memcpy(oidnoffset + off, &(tmp->offset[i]), sizeof(short)); off+=sizeof(short); } if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { @@ -1487,6 +1521,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { close(sd); return; } + tmp = tmp->next; } @@ -1511,6 +1546,7 @@ void getPrefetchResponse(int count, int sd) { char *ptr; void *modptr, *oldptr; + /* Read prefetch response from the Remote machine */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for Prefetch request sent\n"); @@ -1527,23 +1563,24 @@ void getPrefetchResponse(int count, int sd) { perror("Size of buffer not recv\n"); return; } - memcpy(&bufsize, buffer, sizeof(unsigned int)); + bufsize = *((unsigned int *) buffer); ptr = buffer + sizeof(unsigned int); /* Keep receiving the buffer containing oid info */ do { n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0); sum +=n; } while(sum < bufsize && n != 0); + /* Decode the contents of the buffer */ index = sizeof(unsigned int); while(index < (bufsize - sizeof(unsigned int))) { if(buffer[index] == OBJECT_FOUND) { /* Increment it to get the object */ index += sizeof(char); - memcpy(&oid, buffer + index, sizeof(unsigned int)); + oid = *((unsigned int *)(buffer+index)); index += sizeof(unsigned int); /* For each object found add to Prefetch Cache */ - memcpy(&objsize, buffer + index, sizeof(int)); + objsize = *((int *)(buffer+index)); index+=sizeof(int); pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { @@ -1581,7 +1618,6 @@ void getPrefetchResponse(int count, int sd) { /* Increment it to get the object */ /* TODO: For each object not found query DHT for new location and retrieve the object */ index += sizeof(char); - //memcpy(&oid, buffer + index, sizeof(unsigned int)); oid = *((unsigned int *)(buffer + index)); index += sizeof(unsigned int); /* Throw an error */ @@ -1845,7 +1881,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n return -1; } else { msg[0] = THREAD_NOTIFY_REQUEST; - msg[1] = numoid; + *((unsigned int *)(&msg[1])) = numoid; /* Send array of oids */ size = sizeof(unsigned int); { @@ -1863,7 +1899,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n i = 0; while(i < numoid) { version = versionarry[i]; - *((unsigned short *)(&msg[1] + size)) = oid; + *((unsigned short *)(&msg[1] + size)) = version; size += sizeof(unsigned short); i++; } @@ -1878,7 +1914,7 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n if (bytesSent < 0){ perror("reqNotify():send()"); status = -1; - } else if (bytesSent != 1 + 5*sizeof(unsigned int)){ + } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int)){ printf("reNotify(): error, sent %d bytes\n", bytesSent); status = -1; } else { @@ -1891,7 +1927,6 @@ int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int n close(sock); return status; } - void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { notifydata_t *ndata; int i, objIsFound = 0, index; -- 2.34.1