void mcpileqInit(void) {
/* Initialize machine queue that containing prefetch oids and offset values sorted by remote machineid */
mcqueue.front = mcqueue.rear = NULL;
- pthread_mutex_init(&mcqueue.qlock, NULL);
+ //Intiliaze and set machile pile queue's mutex attribute
+ pthread_mutexattr_init(&mcqueue.qlockattr);
+ pthread_mutexattr_settype(&mcqueue.qlockattr, PTHREAD_MUTEX_RECURSIVE_NP);
+ //pthread_mutex_init(&mcqueue.qlock, NULL);
+ pthread_mutex_init(&mcqueue.qlock,&mcqueue.qlockattr);
pthread_cond_init(&mcqueue.qcond, NULL);
}
-/* Insert to the rear of machine pile queue */
-/*
-void mcpileenqueue(prefetchpile_t *node) {
- if(mcqueue.front == NULL && mcqueue.rear == NULL) {
- mcqueue.front = mcqueue.rear = node;
- } else {
- node->next = NULL;
- mcqueue.rear->next = node;
- mcqueue.rear = node;
- }
-}
-*/
-
/* Insert to the rear of machine pile queue */
void mcpileenqueue(prefetchpile_t *node) {
prefetchpile_t *tmp, *prev;
/* Global Variables */
extern int classsize[];
-extern primarypfq_t pqueue; // shared prefetch queue
+extern primarypfq_t pqueue; //Shared prefetch queue
extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids
objstr_t *prefetchcache; //Global Prefetch cache
pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
+pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store
extern prehashtable_t pflookup; //Global Prefetch cache's lookup table
pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue
int t, rc;
//Create and initialize prefetch cache structure
prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE);
- pthread_mutex_init(&prefetchcache_mutex, NULL);
+
+ /* Initialize attributes for mutex */
+ pthread_mutexattr_init(&prefetchcache_mutex_attr);
+ pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+
+ pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
+
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR))
return; //Failure
#endif
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
/* Look up in machine lookup table and copy into cache*/
- tmp = mhashSearch(oid);
- GETSIZE(size, tmp);
+ GETSIZE(size, objheader);
size += sizeof(objheader_t);
//TODO:Lock the local trans cache while copying the object here
objcopy = objstrAlloc(record->cache, size);
#endif
} else {
/*If object not found in prefetch cache then block until object appears in the prefetch cache */
- pthread_mutex_lock(&prefetchcache_mutex);
+ pthread_mutex_lock(&pflookup.lock);
while(!found) {
rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
if(rc == ETIMEDOUT) {
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
chashInsert(record->lookupTable, OID(tmp), objcopy);
- pthread_mutex_unlock(&prefetchcache_mutex);
+ pthread_mutex_unlock(&pflookup.lock);
#ifdef COMPILER
return &objcopy[1];
#else
return objcopy;
#endif
} else {
- pthread_mutex_unlock(&prefetchcache_mutex);
+ pthread_mutex_unlock(&pflookup.lock);
break;
}
}
/* Retry trans commit procedure if not sucessful in the first try */
} while (treplyretry == 1);
+ /* Free Resources */
+ objstrDelete(record->cache);
+ chashDelete(record->lookupTable);
free(record);
return 0;
}
}
}
- /* Send Abort */
if(transdisagree > 0) {
+ /* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
- /* Free resources */
- objstrDelete(tdata->rec->cache);
- chashDelete(tdata->rec->lookupTable);
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
- /* Free resources */
- objstrDelete(tdata->rec->cache);
- chashDelete(tdata->rec->lookupTable);
- } else { /* (transsoftabort > 0 && transdisagree == 0) */
+ } else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 1;
header->version += 1;
}
+ /*If object is in prefetch cache then update it in prefetch cache */
+
+
+ /* If object is newly created inside transaction then commit it */
for (i = 0; i < numcreated; i++)
{
int tmpsize;
mhashInsert(oidcreated[i], (((char *)modptr) + offset));
GETSIZE(tmpsize, header);
offset += sizeof(objheader_t) + tmpsize;
-
lhashInsert(oidcreated[i], myIpAddr);
}