objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
+pthread_mutex_t lockObjHeader;
pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
sockPoolHashTable_t *transPResponseSocketPool;
pthread_mutexattr_init(&mainobjstore_mutex_attr);
pthread_mutexattr_settype(&mainobjstore_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP);
pthread_mutex_init(&mainobjstore_mutex, &mainobjstore_mutex_attr);
+ pthread_mutex_init(&lockObjHeader,NULL);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
int tmpsize;
headptr = (objheader_t *) ptr;
oid = OID(headptr);
- version = headptr->version;
- GETSIZE(tmpsize, headptr);
- ptr += sizeof(objheader_t) + tmpsize;
- }
-
- /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
-
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
- /* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
- } else { /* If Obj found in machine (i.e. has not moved) */
- /* Check if Obj is locked by any previous transaction */
- if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else {/* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- STATUS(headptr) &= ~(LOCK);
- }
- free(oidlocked);
- }
- send_data(acceptfd, &control, sizeof(char));
- return control;
- }
- } else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- STATUS(headptr) &= ~(LOCK);
- }
- free(oidlocked);
- }
-
- /* Send TRANS_DISAGREE to Coordinator */
- send_data(acceptfd, &control, sizeof(char));
- return control;
- }
- }
- }
+ version = headptr->version;
+ GETSIZE(tmpsize, headptr);
+ ptr += sizeof(objheader_t) + tmpsize;
+ }
+
+ /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
+
+ if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ /* Save the oids not found and number of oids not found for later use */
+ oidnotfound[objnotfound] = oid;
+ objnotfound++;
+ } else { /* If Obj found in machine (i.e. has not moved) */
+ /* Check if Obj is locked by any previous transaction */
+ pthread_mutex_lock(&lockObjHeader);
+ if ((STATUS(((objheader_t *)mobj)) & LOCK) == LOCK) {
+ pthread_mutex_unlock(&lockObjHeader);
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ v_matchlock++;
+ } else {/* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
+ send_data(acceptfd, &control, sizeof(char));
+ return control;
+ }
+ } else {/* If Obj is not locked then lock object */
+ STATUS(((objheader_t *)mobj)) |= LOCK;
+ pthread_mutex_unlock(&lockObjHeader);
+ /* Save all object oids that are locked on this machine during this transaction request call */
+ oidlocked[objlocked] = OID(((objheader_t *)mobj));
+ objlocked++;
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ v_matchnolock++;
+ } else { /* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ control = TRANS_DISAGREE;
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ STATUS(headptr) &= ~(LOCK);
+ }
+ free(oidlocked);
+ }
+
+ /* Send TRANS_DISAGREE to Coordinator */
+ send_data(acceptfd, &control, sizeof(char));
+ return control;
+ }
+ }
+ }
}
/* Decide what control message to send to Coordinator */
return 1;
}
GETSIZE(tmpsize,header);
- pthread_mutex_lock(&mainobjstore_mutex);
memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
header->version += 1;
/* If threads are waiting on this object to be updated, notify them */
notifyAll(&header->notifylist, OID(header), header->version);
}
pthread_mutex_unlock(¬ifymutex);
- pthread_mutex_unlock(&mainobjstore_mutex);
offset += sizeof(objheader_t) + tmpsize;
}
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
pthread_mutex_t notifymutex;
+pthread_mutex_t atomicObjLock;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
pthread_mutex_init(¬ifymutex, NULL);
+ pthread_mutex_init(&atomicObjLock, NULL);
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
printf("ERROR\n");
}
free(thread_data_array[i].buffer);
}
-
+
/* Free resources */
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
/* Retry trans commit procedure during soft_abort case */
} while (treplyretry);
-
if(treplyctrl == TRANS_ABORT) {
/* Free Resources */
objstrDelete(record->cache);
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
+ pthread_mutex_lock(&atomicObjLock);
if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) {
- if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
- v_matchlock++;
- } else {/* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
+ pthread_mutex_unlock(&atomicObjLock);
+ if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */
+ v_matchlock++;
+ } else {/* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ }
} else {/* If Obj is not locked then lock object */
- STATUS(((objheader_t *)mobj)) |= LOCK;
- /* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
- if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
- v_matchnolock++;
- } else { /* If versions don't match ...HARD ABORT */
- v_nomatch++;
- /* Send TRANS_DISAGREE to Coordinator */
- localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
- }
+ STATUS(((objheader_t *)mobj)) |= LOCK;
+ pthread_mutex_unlock(&atomicObjLock);
+ /* Save all object oids that are locked on this machine during this transaction request call */
+ oidlocked[objlocked] = OID(((objheader_t *)mobj));
+ objlocked++;
+ if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
+ v_matchnolock++;
+ } else { /* If versions don't match ...HARD ABORT */
+ v_nomatch++;
+ /* Send TRANS_DISAGREE to Coordinator */
+ localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
+ }
}
}
} // End for
*((unsigned int *)(&msg[1] + size)) = myIpAddr;
size += sizeof(unsigned int);
*((unsigned int *)(&msg[1] + size)) = threadid;
-
pthread_mutex_lock(&(ndata->threadnotify));
size = 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int);
send_data(sock, msg, size);