trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */
} local_thread_data_array_t;
-//Structure for objects involved in wait-notify call
-//TODO Use it
-typedef struct notifydata {
- unsigned int numoid; /* Number of oids on which we are waiting for updated notification */
- unsigned int threadid; /* The threadid that is waiting for update notification response*/
- unsigned int *oidarry; /* Pointer to array of oids */
- unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */
-}notifydata_t;
-
-
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
unsigned short getObjType(unsigned int oid);
int startRemoteThread(unsigned int oid, unsigned int mid);
/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
-void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid);
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid);
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version);
} while(sum < N && n != 0);
/* Process each oid */
- printf("Oid 0x%x is being searched\n", oid);
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found in buffer for later use */
*(buffer + index) = OBJECT_NOT_FOUND;
objheader_t *header;
unsigned int oid;
unsigned short newversion;
- char msg[1+ sizeof(unsigned int)];
+ char msg[1+ 2 * sizeof(unsigned int) + sizeof(unsigned short)];
int sd;
struct sockaddr_in remoteAddr;
int bytesSent;
msg[0] = THREAD_NOTIFY_RESPONSE;
msg[1] = oid;
size = sizeof(unsigned int);
- memcpy(&msg[1] + size, &newversion, sizeof(unsigned short));
+ *((unsigned short *)(&msg[1]+size)) = newversion;
size += sizeof(unsigned short);
- memcpy(&msg[1] + size, &threadid, sizeof(unsigned int));
- bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0);
+ *((unsigned int *)(&msg[1]+size)) = threadid;
+ bytesSent = send(sd, msg, 1+ 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
if (bytesSent < 0){
perror("processReqNotify():send()");
status = -1;
notifyhashtable_t nlookup; //Global hash table
+/* This function creates a new node in the linked list of threads waiting
+ * for an update notification from a particular object.
+ * This takes in the head of the linked list and inserts the new node to it */
void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
threadlist_t *ptr;
if(head == NULL) {
}
}
+/* This function displays the linked list of threads waiting on update notification
+ * from an object */
void display(threadlist_t *head) {
threadlist_t *ptr;
if(head == NULL) {
}
}
+/* This function creates a new hash table that stores a mapping between the threadid and
+ * a pointer to the thread notify data */
unsigned int notifyhashCreate(unsigned int size, float loadfactor) {
notifylistnode_t *nodes;
return( tid % (nlookup.size));
}
-// Insert threadcond and threadid mapping into the hash table
-unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
+// Insert pointer to the notify data and threadid mapping into the hash table
+unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata) {
unsigned int newsize;
int index;
notifylistnode_t *ptr, *node;
pthread_mutex_lock(&nlookup.locktable);
if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable
ptr[index].threadid = tid;
- ptr[index].threadcond = threadcond;
+ ptr[index].ndata = ndata;
} else { // Insert in the beginning of linked list
if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
}
node->threadid = tid;
- node->threadcond = threadcond;
+ node->ndata = ndata;
node->next = ptr[index].next;
ptr[index].next = node;
}
return 0;
}
-// Return pthread_cond_t threadcond for a given threadid in the hash table
-pthread_cond_t notifyhashSearch(unsigned int tid) {
+// Return pointer to thread notify data for a given threadid in the hash table
+notifydata_t *notifyhashSearch(unsigned int tid) {
int index;
notifylistnode_t *ptr, *node;
- pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
ptr = nlookup.table; // Address of the beginning of hash table
index = notifyhashFunction(tid);
while(node != NULL) {
if(node->threadid == tid) {
pthread_mutex_unlock(&nlookup.locktable);
- return node->threadcond;
+ return node->ndata;
}
node = node->next;
}
pthread_mutex_unlock(&nlookup.locktable);
- return tmp;
+ return NULL;
}
// Remove an entry from the hash table
unsigned int notifyhashRemove(unsigned int tid) {
int index;
- notifylistnode_t *curr, *prev;
- notifylistnode_t *ptr, *node;
+ notifylistnode_t *curr, *prev, *ptr, *node;
ptr = nlookup.table;
index = notifyhashFunction(tid);
curr = &ptr[index];
- pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&nlookup.locktable);
for (; curr != NULL; curr = curr->next) {
if (curr->threadid == tid) { // Find a match in the hash table
nlookup.numelements--; // Decrement the number of elements in the global hashtable
if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t
curr->threadid = 0;
- curr->threadcond = tmp;
+ curr->ndata = NULL;
} else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected
curr->threadid = curr->next->threadid;
- curr->threadcond = curr->next->threadcond;
+ curr->ndata = curr->next->ndata;
node = curr->next;
curr->next = curr->next->next;
free(node);
// Insert into the new table
if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) {
nlookup.table[index].threadid = curr->threadid;
- nlookup.table[index].threadcond = curr->threadcond;
+ nlookup.table[index].ndata = curr->ndata;
nlookup.numelements++;
}else {
if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) {
return 1;
}
newnode->threadid = curr->threadid;
- newnode->threadcond = curr->threadcond;
+ newnode->ndata = curr->ndata;
newnode->next = nlookup.table[index].next;
nlookup.table[index].next = newnode;
nlookup.numelements++;
struct threadlist *next;
} threadlist_t;
+//Structure for objects involved in wait-notify call
+typedef struct notifydata {
+ unsigned int numoid; /* Number of oids on which we are waiting for updated notification */
+ unsigned int threadid; /* The threadid that is waiting for update notification response*/
+ unsigned int *oidarry; /* Pointer to array of oids that this threadid is waiting on*/
+ unsigned short *versionarry;/* Pointer to array of versions of the oids that we are waiting on */
+ pthread_cond_t threadcond; /* Cond variable associated with each threadid that needs to be signaled*/
+}notifydata_t;
+
typedef struct notifylistnode {
unsigned int threadid;
- pthread_cond_t threadcond;
+ notifydata_t *ndata;
struct notifylistnode *next;
} notifylistnode_t;
typedef struct notifyhashtable {
- notifylistnode_t *table; //points to beginning of hash table
+ notifylistnode_t *table; //Points to beginning of hash table
unsigned int size;
unsigned int numelements;
float loadfactor;
- pthread_mutex_t locktable;
+ pthread_mutex_t locktable; //Lock for the hashtable
} notifyhashtable_t;
-void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid);
-void display(threadlist_t *head);
-unsigned int notifyhashCreate(unsigned int size, float loadfactor);
-unsigned int notifyhashFunction(unsigned int tid);
-unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond);
-pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found
-unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid); //Inserts nodes for one object that
+ //needs to send notification to threads waiting on it
+void display(threadlist_t *head);// Displays linked list of nodes for one object
+unsigned int notifyhashCreate(unsigned int size, float loadfactor); //returns 1 if hashtable creation is not successful
+unsigned int notifyhashFunction(unsigned int tid); //returns index in the hash table
+unsigned int notifyhashInsert(unsigned int tid, notifydata_t *ndata); //returns 1 if insert not successful
+notifydata_t *notifyhashSearch(unsigned int tid); //returns pointer to notify data, NULL if not found
+unsigned int notifyhashRemove(unsigned int tid); //returns 1 if not successful
unsigned int notifyhashResize(unsigned int newsize);
#endif
STATUS(((objheader_t *)header)) &= ~(LOCK);
}
- printf("TRANS_ABORTED at Coordinator end\n");
-
+ printf("TRANS_ABORTED\n");
return 0;
}
/*This function completes the COMMIT process is the transaction is commiting*/
int transComProcess(local_thread_data_array_t *localtdata) {
+ static int prevsize = 0, *prevptr;
objheader_t *header, *tcptr;
int i, nummod, tmpsize, numcreated, numlocked;
unsigned int *oidmod, *oidcreated, *oidlocked;
}
STATUS(header) &= ~(LOCK);
}
+
return 0;
}
/* Send Trans Prefetch Request */
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket for TRANS_REQUEST\n");
+ perror("Error in socket for SEND_PREFETCH_REQUEST\n");
return;
}
+
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
/* Open Connection */
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect for TRANS_REQUEST\n");
+ perror("Error in connect for SEND_PREFETCH_REQUEST\n");
close(sd);
return;
}
/* This function sends notification request per thread waiting on object(s) whose version
* changes */
-void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) {
+int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) {
int sock,i;
objheader_t *objheader;
struct sockaddr_in remoteAddr;
- char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3];
+ char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)];
char *ptr;
int bytesSent;
int status, size;
unsigned short version;
- unsigned int oid, threadid;
+ unsigned int oid, threadid, mid;
pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification
pthread_cond_t threadcond;
+ notifydata_t *ndata;
+
+ //FIXME currently all oids belong to one machine
+ oid = oidarry[0];
+ mid = lhashSearch(oid);
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
perror("reqNotify():socket()");
- return;
+ return -1;
}
bzero(&remoteAddr, sizeof(remoteAddr));
/* Generate unique threadid */
threadid = (unsigned int) pthread_self();
- if((status = notifyhashInsert(threadid, threadcond)) != 0) {
+
+ /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */
+ if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) {
+ printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
+ return -1;
+ }
+ ndata->numoid = numoid;
+ ndata->threadid = threadid;
+ ndata->oidarry = oidarry;
+ ndata->versionarry = versionarry;
+ ndata->threadcond = threadcond;
+ if((status = notifyhashInsert(threadid, ndata)) != 0) {
printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
- return;
+ free(ndata);
+ return -1;
}
-
- /* Save data that is sent for later processing */
- //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list
- //TODO
-
- /* Send oidarry, version array, threadid and machine id */
+
+ /* Send number of oids, oidarry, version array, machine id and threadid */
if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
printf("reqNotify():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
- status = -1;
+ free(ndata);
+ return -1;
} else {
msg[0] = THREAD_NOTIFY_REQUEST;
msg[1] = numoid;
}
close(sock);
+ return status;
}
void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
- pthread_cond_t ret;
+ notifydata_t *ndata;
+ int i, objIsFound = 0, index;
+ void *ptr;
+
//Look up the tid and call the corresponding pthread_cond_signal
- ret = notifyhashSearch(tid);
- pthread_cond_signal(&ret);
- //TODO process oid and version
+ if((ndata = notifyhashSearch(tid)) == NULL) {
+ printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ for(i = 0; i < ndata->numoid; i++) {
+ if(ndata->oidarry[i] == oid){
+ objIsFound = 1;
+ index = i;
+ }
+ }
+ if(objIsFound == 0){
+ return;
+ } else {
+ if(version <= ndata->versionarry[index]){
+ return;
+ } else {
+ /* Clear from prefetch cache and free thread related data structure */
+ if((ptr = prehashSearch(oid)) == NULL) {
+ //TODO Ask about freeing
+ printf("threadnotify(): No such oid %s, %d\n", __FILE__, __LINE__);
+ pthread_cond_signal(&ndata->threadcond);
+ free(ndata);
+ return;
+ } else {
+ prehashRemove(oid);
+ pthread_cond_signal(&ndata->threadcond);
+ free(ndata);
+ }
+ }
+ }
+ }
+ return;
}
int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
msg[0] = THREAD_NOTIFY_RESPONSE;
msg[1] = oid;
size = sizeof(unsigned int);
- *(&msg[1]+ size) = version;
+ *((unsigned short *)(&msg[1]+ size)) = version;
size+= sizeof(unsigned short);
- *(&msg[1]+ size) = ptr->threadid;
+ *((unsigned int *)(&msg[1]+ size)) = ptr->threadid;
bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
if (bytesSent < 0){
*head = ptr->next;
free(ptr);
}
+ return status;
}
void transAbort(transrecord_t *trans) {
ptr = transRead(trans, oidvalue);
struct ___Thread___ *p = (struct ___Thread___ *) ptr;
p->___threadDone___ = 1;
- while(!transCommit(trans)) {
- printf("DEBUG-> Trans not committed yet\n");
+ if(transCommit(trans) != 0) {
transAbort(trans);
goto transstart;
}
printf("DEBUG -> Inside thread join\n");
#ifdef DSTM
pthread_t thread;
- unsigned int *oidarray, mid;
+ unsigned int *oidarray;
unsigned short *versionarray, version;
transrecord_t *trans;
objheader_t *ptr;
return;
}
versionarray[0] = version;
- mid = lhashSearch((unsigned int) VAR(___this___));
/* Request Notification */
- reqNotify(oidarray, versionarray, mid, 1);
+ reqNotify(oidarray, versionarray, 1);
free(oidarray);
free(versionarray);
transAbort(trans);
tmp = transRead(trans, (unsigned int) oid);
struct ___Thread___ *t = (struct ___Thread___ *) tmp;
t->___threadDone___ = 1;
- while(!transCommit(trans)) {
- printf("DEBUG-> Trans not committed yet\n");
+ if(transCommit(trans)!= 0) {
transAbort(trans);
goto transstart;
}