perror("Error receiving object from cooridnator\n");
return NULL;
}
- srcObj = mhashSearch(oid);
+ if((srcObj = mhashSearch(oid)) == NULL) {
+ printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+ }
h = (objheader_t *) srcObj;
GETSIZE(size, h);
size += sizeof(objheader_t);
}
} else {
/* Type */
- char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
- *((int *)&msg[1])=size;
- if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
- perror("Error sending size of object to coordinator\n");
- return NULL;
- }
- if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
- perror("Error in sending object\n");
- return NULL;
- }
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ if(send((int)acceptfd, &msg, sizeof(msg), MSG_NOSIGNAL) < sizeof(msg)) {
+ perror("Error sending size of object to coordinator\n");
+ return NULL;
+ }
+ if(send((int)acceptfd, h, size, MSG_NOSIGNAL) < size) {
+ perror("Error in sending object\n");
+ return NULL;
+ }
}
break;
/*Process the information read */
if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
+ /* Free resources */
+ if(oidmod != NULL) {
+ free(oidmod);
+ oidmod = NULL;
+ }
return 1;
}
-
/* Free resources */
- free(oidmod);
+ if(oidmod != NULL) {
+ free(oidmod);
+ oidmod = NULL;
+ }
return 0;
}
header = mhashSearch(transinfo->objlocked[i]);// find the header address
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
-
+
/* Send ack to Coordinator */
printf("DEBUG -> Recv TRANS_ABORT\n");
sendctrl = TRANS_SUCESSFUL;
if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
+ transinfo->objlocked = NULL;
+ }
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
+ transinfo->objnotfound = NULL;
+ }
+
return 1;
}
ptr = NULL;
printf("DEBUG -> Recv TRANS_COMMIT \n");
if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
+ /* Free memory */
+ printf("DEBUG -> Freeing...\n");
+ fflush(stdout);
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
+ transinfo->objlocked = NULL;
+ }
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
+ transinfo->objnotfound = NULL;
+ }
return 1;
}
break;
//TODO Use fixed.trans_id TID since Client may have died
break;
}
+
/* Free memory */
printf("DEBUG -> Freeing...\n");
fflush(stdout);
-
+
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
transinfo->objlocked = NULL;
free(transinfo->objnotfound);
transinfo->objnotfound = NULL;
}
+
return 0;
}
header = (objheader_t *) mhashSearch(oidmod[i]);
header->version += 1;
}
+
/* Unlock locked objects */
for(i = 0; i < numlocked; i++) {
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
}
STATUS(header) &= ~(LOCK);
}
-
//TODO Update location lookup table
/* Send ack to coordinator */
extern primarypfq_t pqueue; // shared prefetch queue
extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
objstr_t *prefetchcache; //Global Prefetch cache
-pthread_mutex_t prefetchcache_mutex;
-extern pthread_mutex_t mainobjstore_mutex;
+pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
+extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
-pthread_t tPrefetch;
+pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */
extern objstr_t *mainobjstore;
unsigned int myIpAddr;
unsigned int *hostIpAddrs;
rc = gettimeofday(&tp, NULL);
/* Convert from timeval to timespec */
- ts.tv_nsec = tp.tv_usec * 1000;
+ ts.tv_nsec = tp.tv_usec * 10;
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
tmp = mhashSearch(oid);
GETSIZE(size, tmp);
size += sizeof(objheader_t);
+ //TODO:Lock the local trans cache while copying the object here
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
found = 1;
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
+ //TODO:Lock the local trans cache while copying the object here
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
/* Insert into cache's lookup table */
#else
return objcopy;
#endif
- } else { /* If not found anywhere, then block until object appears in prefetch cache */
-#if 0
- printf("Inside remote machine\n");
- pthread_mutex_lock(&pflookup.lock);
+ } else {
+ /*If object not found in prefetch cache then block until object appears in the prefetch cache */
+ pthread_mutex_lock(&prefetchcache_mutex);
while(!found) {
- rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
+ rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
if(rc == ETIMEDOUT) {
printf("Wait timed out\n");
/* Check Prefetch cache again */
- if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+ if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
found = 1;
- GETSIZE(size, tmp);
+ GETSIZE(size,tmp);
size+=sizeof(objheader_t);
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
- /* Insert into cache's lookup table */
chashInsert(record->lookupTable, OID(tmp), objcopy);
+ pthread_mutex_unlock(&prefetchcache_mutex);
#ifdef COMPILER
return &objcopy[1];
#else
return objcopy;
#endif
} else {
- pthread_mutex_unlock(&pflookup.lock);
+ pthread_mutex_unlock(&prefetchcache_mutex);
break;
}
- pthread_mutex_unlock(&pflookup.lock);
}
}
-#endif
+
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
- //If object is not found in Remote location
+ printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
return NULL;
} else {
#ifdef COMPILER
return objcopy;
#endif
}
- }
+ }
}
/* This function creates objects in the transaction record */
tosend->oidcreated = pile->oidcreated;
thread_data_array[threadnum].thread_id = threadnum;
thread_data_array[threadnum].mid = pile->mid;
- thread_data_array[threadnum].pilecount = pilecount;
thread_data_array[threadnum].buffer = tosend;
thread_data_array[threadnum].recvmsg = rcvd_control_msg;
thread_data_array[threadnum].threshold = &tcond;
return 1;
}
}
+
threadnum++;
pile = pile->next;
}
/* Free attribute and wait for the other threads */
pthread_attr_destroy(&attr);
- for (i = 0 ;i < pilecount ; i++) {
+
+ for (i = 0; i < pilecount; i++) {
rc = pthread_join(thread[i], NULL);
if(rc)
{
}
free(thread_data_array[i].buffer);
}
+
/* Free resources */
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
- free(listmid);
+ if(listmid != NULL)
+ free(listmid);
pDelete(pile_ptr);
- free(thread_data_array);
- free(ltdata);
+ if(thread_data_array != NULL)
+ free(thread_data_array);
+ if(ltdata != NULL)
+ free(ltdata);
/* wait a random amount of time */
if (treplyretry == 1)
}
/* Send list of machines involved in the transaction */
{
- int size=sizeof(unsigned int)*tdata->pilecount;
+ int size=sizeof(unsigned int)*tdata->buffer->f.mcount;
if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
perror("Error sending list of machines for thread\n");
pthread_exit(NULL);
(*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
/* Wake up the threads and invoke decideResponse (once) */
- if(*(tdata->count) == tdata->pilecount) {
+ if(*(tdata->count) == tdata->buffer->f.mcount) {
if (decideResponse(tdata) != 0) {
printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(tdata->lock);
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
- for (i = 0 ; i < tdata->pilecount ; i++) {
+ for (i = 0 ; i < tdata->buffer->f.mcount; i++) {
control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
written onto the shared array */
switch(control) {
objstrDelete(tdata->rec->cache);
chashDelete(tdata->rec->lookupTable);
free(tdata->rec);
- } else if(transagree == tdata->pilecount){
+ } else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
/* Free resources */
headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
GETSIZE(size,headeraddr);
size+=sizeof(objheader_t);
- memcpy(modptr+offset, headeraddr, size);
+ memcpy((char *)modptr+offset, headeraddr, size);
offset += size;
}
/* Write new objects into the mainobject store */
headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
GETSIZE(size, headeraddr);
size+=sizeof(objheader_t);
- memcpy(modptr+offset, headeraddr, size);
+ memcpy((char *)modptr+offset, headeraddr, size);
offset += size;
}
(*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */
/* Wake up the threads and invoke decideResponse (once) */
- if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) {
+ if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) {
if (decideResponse(localtdata->tdata) != 0) {
printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
pthread_mutex_unlock(localtdata->tdata->lock);
}
/* Free memory */
-
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
localtdata->transinfo->objlocked = NULL;
void *header;
/* Set all ref counts as 1 and do garbage collection */
- ptr = (char *)modptr;
+ ptr = modptr;
for(i = 0; i< nummod; i++) {
- int tmpsize;
+ int tmpsize;
tmp_header = (objheader_t *)ptr;
tmp_header->rcount = 0;
GETSIZE(tmpsize, tmp_header);
}
/* Send ack to Coordinator */
+ printf("TRANS_SUCCESSFUL\n");
/*Free the pointer */
ptr = NULL;
for (i = 0; i < numcreated; i++)
{
- int tmpsize;
+ int tmpsize;
header = (objheader_t *)(((char *)modptr) + offset);
mhashInsert(oidcreated[i], (((char *)modptr) + offset));
GETSIZE(tmpsize, header);
//TODO Update location lookup table
/* Send ack to Coordinator */
+ printf("TRANS_SUCCESSFUL\n");
return 0;
}
if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
{
prefetch(1, &oid, &numoffsets, NULL);
+ pthread_mutex_lock(&pflookup.lock);
while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
{
- pthread_mutex_lock(&pflookup.lock);
pthread_cond_wait(&pflookup.cond, &pflookup.lock);
- pthread_mutex_unlock(&pflookup.lock);
}
+ pthread_mutex_unlock(&pflookup.lock);
}
}