/* Read oid requested and search if available */
recv_data((int)acceptfd, &oid, sizeof(unsigned int));
if((srcObj = mhashSearch(oid)) == NULL) {
- printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
- break;
+ printf("Error: Object 0x%x is not found in Main Object Store %s, %d\n", oid, __FILE__, __LINE__);
+ break;
}
h = (objheader_t *) srcObj;
GETSIZE(size, h);
size += sizeof(objheader_t);
sockid = (int) acceptfd;
-
if (h == NULL) {
- ctrl = OBJECT_NOT_FOUND;
- send_data(sockid, &ctrl, sizeof(char));
+ ctrl = OBJECT_NOT_FOUND;
+ send_data(sockid, &ctrl, sizeof(char));
} else {
- /* Type */
- char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
- *((int *)&msg[1])=size;
- send_data(sockid, &msg, sizeof(msg));
- send_data(sockid, h, size);
+ // Type
+ char msg[]={OBJECT_FOUND, 0, 0, 0, 0};
+ *((int *)&msg[1])=size;
+ send_data(sockid, &msg, sizeof(msg));
+ send_data(sockid, h, size);
}
break;
-
+
case READ_MULT_REQUEST:
break;
-
+
case MOVE_REQUEST:
break;
-
+
case MOVE_MULT_REQUEST:
break;
-
+
case TRANS_REQUEST:
/* Read transaction request */
transinfo.objlocked = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
- printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
- pthread_exit(NULL);
+ printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
}
break;
case TRANS_PREFETCH:
if((val = prefetchReq((int)acceptfd)) != 0) {
- printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
- break;
+ printf("Error: In prefetchReq() %s, %d\n", __FILE__, __LINE__);
+ break;
}
break;
case TRANS_PREFETCH_RESPONSE:
if((val = getPrefetchResponse((int) acceptfd)) != 0) {
- printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
- break;
+ printf("Error: In getPrefetchResponse() %s, %d\n", __FILE__, __LINE__);
+ break;
}
break;
case START_REMOTE_THREAD:
objType = getObjType(oid);
startDSMthread(oid, objType);
break;
-
+
case THREAD_NOTIFY_REQUEST:
recv_data((int)acceptfd, &numoid, sizeof(unsigned int));
size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
-
+
recv_data((int)acceptfd, buffer, size);
-
+
oidarry = calloc(numoid, sizeof(unsigned int));
memcpy(oidarry, buffer, sizeof(unsigned int) * numoid);
size = sizeof(unsigned int) * numoid;
threadid = *((unsigned int *)(buffer+size));
processReqNotify(numoid, oidarry, versionarry, mid, threadid);
free(buffer);
-
+
break;
case THREAD_NOTIFY_RESPONSE:
size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
if((buffer = calloc(1,size)) == NULL) {
- printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
- pthread_exit(NULL);
+ printf("%s() Calloc error at %s, %d\n", __func__, __FILE__, __LINE__);
+ pthread_exit(NULL);
}
-
+
recv_data((int)acceptfd, buffer, size);
-
+
oid = *((unsigned int *)buffer);
size = sizeof(unsigned int);
version = *((unsigned short *)(buffer+size));
}
}
- closeconnection:
+closeconnection:
/* Close connection */
if (close((int)acceptfd) == -1)
perror("close");
pthread_exit(NULL);
}
-
+
/* This function reads the information available in a transaction request
* and makes a function call to process the request */
int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
- char *ptr;
- void *modptr;
- unsigned int *oidmod, oid;
- fixed_data_t fixed;
- objheader_t *headaddr;
- int sum, i, size, n, val;
-
- oidmod = NULL;
-
- /* Read fixed_data_t data structure */
- size = sizeof(fixed) - 1;
- ptr = (char *)&fixed;;
- fixed.control = TRANS_REQUEST;
- recv_data((int)acceptfd, ptr+1, size);
-
- /* Read list of mids */
- int mcount = fixed.mcount;
- size = mcount * sizeof(unsigned int);
- unsigned int listmid[mcount];
- ptr = (char *) listmid;
- recv_data((int)acceptfd, ptr, size);
-
- /* Read oid and version tuples for those objects that are not modified in the transaction */
- int numread = fixed.numread;
- size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
- char objread[size];
- if(numread != 0) { //If pile contains more than one object to be read,
- // keep reading all objects
- recv_data((int)acceptfd, objread, size);
- }
-
- /* Read modified objects */
- if(fixed.nummod != 0) {
- if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
- printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
- return 1;
- }
- size = fixed.sum_bytes;
- recv_data((int)acceptfd, modptr, size);
- }
+ char *ptr;
+ void *modptr;
+ unsigned int *oidmod, oid;
+ fixed_data_t fixed;
+ objheader_t *headaddr;
+ int sum, i, size, n, val;
+
+ oidmod = NULL;
+
+ /* Read fixed_data_t data structure */
+ size = sizeof(fixed) - 1;
+ ptr = (char *)&fixed;;
+ fixed.control = TRANS_REQUEST;
+ recv_data((int)acceptfd, ptr+1, size);
+
+ /* Read list of mids */
+ int mcount = fixed.mcount;
+ size = mcount * sizeof(unsigned int);
+ unsigned int listmid[mcount];
+ ptr = (char *) listmid;
+ recv_data((int)acceptfd, ptr, size);
+
+ /* Read oid and version tuples for those objects that are not modified in the transaction */
+ int numread = fixed.numread;
+ size = numread * (sizeof(unsigned int) + sizeof(unsigned short));
+ char objread[size];
+ if(numread != 0) { //If pile contains more than one object to be read,
+ // keep reading all objects
+ recv_data((int)acceptfd, objread, size);
+ }
+
+ /* Read modified objects */
+ if(fixed.nummod != 0) {
+ if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
+ printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ size = fixed.sum_bytes;
+ recv_data((int)acceptfd, modptr, size);
+ }
/* Create an array of oids for modified objects */
oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
if (transinfo->objnotfound != NULL) {
free(transinfo->objnotfound);
}
-
return 0;
}
unsigned short version;
char control = 0, *ptr;
unsigned int oid;
- unsigned int *oidnotfound, *oidlocked;
+ unsigned int *oidnotfound, *oidlocked, *oidvernotmatch;
void *mobj;
objheader_t *headptr;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0;
+ oidvernotmatch = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
+ int objnotfound = 0, objlocked = 0, objvernotmatch = 0;
int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
+ int numBytes = 0;
/* modptr points to the beginning of the object store
* created at the Pariticipant.
* Object store holds the modified objects involved in the transaction request */
v_matchlock++;
} else {/* If versions don't match ...HARD ABORT */
v_nomatch++;
+ oidvernotmatch[objvernotmatch] = oid;
+ objvernotmatch++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ numBytes += size;
/* Send TRANS_DISAGREE to Coordinator */
control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- UnLock(STATUSPTR(headptr));
- }
- free(oidlocked);
- }
- send_data(acceptfd, &control, sizeof(char));
- return control;
}
} else {/* If Obj is not locked then lock object */
/* Save all object oids that are locked on this machine during this transaction request call */
v_matchnolock++;
} else { /* If versions don't match ...HARD ABORT */
v_nomatch++;
+ oidvernotmatch[objvernotmatch] = oid;
+ objvernotmatch++;
+ int size;
+ GETSIZE(size, mobj);
+ size += sizeof(objheader_t);
+ numBytes += size;
control = TRANS_DISAGREE;
- if (objlocked > 0) {
- for(j = 0; j < objlocked; j++) {
- if((headptr = mhashSearch(oidlocked[j])) == NULL) {
- printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
- return 0;
- }
- UnLock(STATUSPTR(headptr));
- }
- free(oidlocked);
- }
-
- /* Send TRANS_DISAGREE to Coordinator */
- send_data(acceptfd, &control, sizeof(char));
- return control;
}
}
}
}
+ /* send TRANS_DISAGREE and objs*/
+ if(v_nomatch > 0) {
+ char *objs = calloc(1, numBytes);
+ int j, offset = 0;
+ for(j = 0; j<objvernotmatch; j++) {
+ objheader_t *header = mhashSearch(oidvernotmatch[j]);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ memcpy(objs+offset, header, size);
+ offset += size;
+ }
+ if (objlocked > 0) {
+ for(j = 0; j < objlocked; j++) {
+ if((headptr = mhashSearch(oidlocked[j])) == NULL) {
+ printf("mhashSearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 0;
+ }
+ UnLock(STATUSPTR(headptr));
+ }
+ free(oidlocked);
+ }
+ send_data(acceptfd, &control, sizeof(char));
+ send_data(acceptfd, &numBytes, sizeof(int));
+ send_data(acceptfd, objs, numBytes);
+ transinfo->objvernotmatch = oidvernotmatch;
+ transinfo->numvernotmatch = objvernotmatch;
+ free(objs);
+ free(transinfo->objvernotmatch);
+ return control;
+ }
+
/* Decide what control message to send to Coordinator */
if ((control = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
}
return control;
-
}
/* This function decides what control message such as TRANS_AGREE, TRANS_DISAGREE or TRANS_SOFT_ABORT
* to send to Coordinator based on the votes of oids involved in the transaction */
/* Send control message */
send_data(acceptfd, &control, sizeof(char));
- /* Send number of oids not found and the missing oids if objects are missing in the machine */
+ /* FIXME how to send objs Send number of oids not found and the missing oids if objects are missing in the machine */
if(*(objnotfound) != 0) {
int msg[1];
msg[0] = *(objnotfound);
/* Read control message from Participant */
recv_data(sd, &control, sizeof(char));
+ /* Recv Objects if participant sends TRANS_DISAGREE */
+ if(control == TRANS_DISAGREE) {
+ int length;
+ recv_data(sd, &length, sizeof(int));
+ void *newAddr;
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = objstrAlloc(prefetchcache, length)) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ close(sd);
+ pthread_exit(NULL);
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ recv_data(sd, newAddr, length);
+ int offset = 0;
+ while(length != 0) {
+ unsigned int oidToPrefetch;
+ objheader_t * header;
+ header = (objheader_t *) (((char *)newAddr) + offset);
+ oidToPrefetch = OID(header);
+ int size = 0;
+ GETSIZE(size, header);
+ size += sizeof(objheader_t);
+ //make an entry in prefetch hash table
+ prehashInsert(oidToPrefetch, header);
+ length = length - size;
+ offset += size;
+ }
+ }
recvcontrol = control;
/* Update common data structure and increment count */
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
}
pthread_mutex_unlock(tdata->lock);
- /* Send the final response such as TRANS_COMMIT or TRANS_ABORT t
+ /* Send the final response such as TRANS_COMMIT or TRANS_ABORT
* to all participants in their respective socket */
if (sendResponse(tdata, sd) == 0) {
printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
/* clear objects from prefetch cache */
- for (i = 0; i < tdata->buffer->f.numread; i++)
+ for (i = 0; i < tdata->buffer->f.numread; i++) {
prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i)));
- for (i = 0; i < tdata->buffer->f.nummod; i++)
+ }
+ for (i = 0; i < tdata->buffer->f.nummod; i++) {
prehashRemove(tdata->buffer->oidmod[i]);
+ }
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
*(tdata->replyretry) = 0;
+ /* update prefetch cache */
+ /* For objects read */
+ char oidType;
+ int retval;
+ oidType = 'R';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.numread, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
+ oidType = 'M';
+ if((retval = updatePrefetchCache(tdata, tdata->buffer->f.nummod, oidType)) != 0) {
+ printf("Error: %s() in updating prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return;
+ }
} else {
/* Send Abort in soft abort case followed by retry commiting transaction again*/
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 1;
}
-
return;
}
+/* This function updates the prefetch cache when commiting objects
+ * based on the type of oid i.e. if oid is read or oid is modified
+ * Return -1 on error else returns 0
+ */
+int updatePrefetchCache(thread_data_array_t* tdata, int numoid, char oidType) {
+ int i;
+ for (i = 0; i < numoid; i++) {
+ //find address object
+ objheader_t *header, *newAddr;
+ int size;
+ unsigned int oid;
+ if(oidType == 'R') {
+ oid = *((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ } else {
+ oid = tdata->buffer->oidmod[i];
+ }
+ header = (objheader_t *) chashSearch(tdata->rec->lookupTable, oid);
+ //copy object into prefetch cache
+ GETSIZE(size, header);
+ pthread_mutex_lock(&prefetchcache_mutex);
+ if ((newAddr = objstrAlloc(prefetchcache, (size + sizeof(objheader_t)))) == NULL) {
+ printf("Error: %s() objstrAlloc error for copying into prefetch cache %s, %d\n", __func__, __FILE__, __LINE__);
+ return -1;
+ }
+ pthread_mutex_unlock(&prefetchcache_mutex);
+ memcpy(newAddr, header, (size + sizeof(objheader_t)));
+ //make an entry in prefetch hash table
+ prehashInsert(oid, newAddr);
+ }
+ return 0;
+}
+
/* This function sends the final response to remote machines per
* thread in their respective socket id It returns a char that is only
* needed to check the correctness of execution of this function
recv_data(sd, &size, sizeof(int));
objcopy = objstrAlloc(record->cache, size);
recv_data(sd, objcopy, size);
-
/* Insert into cache's lookup table */
chashInsert(record->lookupTable, oid, objcopy);
}
void *handleLocalReq(void *threadarg) {
unsigned int *oidnotfound = NULL, *oidlocked = NULL;
local_thread_data_array_t *localtdata;
- int objnotfound = 0, objlocked = 0;
+ int numoidnotfound = 0, numoidlocked = 0;
int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
int numread, i;
unsigned int oid;
/* Save the oids not found and number of oids not found for later use */
if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found and number of oids not found for later use */
- oidnotfound[objnotfound] = oid;
- objnotfound++;
+ oidnotfound[numoidnotfound] = oid;
+ numoidnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
if (test_and_set(STATUSPTR(mobj))) {
} else {
//we're locked
/* Save all object oids that are locked on this machine during this transaction request call */
- oidlocked[objlocked] = OID(((objheader_t *)mobj));
- objlocked++;
+ oidlocked[numoidlocked] = OID(((objheader_t *)mobj));
+ numoidlocked++;
if (version == ((objheader_t *)mobj)->version) { /* Check if versions match */
v_matchnolock++;
} else { /* If versions don't match ...HARD ABORT */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
}
/* Condition to send TRANS_SOFT_ABORT */
- if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
+ if((v_matchlock > 0 && v_nomatch == 0) || (numoidnotfound > 0 && v_nomatch == 0)) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
}
localtdata->transinfo->objlocked = oidlocked;
localtdata->transinfo->objnotfound = oidnotfound;
localtdata->transinfo->modptr = NULL;
- localtdata->transinfo->numlocked = objlocked;
- localtdata->transinfo->numnotfound = objnotfound;
+ localtdata->transinfo->numlocked = numoidlocked;
+ localtdata->transinfo->numnotfound = numoidnotfound;
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
}
UnLock(STATUSPTR(header));
}
-
+
return 0;
}