chashInsert(record->lookupTable, OID(tmp), objcopy);
return(objcopy);
} else { /* If not found anywhere, then block until object appears in prefetch cache */
+#if 0
printf("Inside remote machine\n");
pthread_mutex_lock(&pflookup.lock);
while(!found) {
pthread_mutex_unlock(&pflookup.lock);
}
}
+#endif
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
return NULL;
}
else {
- //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
return(objcopy);
}
}
}
next = curr->next;
//Get machine location for object id
+ //TODO Check is the object is newly created ...if not then lookup the location table
if ((machinenum = lhashSearch(curr->key)) == 0) {
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
/* This function initiates the transaction commit process
* Spawns threads for each of the new connections with Participants
- * and creates new piles by calling the createPiles(),
- * Fills the piles with necesaary information and
- * Sends a transrequest() to each pile*/
+ * and creates new piles by calling the createPiles(),
+ * Sends a transrequest() to each remote machines for objects found remotely
+ * and calls handleLocalReq() to process objects found locally */
int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
return 0;
}
-/* This function sends information involved in the transaction request and
- * accepts a response from particpants.
+/* This function sends information involved in the transaction request
+ * to participants and accepts a response from particpants.
* It calls decideresponse() to decide on what control message
- * to send next and sends the message using sendResponse()*/
+ * to send next to participants and sends the message using sendResponse()*/
void *transRequest(void *threadarg) {
int sd, i, n;
struct sockaddr_in serv_addr;
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
/* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
+ /* Thread sleeps until all messages from pariticipants are received by coordinator */
pthread_mutex_lock(tdata->lock);
(*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
}
/* This function decides the reponse that needs to be sent to
- * all Participant machines involved in the transaction commit */
+ * all Participant machines after the TRANS_REQUEST protocol */
int decideResponse(thread_data_array_t *tdata) {
char control;
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
- //Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
- /*Switch on response from Participant */
control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
written onto the shared array */
switch(control) {
}
}
- /* Decide what control message to send to Participant */
+ /* Send Abort */
if(transdisagree > 0) {
- /* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
/* Free resources */
N = oidcount * sizeof(unsigned int);
if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 0;
}
ptr = (char *) oidnotfound;
do {
} else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
retval = TRANS_COMMIT;
}
- /* Send response to the Participant */
+
if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
}
return objcopy;
}
-/*This function handles the local trans requests involved in a transaction commiting process
- * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
- * Activates the other nonlocal threads that are waiting for the decision and the
- * based on common decision by all groups involved in the transaction it
- * either commits or aborts the transaction.
- * It also frees the calloced memory resources
- */
-
+/* This function handles the local objects involved in a transaction commiting process.
+ * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
+ * Note Coordinator = local machine
+ * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
+ * based on common agreement it either commits or aborts the transaction.
+ * It also frees the memory resources */
void *handleLocalReq(void *threadarg) {
- int val, i = 0;
+ int val, i = 0, size, offset = 0;
short version;
char control = 0, *ptr;
unsigned int oid;
unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
void *mobj, *modptr;
- objheader_t *headptr;
+ objheader_t *headptr, *headeraddr;
local_thread_data_array_t *localtdata;
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int objmodnotfound = 0, nummodfound = 0;
+ int objnotfound = 0, objlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
/* modptr points to the beginning of the object store
* created at the Pariticipant */
printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return NULL;
}
+ /* Write modified objects into the mainobject store */
+ for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
+ headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
+ size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+ memcpy(modptr+offset, headeraddr, size);
+ offset += size;
+ }
ptr = modptr;
+ offset = 0; //Reset
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
incr += sizeof(unsigned int);
version = *((short *)(localtdata->tdata->buffer->objread + incr));
} else {//Objs modified
- headptr = (objheader_t *) ptr;
+ headptr = (objheader_t *)ptr;
oid = OID(headptr);
- oidmod[objmod] = oid;//Array containing modified oids
- objmod++;
version = headptr->version;
ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
}
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
printf("DEBUG -> Sending TRANS_DISAGREE\n");
- //return tdata->recvmsg[tdata->thread_id].rcv_status;
}
} else {/* If Obj is not locked then lock object */
STATUS(((objheader_t *)mobj)) |= LOCK;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
printf("DEBUG -> Sending TRANS_DISAGREE\n");
- // return tdata->recvmsg[tdata->thread_id].rcv_status;
}
}
}
}
- /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
-
/* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
* if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objmod = oidmod;
localtdata->transinfo->objlocked = oidlocked;
localtdata->transinfo->objnotfound = oidnotfound;
localtdata->transinfo->modptr = modptr;
- localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
localtdata->transinfo->numlocked = objlocked;
localtdata->transinfo->numnotfound = objnotfound;
- /*Set flag to show that common data structure for this individual thread has been written to */
- //*(tdata->localstatus) |= LM_UPDATED;
-
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
/*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
+ if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
}else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
- if(transComProcess(localtdata->transinfo) != 0) {
+ if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
/* Free memory */
printf("DEBUG -> Freeing...\n");
fflush(stdout);
- if (localtdata->transinfo->objmod != NULL) {
- free(localtdata->transinfo->objmod);
- localtdata->transinfo->objmod = NULL;
- }
+
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
localtdata->transinfo->objlocked = NULL;
}
/* This function completes the ABORT process if the transaction is aborting
*/
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
char *ptr;
int i;
objheader_t *tmp_header;
printf("DEBUG -> Recv TRANS_ABORT\n");
/* Set all ref counts as 1 and do garbage collection */
- ptr = modptr;
+ ptr = (char *)modptr;
for(i = 0; i< nummod; i++) {
tmp_header = (objheader_t *)ptr;
tmp_header->rcount = 1;
}
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< numlocked; i++) {
- header = mhashSearch(objlocked[i]);// find the header address
+ if((header = mhashSearch(objlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
/*This function completes the COMMIT process is the transaction is commiting
*/
-int transComProcess(trans_commit_data_t *transinfo) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
objheader_t *header;
int i = 0, offset = 0;
char control;
- printf("DEBUG -> Recv TRANS_COMMIT\n");
/* Process each modified object saved in the mainobject store */
- for(i=0; i<transinfo->nummod; i++) {
- if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
}
/* Change reference count of older address and free space in objstr ?? */
header->rcount = 1; //TODO Not sure what would be the val
/* Change ptr address in mhash table */
- mhashRemove(transinfo->objmod[i]);
- mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ mhashRemove(oidmod[i]);
+ mhashInsert(oidmod[i], (((char *)modptr) + offset));
offset += sizeof(objheader_t) + classsize[TYPE(header)];
/* Update object version number */
- header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header = (objheader_t *) mhashSearch(oidmod[i]);
header->version += 1;
}
/* Unlock locked objects */
- for(i=0; i<transinfo->numlocked; i++) {
- header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
STATUS(header) &= ~(LOCK);
}
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
- //int *offstarray = NULL;
prefetchqelem_t *qnode;
prefetchpile_t *pilehead = NULL;
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
off = offset = 0;
- count++; // Keeps track of the number of oid and offset tuples sent per remote machine
+ count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
char oidnoffset[len];
memcpy(oidnoffset, &len, sizeof(int));