struct ___Object___ * revertlist;
#endif
} transrecord_t;
-// Structure that keeps track of responses from the participants
+// Structure is a shared structure that keeps track of responses from the participants
typedef struct thread_response {
char rcv_status;
} thread_response_t;
-// Structure that holds fixed data sizes to be sent along with TRANS_REQUEST
+// Structure that holds fixed data to be sent along with TRANS_REQUEST
typedef struct fixed_data {
- char control;
- char trans_id[TID_LEN];
- int mcount; // Machine count
- short numread; // Number of objects read
- short nummod; // Number of objects modified
- int sum_bytes; // Total bytes modified
+ char control; /* control message */
+ char trans_id[TID_LEN]; /* transaction id */
+ int mcount; /* participant count */
+ short numread; /* no of objects read */
+ short nummod; /* no of objects modified */
+ int sum_bytes; /* total bytes of modified objects in a transaction */
} fixed_data_t;
-// Structure that holds variable data sizes per machine participant
+/* Structure that holds trans request information for each participant */
typedef struct trans_req_data {
- fixed_data_t f;
- unsigned int *listmid;
- char *objread;
- unsigned int *oidmod;
-} trans_req_data_t;
-
-// Structure passed to dstmAcceptinfo() on server side to complete TRANS_COMMIT process
-
+ fixed_data_t f; /* Holds first few fixed bytes of data sent during TRANS_REQUEST protcol*/
+ unsigned int *listmid; /* Pointer to array holding list of participants */
+ char *objread; /* Pointer to array holding oid and version number of objects that are only read */
+ unsigned int *oidmod; /* Pointer to array holding oids of objects that are modified */
+} trans_req_data_t;
+
+/* Structure that holds information of objects that are not found in the participant
+ * and objs locked within a transaction during commit process */
typedef struct trans_commit_data{
- unsigned int *objmod;
- unsigned int *objlocked;
- unsigned int *objnotfound;
- void *modptr;
- int nummod;
- int numlocked;
- int numnotfound;
+ unsigned int *objlocked; /* Pointer to array holding oids of objects locked inside a transaction */
+ unsigned int *objnotfound; /* Pointer to array holding oids of objects not found on the participant machine */
+ void *modptr; /* Pointer to the address in the mainobject store of the participant that holds all modified objects */
+ int numlocked; /* no of objects locked */
+ int numnotfound; /* no of objects not found */
} trans_commit_data_t;
#define PRINT_TID(PTR) printf("DEBUG -> %x %d\n", PTR->mid, PTR->thread_id);
-//structure for passing multiple arguments to thread
+/* Structure for passing multiple arguments to a thread
+ * spawned to process each transaction on a machine */
typedef struct thread_data_array {
- int thread_id;
+ int thread_id;
int mid;
- int pilecount;
- trans_req_data_t *buffer;
- thread_response_t *recvmsg;//shared datastructure to keep track of the control message receiv
- pthread_cond_t *threshold; //threshhold for waking up a thread
- pthread_mutex_t *lock; //lock the count variable
- int *count; //variable to count responses of TRANS_REQUEST protocol from all participants
- char *replyctrl; //shared ctrl message that stores the reply to be sent, filled by decideResp
- char *replyretry; //shared variable to find out if we need retry (TRANS_COMMIT case)
- transrecord_t *rec; // To send modified objects
+ int pilecount; /* No of remote machines involved */
+ trans_req_data_t *buffer; /* Holds trans request information sent to participants */
+ thread_response_t *recvmsg; /* Shared datastructure to keep track of the participants response to a trans request */
+ pthread_cond_t *threshold; /* Condition var to waking up a thread */
+ pthread_mutex_t *lock; /* Lock for counting participants response */
+ int *count; /* Variable to count responses from all participants to the TRANS_REQUEST protocol */
+ char *replyctrl; /* Shared ctrl message that stores the reply to be sent to participants, filled by decideResponse() */
+ char *replyretry; /* Shared variable that keep track if coordinator needs retry */
+ transrecord_t *rec; /* To send modified objects */
} thread_data_array_t;
//Structure for passing arguments to the local m/c thread
typedef struct local_thread_data_array {
- thread_data_array_t *tdata;
- trans_commit_data_t *transinfo; //Required for trans commit process
+ thread_data_array_t *tdata; /* Holds all the arguments send to a thread that is spawned when transaction commits */
+ trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */
} local_thread_data_array_t;
-// Structure to save information about an oid necesaary for the decideControl()
-typedef struct objinfo {
- unsigned int oid;
- int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc)
-}objinfo_t;
-
//Structure for members within prefetch tuples
typedef struct member {
- short offset;
- short index;
- struct member *next;
+ short offset; /* Holds offset of the ptr field */
+ short index; /* Holds the array index value */
+ struct member *next;
}trans_member_t;
-/*
-//Structure that holds the compiler generated prefetch data
-typedef struct compprefetchdata {
-transrecord_t *record;
-} compprefetchdata_t;
-*/
-
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
void *dstmListen();
void *dstmAccept(void *);
int readClientReq(trans_commit_data_t *, int);
-int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, int);
+int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
-int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, unsigned int *, int);
-int transCommitProcess(trans_commit_data_t *, int);
+int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
+//int transCommitProcess(trans_commit_data_t *, int);
+int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
/* end server portion */
/* Prototypes for transactions */
int decideResponse(thread_data_array_t *);// Coordinator decides what response to send to the participant
char sendResponse(thread_data_array_t *, int); //Sends control message back to Participants
void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
-int transAbortProcess(void *, unsigned int *, int, int, int);
-int transComProcess(trans_commit_data_t *);
+int transAbortProcess(void *, unsigned int *, int, int);
+//int transComProcess(trans_commit_data_t *);
+int transComProcess(void*, unsigned int *, unsigned int *, int, int);
void prefetch(int, unsigned int *, short *, short*);
void *transPrefetch(void *);
void *mcqProcess(void *);
objstr_t *mainobjstore;
+/* This function initializes the main objects store and creates the
+ * global machine and location lookup table */
+
int dstmInit(void)
{
- /* Initialize main object store */
mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
- /* Create machine lookup table and location lookup table */
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
return 0;
}
+/* This function starts the thread to listen on a socket
+ * for tranaction calls */
void *dstmListen()
{
int listenfd, acceptfd;
int readClientReq(trans_commit_data_t *transinfo, int acceptfd) {
char *ptr;
void *modptr;
+ unsigned int *oidmod, oid;
fixed_data_t fixed;
+ objheader_t *headaddr;
int sum = 0, i, N, n, val;
/* Read fixed_data_t data structure */
}
sum = 0;
do { // Recv the objs that are modified by the Coordinator
- n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+ n = recv((int)acceptfd, (char *) modptr+sum, fixed.sum_bytes-sum, 0);
sum += n;
} while (sum < fixed.sum_bytes && n != 0);
}
+ /* Create an array of oids for modified objects */
+ oidmod = (unsigned int *) calloc(fixed.nummod, sizeof(unsigned int));
+ ptr = (char *) modptr;
+ for(i = 0 ; i < fixed.nummod; i++) {
+ headaddr = (objheader_t *) ptr;
+ oid = OID(headaddr);
+ oidmod[i] = oid;
+ ptr += sizeof(objheader_t) + classsize[TYPE(headaddr)];
+ }
+
/*Process the information read */
- if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, acceptfd)) != 0) {
+ if((val = processClientReq(&fixed, transinfo, listmid, objread, modptr, oidmod, acceptfd)) != 0) {
printf("Error in processClientReq %s, %d\n", __FILE__, __LINE__);
return 1;
}
+
+ /* Free resources */
+ free(oidmod);
+
return 0;
}
* function and sends a reply to the co-ordinator.
* Following this it also receives a new control message from the co-ordinator and processes this message*/
int processClientReq(fixed_data_t *fixed, trans_commit_data_t *transinfo,
- unsigned int *listmid, char *objread, void *modptr, int acceptfd) {
+ unsigned int *listmid, char *objread, void *modptr, unsigned int *oidmod, int acceptfd) {
char *ptr, control, sendctrl;
objheader_t *tmp_header;
void *header;
case TRANS_COMMIT:
/* Invoke the transCommit process() */
printf("DEBUG -> Recv TRANS_COMMIT \n");
- if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
+ if((val = transCommitProcess(modptr, oidmod, transinfo->objlocked, fixed->nummod, transinfo->numlocked, (int)acceptfd)) != 0) {
printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
return 1;
}
/* Free memory */
printf("DEBUG -> Freeing...\n");
fflush(stdout);
- if (transinfo->objmod != NULL) {
- free(transinfo->objmod);
- transinfo->objmod = NULL;
- }
+
if (transinfo->objlocked != NULL) {
free(transinfo->objlocked);
transinfo->objlocked = NULL;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
- oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int objmodnotfound = 0, nummodfound = 0;
+ int objnotfound = 0, objlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
/* modptr points to the beginning of the object store
* created at the Pariticipant.
* Object store holds the modified objects involved in the transaction request */
- ptr = modptr;
+ ptr = (char *) modptr;
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
} else {//Objs modified
headptr = (objheader_t *) ptr;
oid = OID(headptr);
- oidmod[objmod] = oid;//Array containing modified oids
- objmod++;
version = headptr->version;
ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
}
/* Decide what control message to send to Coordinator */
if ((val = decideCtrlMessage(fixed, transinfo, &v_matchnolock, &v_matchlock, &v_nomatch, &objnotfound, &objlocked,
- modptr, oidnotfound, oidlocked, oidmod, acceptfd)) == 0) {
+ modptr, oidnotfound, oidlocked, acceptfd)) == 0) {
printf("Error in decideCtrlMessage %s, %d\n", __FILE__, __LINE__);
return 0;
}
* to send to Coordinator based on the votes of oids involved in the transaction */
int decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int *v_matchnolock, int *v_matchlock,
int *v_nomatch, int *objnotfound, int *objlocked, void *modptr,
- unsigned int *oidnotfound, unsigned int *oidlocked, unsigned int *oidmod,
- int acceptfd) {
+ unsigned int *oidnotfound, unsigned int *oidlocked, int acceptfd) {
int val;
char control = 0;
/* Condition to send TRANS_AGREE */
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
* if Participant receives a TRANS_COMMIT */
- transinfo->objmod = oidmod;
transinfo->objlocked = oidlocked;
transinfo->objnotfound = oidnotfound;
transinfo->modptr = modptr;
- transinfo->nummod = fixed->nummod;
transinfo->numlocked = *(objlocked);
transinfo->numnotfound = *(objnotfound);
/* This function processes all modified objects involved in a TRANS_COMMIT and updates pointer
* addresses in lookup table and also changes version number
* Sends an ACK back to Coordinator */
-int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
+int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlocked, int nummod, int numlocked, int acceptfd) {
objheader_t *header;
int i = 0, offset = 0;
char control;
+
/* Process each modified object saved in the mainobject store */
- for(i=0; i<transinfo->nummod; i++) {
- if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
}
/* Change reference count of older address and free space in objstr ?? */
header->rcount = 1; //Not sure what would be the val
/* Change ptr address in mhash table */
- printf("DEBUG -> removing object oid = %d\n", transinfo->objmod[i]);
- mhashRemove(transinfo->objmod[i]);
- mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ mhashRemove(oidmod[i]);
+ mhashInsert(oidmod[i], (((char *)modptr) + offset));
offset += sizeof(objheader_t) + classsize[TYPE(header)];
/* Update object version number */
- header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header = (objheader_t *) mhashSearch(oidmod[i]);
header->version += 1;
}
/* Unlock locked objects */
- for(i=0; i<transinfo->numlocked; i++) {
- header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
STATUS(header) &= ~(LOCK);
}
return 0;
}
+/* This function recevies the oid and offset tuples from the Coordinator's prefetch call.
+ * Looks for the objects to be prefetched in the main object store.
+ * If objects are not found then record those and if objects are found
+ * then use offset values to prefetch references to other objects */
+
int prefetchReq(int acceptfd) {
int i, length, sum, n, numbytes, numoffset, N, objnotfound = 0, size, count = 0;
unsigned int oid, index = 0;
} while(sum < N && n != 0);
/* Process each oid */
- /* Check if object is still present in the machine since the beginning of TRANS_PREFETCH */
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found in buffer for later use */
*(buffer + index) = OBJECT_NOT_FOUND;
#include "plookup.h"
extern int classsize[];
+//NOTE: "pile" ptr points to the head of the linked list of the machine pile data structures
+
+/* This function creates a new pile data structure to hold
+ * obj ids of objects modified or read inside a transaction,
+ * no of objects read and no of objects modified
+ * that belong to a single machine */
+
plistnode_t *pCreate(int objects) {
plistnode_t *pile;
printf("Calloc error %s %d\n", __FILE__, __LINE__);
return NULL;
}
- pile->next = NULL;
if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ free(pile);
return NULL;
}
+ /*
if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s %d\n", __FILE__, __LINE__);
return NULL;
}
- pile->nummod = pile->numread = pile->sum_bytes = 0;
- if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) {
+ */
+ if ((pile->objread = calloc(objects, sizeof(unsigned int) + sizeof(short))) == NULL) {
printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ free(pile);
+ free(pile->oidmod);
return NULL;
}
- pile->objmodified = NULL;
- pile->nummod = pile->numread = pile->sum_bytes = 0;
+ pile->nummod = pile->numread = pile->sum_bytes = 0;
+ pile->next = NULL;
return pile;
}
+/* This function inserts necessary information into
+ * a machine pile data structure */
plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
plistnode_t *ptr, *tmp;
int found = 0, offset;
tmp = pile;
- //Add oid into a machine that is a part of the pile linked list structure
+ //Add oid into a machine that is already present in the pile linked list structure
while(tmp != NULL) {
if (tmp->mid == mid) {
if (STATUS(headeraddr) & DIRTY) {
tmp->nummod = tmp->nummod + 1;
tmp->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
} else {
- tmp->oidread[tmp->numread] = OID(headeraddr);
+ // tmp->oidread[tmp->numread] = OID(headeraddr);
offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
*((unsigned int *)(tmp->objread + offset))=OID(headeraddr);
offset += sizeof(unsigned int);
memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
tmp->numread = tmp->numread + 1;
- // printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread);
}
found = 1;
break;
ptr->nummod = ptr->nummod + 1;
ptr->sum_bytes += sizeof(objheader_t) + classsize[TYPE(headeraddr)];
} else {
- ptr->oidread[ptr->numread] = OID(headeraddr);
+ // ptr->oidread[ptr->numread] = OID(headeraddr);
*((unsigned int *)ptr->objread)=OID(headeraddr);
memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
ptr->numread = ptr->numread + 1;
return pile;
}
-//Count the number of machine groups
+//Count the number of machine piles
int pCount(plistnode_t *pile) {
plistnode_t *tmp;
int pcount = 0;
while(tmp != NULL) {
next = tmp->next;
free(tmp->oidmod);
- free(tmp->oidread);
+ //free(tmp->oidread);
free(tmp->objread);
free(tmp);
tmp = next;
#include <stdio.h>
#include "dstm.h"
+/* This structure is created using a transaction record.
+ * It is filled out with pile information necessary for
+ * participants involved in a transaction. */
typedef struct plistnode {
unsigned int mid;
- int local; /*Variable that keeps track if this pile is for LOCAL machine */
- unsigned int *oidmod;
- unsigned int *oidread;
- int nummod;
- int numread;
- int sum_bytes;
- char *objread;
- char *objmodified;
- int vote;
+ int local; /* Variable that keeps track if this pile is for LOCAL machine */
+ unsigned int *oidmod; /* Pointer to array containing oids of modified objects */
+ unsigned int *oidread; /* TODO: REMOVE THIS Pointer to array of objects read */
+ int nummod; /* no of objects read */
+ int numread; /* no of objects modified */
+ int sum_bytes; /* total bytes of objects modified */
+ char *objread; /* Pointer to array containing oids of objects read and their version numbers*/
struct plistnode *next;
} plistnode_t;
chashInsert(record->lookupTable, OID(tmp), objcopy);
return(objcopy);
} else { /* If not found anywhere, then block until object appears in prefetch cache */
+#if 0
printf("Inside remote machine\n");
pthread_mutex_lock(&pflookup.lock);
while(!found) {
pthread_mutex_unlock(&pflookup.lock);
}
}
+#endif
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
- //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
return NULL;
}
else {
- //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
return(objcopy);
}
}
}
next = curr->next;
//Get machine location for object id
+ //TODO Check is the object is newly created ...if not then lookup the location table
if ((machinenum = lhashSearch(curr->key)) == 0) {
printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
/* This function initiates the transaction commit process
* Spawns threads for each of the new connections with Participants
- * and creates new piles by calling the createPiles(),
- * Fills the piles with necesaary information and
- * Sends a transrequest() to each pile*/
+ * and creates new piles by calling the createPiles(),
+ * Sends a transrequest() to each remote machines for objects found remotely
+ * and calls handleLocalReq() to process objects found locally */
int transCommit(transrecord_t *record) {
unsigned int tot_bytes_mod, *listmid;
plistnode_t *pile, *pile_ptr;
return 0;
}
-/* This function sends information involved in the transaction request and
- * accepts a response from particpants.
+/* This function sends information involved in the transaction request
+ * to participants and accepts a response from particpants.
* It calls decideresponse() to decide on what control message
- * to send next and sends the message using sendResponse()*/
+ * to send next to participants and sends the message using sendResponse()*/
void *transRequest(void *threadarg) {
int sd, i, n;
struct sockaddr_in serv_addr;
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
/* Lock and update count */
- //Thread sleeps until all messages from pariticipants are received by coordinator
+ /* Thread sleeps until all messages from pariticipants are received by coordinator */
pthread_mutex_lock(tdata->lock);
(*(tdata->count))++; /* keeps track of no of messages received by the coordinator */
}
/* This function decides the reponse that needs to be sent to
- * all Participant machines involved in the transaction commit */
+ * all Participant machines after the TRANS_REQUEST protocol */
int decideResponse(thread_data_array_t *tdata) {
char control;
int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what
message to send */
- //Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
- /*Switch on response from Participant */
control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses
written onto the shared array */
switch(control) {
}
}
- /* Decide what control message to send to Participant */
+ /* Send Abort */
if(transdisagree > 0) {
- /* Send Abort */
*(tdata->replyctrl) = TRANS_ABORT;
printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
/* Free resources */
N = oidcount * sizeof(unsigned int);
if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 0;
}
ptr = (char *) oidnotfound;
do {
} else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */
retval = TRANS_COMMIT;
}
- /* Send response to the Participant */
+
if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
perror("Error sending ctrl message for participant\n");
}
return objcopy;
}
-/*This function handles the local trans requests involved in a transaction commiting process
- * makes a decision if the local machine sends AGREE or DISAGREE or SOFT_ABORT
- * Activates the other nonlocal threads that are waiting for the decision and the
- * based on common decision by all groups involved in the transaction it
- * either commits or aborts the transaction.
- * It also frees the calloced memory resources
- */
-
+/* This function handles the local objects involved in a transaction commiting process.
+ * It also makes a decision if this local machine sends AGREE or DISAGREE or SOFT_ABORT to coordinator.
+ * Note Coordinator = local machine
+ * It wakes up the other threads from remote participants that are waiting for the coordinator's decision and
+ * based on common agreement it either commits or aborts the transaction.
+ * It also frees the memory resources */
void *handleLocalReq(void *threadarg) {
- int val, i = 0;
+ int val, i = 0, size, offset = 0;
short version;
char control = 0, *ptr;
unsigned int oid;
unsigned int *oidnotfound = NULL, *oidlocked = NULL, *oidmod = NULL;
void *mobj, *modptr;
- objheader_t *headptr;
+ objheader_t *headptr, *headeraddr;
local_thread_data_array_t *localtdata;
localtdata = (local_thread_data_array_t *) threadarg;
/* Counters and arrays to formulate decision on control message to be sent */
oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int));
- oidmod = (unsigned int *) calloc(localtdata->tdata->buffer->f.nummod, sizeof(unsigned int));
- int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
- int objmodnotfound = 0, nummodfound = 0;
+ int objnotfound = 0, objlocked = 0;
+ int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
/* modptr points to the beginning of the object store
* created at the Pariticipant */
printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__);
return NULL;
}
+ /* Write modified objects into the mainobject store */
+ for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) {
+ headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]);
+ size = sizeof(objheader_t) + classsize[TYPE(headeraddr)];
+ memcpy(modptr+offset, headeraddr, size);
+ offset += size;
+ }
ptr = modptr;
+ offset = 0; //Reset
/* Process each oid in the machine pile/ group per thread */
for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) {
incr += sizeof(unsigned int);
version = *((short *)(localtdata->tdata->buffer->objread + incr));
} else {//Objs modified
- headptr = (objheader_t *) ptr;
+ headptr = (objheader_t *)ptr;
oid = OID(headptr);
- oidmod[objmod] = oid;//Array containing modified oids
- objmod++;
version = headptr->version;
ptr += sizeof(objheader_t) + classsize[TYPE(headptr)];
}
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
printf("DEBUG -> Sending TRANS_DISAGREE\n");
- //return tdata->recvmsg[tdata->thread_id].rcv_status;
}
} else {/* If Obj is not locked then lock object */
STATUS(((objheader_t *)mobj)) |= LOCK;
/* Send TRANS_DISAGREE to Coordinator */
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_DISAGREE;
printf("DEBUG -> Sending TRANS_DISAGREE\n");
- // return tdata->recvmsg[tdata->thread_id].rcv_status;
}
}
}
}
- /*Decide the response to be sent to the Coordinator( the local machine in this case)*/
-
/* Condition to send TRANS_AGREE */
if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) {
localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE;
/* Fill out the trans_commit_data_t data structure. This is required for a trans commit process
* if Participant receives a TRANS_COMMIT */
- localtdata->transinfo->objmod = oidmod;
localtdata->transinfo->objlocked = oidlocked;
localtdata->transinfo->objnotfound = oidnotfound;
localtdata->transinfo->modptr = modptr;
- localtdata->transinfo->nummod = localtdata->tdata->buffer->f.nummod;
localtdata->transinfo->numlocked = objlocked;
localtdata->transinfo->numnotfound = objnotfound;
- /*Set flag to show that common data structure for this individual thread has been written to */
- //*(tdata->localstatus) |= LM_UPDATED;
-
/* Lock and update count */
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(localtdata->tdata->lock);
/*Based on DecideResponse(), Either COMMIT or ABORT the operation*/
if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){
- if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->transinfo->nummod, localtdata->tdata->buffer->f.numread) != 0) {
+ if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) {
printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
}else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){
- if(transComProcess(localtdata->transinfo) != 0) {
+ if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->transinfo->numlocked) != 0) {
printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__);
return NULL;
}
/* Free memory */
printf("DEBUG -> Freeing...\n");
fflush(stdout);
- if (localtdata->transinfo->objmod != NULL) {
- free(localtdata->transinfo->objmod);
- localtdata->transinfo->objmod = NULL;
- }
+
if (localtdata->transinfo->objlocked != NULL) {
free(localtdata->transinfo->objlocked);
localtdata->transinfo->objlocked = NULL;
}
/* This function completes the ABORT process if the transaction is aborting
*/
-int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod, int numread) {
+int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) {
char *ptr;
int i;
objheader_t *tmp_header;
printf("DEBUG -> Recv TRANS_ABORT\n");
/* Set all ref counts as 1 and do garbage collection */
- ptr = modptr;
+ ptr = (char *)modptr;
for(i = 0; i< nummod; i++) {
tmp_header = (objheader_t *)ptr;
tmp_header->rcount = 1;
}
/* Unlock objects that was locked due to this transaction */
for(i = 0; i< numlocked; i++) {
- header = mhashSearch(objlocked[i]);// find the header address
+ if((header = mhashSearch(objlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
/*This function completes the COMMIT process is the transaction is commiting
*/
-int transComProcess(trans_commit_data_t *transinfo) {
+int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *objlocked, int nummod, int numlocked) {
objheader_t *header;
int i = 0, offset = 0;
char control;
- printf("DEBUG -> Recv TRANS_COMMIT\n");
/* Process each modified object saved in the mainobject store */
- for(i=0; i<transinfo->nummod; i++) {
- if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
+ for(i = 0; i < nummod; i++) {
+ if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) {
printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
}
/* Change reference count of older address and free space in objstr ?? */
header->rcount = 1; //TODO Not sure what would be the val
/* Change ptr address in mhash table */
- mhashRemove(transinfo->objmod[i]);
- mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
+ mhashRemove(oidmod[i]);
+ mhashInsert(oidmod[i], (((char *)modptr) + offset));
offset += sizeof(objheader_t) + classsize[TYPE(header)];
/* Update object version number */
- header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
+ header = (objheader_t *) mhashSearch(oidmod[i]);
header->version += 1;
}
/* Unlock locked objects */
- for(i=0; i<transinfo->numlocked; i++) {
- header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
+ for(i = 0; i < numlocked; i++) {
+ if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) {
+ printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
STATUS(header) &= ~(LOCK);
}
/* This function is called by the thread calling transPrefetch */
void *transPrefetch(void *t) {
- //int *offstarray = NULL;
prefetchqelem_t *qnode;
prefetchpile_t *pilehead = NULL;
tmp = mcpilenode->objpiles;
while(tmp != NULL) {
off = offset = 0;
- count++; // Keeps track of the number of oid and offset tuples sent per remote machine
+ count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */
len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short));
char oidnoffset[len];
memcpy(oidnoffset, &len, sizeof(int));