printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
retval);
else
- { //TODO: execute run method on this global thread object
- printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=0x%x\n", oid);
+ {
objType = getObjType(oid);
- printf("dstmAccept(): type of object 0x%x is %d\n", oid, objType);
startDSMthread(oid, objType);
-
-
}
break;
}
/* Read modified objects */
- if(fixed.nummod != 0) { // If pile contains more than one modified object,
- // allocate new object store and recv all modified objects
- // TODO deallocate this space
- pthread_mutex_lock(&mainobjstore_mutex);
- if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
- printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mainobjstore_mutex);
+ if(fixed.nummod != 0) {
+ if ((modptr = calloc(1, fixed.sum_bytes)) == NULL) {
+ printf("calloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return 1;
}
- pthread_mutex_unlock(&mainobjstore_mutex);
sum = 0;
do { // Recv the objs that are modified by the Coordinator
n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
/* Free resources */
if(oidmod != NULL) {
free(oidmod);
- oidmod = NULL;
}
return 1;
}
/* Free resources */
if(oidmod != NULL) {
free(oidmod);
- oidmod = NULL;
}
return 0;
/* Process the new control message */
switch(control) {
case TRANS_ABORT:
- /* Set all ref counts as 1 and do garbage collection */
- ptr = modptr;
- for(i = 0; i< fixed->nummod; i++) {
- int tmpsize;
- tmp_header = (objheader_t *)ptr;
- tmp_header->rcount = 0;
- GETSIZE(tmpsize, tmp_header);
- ptr += sizeof(objheader_t) + tmpsize;
- }
+ if (fixed->nummod > 0)
+ free(modptr);
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< transinfo->numlocked; i++) {
header = mhashSearch(transinfo->objlocked[i]);// find the header address
if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
/* Free memory */
- printf("DEBUG -> Freeing...\n");
- fflush(stdout);
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
}
}
/* Free memory */
- printf("DEBUG -> Freeing...\n");
- fflush(stdout);
-
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
}
* in TRANS_REQUEST and If a TRANS_DISAGREE sends the response immediately back to the coordinator */
char handleTransReq(fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
int val, i = 0;
- short version;
+ unsigned short version;
char control = 0, *ptr;
unsigned int oid;
unsigned int *oidnotfound, *oidlocked;
incr *= i;
oid = *((unsigned int *)(objread + incr));
incr += sizeof(unsigned int);
- version = *((short *)(objread + incr));
+ version = *((unsigned short *)(objread + incr));
} else {//Objs modified
int tmpsize;
headptr = (objheader_t *) ptr;
}
} else {/* If Obj is not locked then lock object */
STATUS(((objheader_t *)mobj)) |= LOCK;
-
- /*TESTING Add random wait to make transactions run for a long time such that
- * we can test for soft abort case */
-
- //randomdelay();
-
/* Save all object oids that are locked on this machine during this transaction request call */
oidlocked[objlocked] = OID(((objheader_t *)mobj));
objlocked++;
* Sends an ACK back to Coordinator */
int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
objheader_t *header;
+ objheader_t *newheader;
int i = 0, offset = 0;
char control;
+ int tmpsize;
/* Process each modified object saved in the mainobject store */
for(i = 0; i < nummod; i++) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- /* Change reference count of older address and free space in objstr ?? */
- header->rcount = 0;
-
- /* Change ptr address in mhash table */
- mhashRemove(oidmod[i]);
- mhashInsert(oidmod[i], (((char *)modptr) + offset));
- {
- int tmpsize;
- GETSIZE(tmpsize,header);
- offset += sizeof(objheader_t) + tmpsize;
- }
- /* Update object version number */
- header = (objheader_t *) mhashSearch(oidmod[i]);
+ GETSIZE(tmpsize,header);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t));
header->version += 1;
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ offset += sizeof(objheader_t) + tmpsize;
}
+ if (nummod > 0)
+ free(modptr);
+
/* Unlock locked objects */
for(i = 0; i < numlocked; i++) {
if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
unsigned int oidMin;
unsigned int oidMax;
+void printhex(unsigned char *, int);
+
+void printhex(unsigned char *ptr, int numBytes)
+{
+ int i;
+ for (i = 0; i < numBytes; i++)
+ {
+ if (ptr[i] < 16)
+ printf("0%x ", ptr[i]);
+ else
+ printf("%x ", ptr[i]);
+ }
+ printf("\n");
+ return;
+}
+
plistnode_t *createPiles(transrecord_t *);
inline int arrayLength(int *array) {
int i;
/* This function initializes things required in the transaction start*/
transrecord_t *transStart()
{
- printf("Starting transaction\n");
transrecord_t *tmp = malloc(sizeof(transrecord_t));
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
rc = gettimeofday(&tp, NULL);
+ /* 1ms delay */
+ tp.tv_usec += 1000;
+ if (tp.tv_usec >= 1000000)
+ {
+ tp.tv_usec -= 1000000;
+ tp.tv_sec += 1;
+ }
/* Convert from timeval to timespec */
+ ts.tv_sec = tp.tv_sec;
ts.tv_nsec = tp.tv_usec * 1000;
/* Search local transaction cache */
/* Look up in machine lookup table and copy into cache*/
GETSIZE(size, objheader);
size += sizeof(objheader_t);
- //TODO:Lock the local trans cache while copying the object here
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)objheader, size);
/* Insert into cache's lookup table */
return objcopy;
#endif
} else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
- found = 1;
GETSIZE(size, tmp);
size+=sizeof(objheader_t);
- //TODO:Lock the local trans cache while copying the object here
objcopy = objstrAlloc(record->cache, size);
memcpy(objcopy, (void *)tmp, size);
/* Insert into cache's lookup table */
pthread_mutex_lock(&pflookup.lock);
while(!found) {
rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
- if(rc == ETIMEDOUT) {
- printf("Wait timed out\n");
- /* Check Prefetch cache again */
- if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
- found = 1;
- GETSIZE(size,tmp);
- size+=sizeof(objheader_t);
- objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)tmp, size);
- chashInsert(record->lookupTable, OID(tmp), objcopy);
- pthread_mutex_unlock(&pflookup.lock);
+ /* Check Prefetch cache again */
+ if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) {
+ found = 1;
+ GETSIZE(size,tmp);
+ size+=sizeof(objheader_t);
+ objcopy = objstrAlloc(record->cache, size);
+ memcpy(objcopy, (void *)tmp, size);
+ chashInsert(record->lookupTable, OID(tmp), objcopy);
+ pthread_mutex_unlock(&pflookup.lock);
#ifdef COMPILER
- return &objcopy[1];
+ return &objcopy[1];
#else
- return objcopy;
+ return objcopy;
#endif
- } else {
+ } else if (rc == ETIMEDOUT) {
+// printf("Wait timed out\n");
pthread_mutex_unlock(&pflookup.lock);
break;
- }
}
}
/* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
*(tdata->replyretry) = 0;
+ /* clear objects from prefetch cache */
+ for (i = 0; i < tdata->buffer->f.numread; i++)
+ prehashRemove(*(unsigned int *)(tdata->buffer->objread + (sizeof(unsigned int) + sizeof(unsigned short))*i));
+ for (i = 0; i < tdata->buffer->f.nummod; i++)
+ prehashRemove(tdata->buffer->oidmod[i]);
} else if(transagree == tdata->buffer->f.mcount){
/* Send Commit */
*(tdata->replyctrl) = TRANS_COMMIT;
return NULL;
}
-#ifdef DEBUG1
- printf("DEBUG -> ready to rcv ...\n");
-#endif
/* Read response from the Participant */
if((val = read(sd, &control, sizeof(char))) <= 0) {
perror("No control response for getRemoteObj sent\n");
* based on common agreement it either commits or aborts the transaction.
* It also frees the memory resources */
void *handleLocalReq(void *threadarg) {
- int val, i = 0, size, offset = 0;
- short version;
- char control = 0, *ptr;
- unsigned int oid;
unsigned int *oidnotfound = NULL, *oidlocked = NULL;
- void *mobj, *modptr;
- objheader_t *headptr, *headeraddr;
local_thread_data_array_t *localtdata;
+ int objnotfound = 0, objlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
+ int numread, i;
+ unsigned int oid;
+ unsigned short version;
+ void *mobj;
+ objheader_t *headptr;
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0;
- int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
-
- /* modptr points to the beginning of the object store
- * created at the Pariticipant */
- pthread_mutex_lock(&mainobjstore_mutex);
- if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) {
- printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(&mainobjstore_mutex);
- pthread_exit(NULL);
- }
- pthread_mutex_unlock(&mainobjstore_mutex);
- /* Write modified objects into the mainobject store */
- for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
- headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
- GETSIZE(size,headeraddr);
- size+=sizeof(objheader_t);
- memcpy((char *)modptr+offset, headeraddr, size);
- offset += size;
- }
- /* Write new objects into the mainobject store */
- for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) {
- headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]);
- GETSIZE(size, headeraddr);
- size+=sizeof(objheader_t);
- memcpy((char *)modptr+offset, headeraddr, size);
- offset += size;
- }
-
- ptr = modptr;
- offset = 0; //Reset
+ numread = localtdata->tdata->buffer->f.numread;
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
- if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified
+ if (i < localtdata->tdata->buffer->f.numread) {
int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
incr *= i;
oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr));
- incr += sizeof(unsigned int);
- version = *((short *)(localtdata->tdata->buffer->objread + incr));
- } else {//Objs modified
+ version = *((short *)(localtdata->tdata->buffer->objread + incr + sizeof(unsigned int)));
+ } else { // Objects Modified
int tmpsize;
- headptr = (objheader_t *)ptr;
+ headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]);
+ if (headptr == NULL) {
+ printf("Error: handleLocalReq() returning NULL\n");
+ return NULL;
+ }
oid = OID(headptr);
version = headptr->version;
- GETSIZE(tmpsize, headptr);
- ptr += sizeof(objheader_t) + tmpsize;
}
-
/* Check if object is still present in the machine since the beginning of TRANS_REQUEST */
/* Save the oids not found and number of oids not found for later use */
- if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
+ if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */
/* Save the oids not found and number of oids not found for later use */
oidnotfound[objnotfound] = oid;
objnotfound++;
}
} else {/* If Obj is not locked then lock object */
STATUS(((objheader_t *)mobj)) |= LOCK;
- //TODO Remove this for Testing
- //randomdelay(); -- Why is this here. BCD
-
/* Save all object oids that are locked on this machine during this transaction request call */
oidlocked[objlocked] = OID(((objheader_t *)mobj));
objlocked++;
}
}
}
- }
-
+ } // End for
/* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
/* Condition to send TRANS_SOFT_ABORT */
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
- //TODO currently the only soft abort case that is supported is when object locked by previous
- //transaction => v_matchlock > 0
- //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported
- /* Send number of oids not found and the missing oids if objects are missing in the machine */
- /* TODO Remember to store the oidnotfound for later use
- if(objnotfound != 0) {
- int size = sizeof(unsigned int)* objnotfound;
- }
- */
}
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
* if Participant receives a TRANS_COMMIT */
localtdata->transinfo->objlocked = oidlocked;
localtdata->transinfo->objnotfound = oidnotfound;
- localtdata->transinfo->modptr = modptr;
+ localtdata->transinfo->modptr = NULL;
localtdata->transinfo->numlocked = objlocked;
localtdata->transinfo->numnotfound = objnotfound;
-
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock);
}
pthread_mutex_unlock(localtdata->tdata->lock);
-
- /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
+ if(transAbortProcess(localtdata) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
- }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
- if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) {
+ } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) {
+ if(transComProcess(localtdata) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
pthread_exit(NULL);
}
}
-
/* Free memory */
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
- localtdata->transinfo->objlocked = NULL;
}
if (localtdata->transinfo->objnotfound != NULL) {
free(localtdata->transinfo->objnotfound);
- localtdata->transinfo->objnotfound = NULL;
}
pthread_exit(NULL);
}
-/* This function completes the ABORT process if the transaction is aborting
-*/
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
- char *ptr;
- int i;
- objheader_t *tmp_header;
+
+/* This function completes the ABORT process if the transaction is aborting */
+int transAbortProcess(local_thread_data_array_t *localtdata) {
+ int i, numlocked;
+ unsigned int *objlocked;
void *header;
- /* Set all ref counts as 1 and do garbage collection */
- ptr = modptr;
- for(i = 0; i< nummod; i++) {
- int tmpsize;
- tmp_header = (objheader_t *)ptr;
- tmp_header->rcount = 0;
- GETSIZE(tmpsize, tmp_header);
- ptr += sizeof(objheader_t) + tmpsize;
- }
- /* Unlock objects that was locked due to this transaction */
- for(i = 0; i< numlocked; i++) {
+ numlocked = localtdata->transinfo->numlocked;
+ objlocked = localtdata->transinfo->objlocked;
+
+ for (i = 0; i < numlocked; i++) {
if((header = mhashSearch(objlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
- /* Send ack to Coordinator */
+ printf("TRANS_ABORTED at Coordinator end\n");
- /*Free the pointer */
- ptr = NULL;
return 0;
}
-/*This function completes the COMMIT process is the transaction is commiting
-*/
-int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) {
- objheader_t *header;
- int i = 0, offset = 0;
- char control;
-
- /* Process each modified object saved in the mainobject store */
- for(i = 0; i < nummod; i++) {
- int tmpsize;
+/*This function completes the COMMIT process is the transaction is commiting*/
+int transComProcess(local_thread_data_array_t *localtdata) {
+ objheader_t *header, *tcptr;
+ int i, nummod, tmpsize, numcreated, numlocked;
+ unsigned int *oidmod, *oidcreated, *oidlocked;
+ void *ptrcreate;
+
+ nummod = localtdata->tdata->buffer->f.nummod;
+ oidmod = localtdata->tdata->buffer->oidmod;
+ numcreated = localtdata->tdata->buffer->f.numcreated;
+ oidcreated = localtdata->tdata->buffer->oidcreated;
+ numlocked = localtdata->transinfo->numlocked;
+ oidlocked = localtdata->transinfo->objlocked;
+ for (i = 0; i < nummod; i++) {
if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
- printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ /* Copy from transaction cache -> main object store */
+ if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
- /* Change reference count of older address and free space in objstr ?? */
- header->rcount = 0;
-
- /* Change ptr address in mhash table */
- mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary
- mhashInsert(oidmod[i], (((char *)modptr) + offset));
GETSIZE(tmpsize, header);
- offset += sizeof(objheader_t) + tmpsize;
- /* Update object version number */
- header = (objheader_t *) mhashSearch(oidmod[i]);
+ pthread_mutex_lock(&mainobjstore_mutex);
+ memcpy(header, tcptr, tmpsize + sizeof(objheader_t));
header->version += 1;
+ pthread_mutex_unlock(&mainobjstore_mutex);
}
-
- /*If object is in prefetch cache then update it in prefetch cache */
-
-
/* If object is newly created inside transaction then commit it */
- for (i = 0; i < numcreated; i++)
- {
- int tmpsize;
- header = (objheader_t *)(((char *)modptr) + offset);
- mhashInsert(oidcreated[i], (((char *)modptr) + offset));
+ for (i = 0; i < numcreated; i++) {
+ if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
+ printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
GETSIZE(tmpsize, header);
- offset += sizeof(objheader_t) + tmpsize;
+ pthread_mutex_lock(&mainobjstore_mutex);
+ if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
+ printf("Error: transComProcess() failed objstrAlloc\n");
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ return 1;
+ }
+ pthread_mutex_unlock(&mainobjstore_mutex);
+ memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t));
+
+ mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
-
/* Unlock locked objects */
for(i = 0; i < numlocked; i++) {
- if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
return 1;
}
STATUS(header) &= ~(LOCK);
}
-
- //TODO Update location lookup table
-
- /* Send ack to Coordinator */
- printf("TRANS_SUCCESSFUL\n");
return 0;
}
} else {
k = endoffsets[i-1];
index = endoffsets[j-1];
- printf("Value of slength = %d\n", slength);
for(count = 0; count < slength; count++) {
if(arryfields[k] != arryfields[index]) {
break;
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
midtoIP(mcpilenode->mid ,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);