objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
+pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
/* This function initializes the main objects store and creates the
* global machine and location lookup table */
int dstmInit(void)
{
mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
- pthread_mutex_init(&mainobjstore_mutex, NULL);
+ /* Initialize attribute for mutex */
+ pthread_mutexattr_init(&mainobjstore_mutex_attr);
+ pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
+ //pthread_mutex_init(&mainobjstore_mutex, NULL);
+ pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
}
/* Send ack to Coordinator */
- printf("DEBUG -> Recv TRANS_ABORT\n");
sendctrl = TRANS_SUCESSFUL;
if(send((int)acceptfd, &sendctrl, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ACK to coordinator\n");
case TRANS_COMMIT:
/* Invoke the transCommit process() */
- printf("DEBUG -> Recv TRANS_COMMIT \n");
if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
/* Free memory */
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
return control;
}
} else {/* If Obj is not locked then lock object */
perror("Error in sending control to the Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_DISAGREE\n");
+ if (objlocked > 0) {
+ STATUS(((objheader_t *)mobj)) &= ~(LOCK);
+ free(oidlocked);
+ }
return control;
}
}
perror("Error in sending control to Coordinator\n");
return 0;
}
- printf("DEBUG -> Sending TRANS_AGREE\n");
}
/* Condition to send TRANS_SOFT_ABORT */
if((*(v_matchlock) > 0 && *(v_nomatch) == 0) || (*(objnotfound) > 0 && *(v_nomatch) == 0)) {
char msg[]={TRANS_SOFT_ABORT, 0,0,0,0};
*((int*)&msg[1])= *(objnotfound);
- printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
/* Send control message */
if((val = send(acceptfd, &msg, sizeof(msg),MSG_NOSIGNAL)) < sizeof(msg)) {
perror("Error in sending no of objects that are not found\n");
t = time(NULL);
req.tv_sec = 0;
req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
- nanosleep(&req, &rem);
+ //nanosleep(&req, &rem);
+ nanosleep(&req, NULL);
return;
}
rc = gettimeofday(&tp, NULL);
/* Convert from timeval to timespec */
- ts.tv_nsec = tp.tv_usec * 10;
+ ts.tv_nsec = tp.tv_usec * 1000;
/* Search local transaction cache */
if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
free(ltdata);
/* wait a random amount of time */
- if (treplyretry == 1)
+ if (treplyretry == 1)
randomdelay();
/* Retry trans commit procedure if not sucessful in the first try */
pthread_exit(NULL);
}
- printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
/* Send bytes of data with TRANS_REQUEST control message */
if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
perror("Error sending fixed bytes for thread\n");
return;
}
-/* This function sends the final response to remote machines per thread in their respective socket id */
+/* This function sends the final response to remote machines per thread in their respective socket id
+ * It returns a char that is only needed to check the correctness of execution of this function inside
+ * transRequest()*/
char sendResponse(thread_data_array_t *tdata, int sd) {
int n, N, sum, oidcount = 0;
char *ptr, retval = 0;
}
/* Send ack to Coordinator */
- printf("TRANS_SUCCESSFUL\n");
/*Free the pointer */
ptr = NULL;
/* dequeue node to create a machine piles and finally unlock mutex */
if((qnode = pre_dequeue()) == NULL) {
printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&pqueue.qlock);
pthread_exit(NULL);
}
pthread_mutex_unlock(&pqueue.qlock);
/* Dequeue node to send remote machine connections*/
if((mcpilenode = mcpiledequeue()) == NULL) {
printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mcqueue.qlock);
pthread_exit(NULL);
}
/* Unlock mutex */
prehashInsert(oid, modptr);
} else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) {
/* Add the new object ptr to hash table */
+ prehashRemove(oid);
prehashInsert(oid, modptr);
- } else { /* Do nothing */
+ } else { /* Do nothing: TODO modptr should be reference counted */
;
}
} else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/
prehashInsert(oid, modptr);
}
/* Lock the Prefetch Cache look up table*/
- pthread_mutex_lock(&pflookup.lock);
+ //pthread_mutex_lock(&pflookup.lock);
/* Broadcast signal on prefetch cache condition variable */
pthread_cond_broadcast(&pflookup.cond);
/* Unlock the Prefetch Cache look up table*/
- pthread_mutex_unlock(&pflookup.lock);
+ //pthread_mutex_unlock(&pflookup.lock);
} else if(buffer[index] == OBJECT_NOT_FOUND) {
/* Increment it to get the object */
/* TODO: For each object not found query DHT for new location and retrieve the object */