public class Thread {
/* Don't allow overriding this method. If you do, it will break dispatch
* because we don't have the type information necessary. */
- private boolean threadDone;
+ public boolean threadDone;
public Thread() {
threadDone = false;
}
- //public native static void join();
- public native void join();
+ public final native void join();
public final native void start(int mid);
#define TRANS_SUCESSFUL 21
#define TRANS_PREFETCH_RESPONSE 22
#define START_REMOTE_THREAD 23
+#define THREAD_NOTIFY_REQUEST 24
+#define THREAD_NOTIFY_RESPONSE 25
//Control bits for status of objects in Machine pile
#define OBJ_LOCKED_BUT_VERSION_MATCH 14
#include "clookup.h"
#include "queue.h"
#include "mcpileq.h"
+#include "threadnotify.h"
+
#define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
#define TID_LEN 20
#include "structdefs.h"
typedef struct objheader {
+ threadlist_t *notifylist;
unsigned short version;
unsigned short rcount;
} objheader_t;
#else
typedef struct objheader {
+ threadlist_t *notifylist;
unsigned int oid;
unsigned short type;
unsigned short version;
trans_commit_data_t *transinfo; /* Holds information of objects locked and not found in the participant */
} local_thread_data_array_t;
-//Structure for members within prefetch tuples
-typedef struct member {
- short offset; /* Holds offset of the ptr field */
- short index; /* Holds the array index value */
- struct member *next;
-}trans_member_t;
+//Structure for objects involved in wait-notify call
+//TODO Use it
+typedef struct notifydata {
+ unsigned int numoid; /* Number of oids on which we are waiting for updated notification */
+ unsigned int threadid; /* The threadid that is waiting for update notification response*/
+ unsigned int *oidarry; /* Pointer to array of oids */
+ unsigned short *version;/* Pointer to array of versions of the oids that we are waiting on */
+}notifydata_t;
+
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
int processClientReq(fixed_data_t *, trans_commit_data_t *,unsigned int *, char *, void *, unsigned int *, int);
char handleTransReq(fixed_data_t *, trans_commit_data_t *, unsigned int *, char *, void *, int);
int decideCtrlMessage(fixed_data_t *, trans_commit_data_t *, int *, int *, int *, int *, int *, void *, unsigned int *, unsigned int *, int);
-//int transCommitProcess(trans_commit_data_t *, int);
int transCommitProcess(void *, unsigned int *, unsigned int *, int, int, int);
+void processReqNotify(unsigned int numoid, unsigned int *oid, unsigned short *version, unsigned int mid, unsigned int threadid);
/* end server portion */
/* Prototypes for transactions */
void addHost(unsigned int);
void mapObjMethod(unsigned short);
-void randomdelay(void);
+void randomdelay();
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *, unsigned int);
objheader_t *transCreateObj(transrecord_t *, unsigned int); //returns oid
void *handleLocalReq(void *);
int transComProcess(local_thread_data_array_t *);
int transAbortProcess(local_thread_data_array_t *);
+void transAbort(transrecord_t *trans);
void prefetch(int, unsigned int *, unsigned short *, short*);
void *transPrefetch(void *);
void getPrefetchResponse(int, int);
unsigned short getObjType(unsigned int oid);
int startRemoteThread(unsigned int oid, unsigned int mid);
+/* Sends notification request for thread join, if sucessful returns 0 else returns -1 */
+void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid);
+void threadNotify(unsigned int oid, unsigned short version, unsigned int tid);
+int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version);
+
/* end transactions */
#endif
#include <pthread.h>
#include <netdb.h>
#include <fcntl.h>
+#include <errno.h>
+#include <string.h>
#include "dstm.h"
#include "mlookup.h"
#include "llookup.h"
+#include "threadnotify.h"
#ifdef COMPILER
#include "thread.h"
#endif
if (lhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
+
+ if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
+ return 1; //failure
return 0;
}
* and accordingly calls other functions to process new requests */
void *dstmAccept(void *acceptfd)
{
- int numbytes,i, val, retval;
+ int val, retval, size;
unsigned int oid;
char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
char *ptr;
void *srcObj;
objheader_t *h;
trans_commit_data_t transinfo;
- unsigned short objType;
-
+ unsigned short objType, *versionarry, version;
+ unsigned int *oidarry, numoid, mid, threadid;
+
transinfo.objlocked = NULL;
transinfo.objnotfound = NULL;
transinfo.modptr = NULL;
transinfo.numlocked = 0;
transinfo.numnotfound = 0;
- int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
-
/* Receive control messages from other machines */
if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
if (retval == 0) {
}
if((srcObj = mhashSearch(oid)) == NULL) {
printf("Object not found in Main Object Store %s %d\n", __FILE__, __LINE__);
+ pthread_exit(NULL);
}
h = (objheader_t *) srcObj;
GETSIZE(size, h);
case TRANS_PREFETCH:
printf("DEBUG -> Recv TRANS_PREFETCH\n");
if((val = prefetchReq((int)acceptfd)) != 0) {
- printf("Error in readClientReq\n");
+ printf("Error in transPrefetch\n");
pthread_exit(NULL);
}
break;
}
break;
+ case THREAD_NOTIFY_REQUEST:
+ size = sizeof(unsigned int);
+ retval = recv((int)acceptfd, ptr, size, 0);
+ numoid = *((unsigned int *) ptr);
+ size = (sizeof(unsigned int) + sizeof(unsigned short)) * numoid + 2 * sizeof(unsigned int);
+ retval = recv((int)acceptfd, ptr, size, 0);
+ oidarry = calloc(numoid, sizeof(unsigned int));
+ memcpy(oidarry, ptr, sizeof(unsigned int) * numoid);
+ size = sizeof(unsigned int) * numoid;
+ versionarry = calloc(numoid, sizeof(unsigned short));
+ memcpy(versionarry, ptr+size, sizeof(unsigned short) * numoid);
+ size += sizeof(unsigned short) * numoid;
+ mid = *((unsigned int *)(ptr+size));
+ size += sizeof(unsigned int);
+ threadid = *((unsigned int *)(ptr+size));
+ processReqNotify(numoid, oidarry, versionarry, mid, threadid);
+
+ break;
+
+ case THREAD_NOTIFY_RESPONSE:
+ size = sizeof(unsigned short) + 2 * sizeof(unsigned int);
+ retval = recv((int)acceptfd, ptr, size, 0);
+ if(retval <= 0)
+ perror("dstmAccept(): error receiving THREAD_NOTIFY_RESPONSE msg");
+ else if( retval != 2*sizeof(unsigned int) + sizeof(unsigned short))
+ printf("dstmAccept(): incorrect smsg size %d for THREAD_NOTIFY_RESPONSE msg\n", retval);
+ else {
+ oid = *((unsigned int *)ptr);
+ size = sizeof(unsigned int);
+ version = *((unsigned short *)(ptr+size));
+ size += sizeof(unsigned short);
+ threadid = *((unsigned int *)(ptr+size));
+ threadNotify(oid,version,threadid);
+ }
+
+ break;
+
default:
printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
}
pthread_mutex_lock(&mainobjstore_mutex);
memcpy(header, (char *)modptr + offset, tmpsize + sizeof(objheader_t));
header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
pthread_mutex_unlock(&mainobjstore_mutex);
offset += sizeof(objheader_t) + tmpsize;
}
} while(sum < N && n != 0);
/* Process each oid */
+ printf("Oid 0x%x is being searched\n", oid);
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found in buffer for later use */
*(buffer + index) = OBJECT_NOT_FOUND;
return 0;
}
+void processReqNotify(unsigned int numoid, unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int threadid) {
+ objheader_t *header;
+ unsigned int oid;
+ unsigned short newversion;
+ char msg[1+ sizeof(unsigned int)];
+ int sd;
+ struct sockaddr_in remoteAddr;
+ int bytesSent;
+ int status, size, retry = 0;
+
+ int i = 0;
+ while(i < numoid) {
+ oid = *(oidarry + i);
+ if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
+ printf("processReqNotify(): Object is not found in mlookup %s, %d\n", __FILE__, __LINE__);
+ return;
+ } else {
+ /* Check to see if versions are same */
+checkversion:
+ if ((STATUS(header) & LOCK) != LOCK) {
+ STATUS(header) |= LOCK;
+ if(header->version == *(versionarry + i)) {
+ //Add to the notify list
+ insNode(header->notifylist, threadid, mid);
+ } else {
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("processReqNotify():socket()");
+ return;
+ }
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("processReqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ } else {
+ //Send Update notification
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ msg[1] = oid;
+ size = sizeof(unsigned int);
+ memcpy(&msg[1] + size, &newversion, sizeof(unsigned short));
+ size += sizeof(unsigned short);
+ memcpy(&msg[1] + size, &threadid, sizeof(unsigned int));
+ bytesSent = send(sd, msg, 1+sizeof(unsigned int), 0);
+ if (bytesSent < 0){
+ perror("processReqNotify():send()");
+ status = -1;
+ } else if (bytesSent != 1 + sizeof(unsigned short) + 2*sizeof(unsigned int)){
+ printf("processReqNotify(): error, sent %d bytes\n", bytesSent);
+ status = -1;
+ } else {
+ status = 0;
+ }
+
+ }
+ close(sd);
+ }
+ STATUS(header) &= ~(LOCK);
+ } else {
+ randomdelay();
+ printf("DEBUG-> processReqNotify() Object is still locked\n");
+ goto checkversion;
+ }
+ }
+ }
+ free(oidarry);
+ free(versionarry);
+}
+
// Creates a machine lookup table with size =" size"
unsigned int mhashCreate(unsigned int size, float loadfactor) {
mhashlistnode_t *nodes;
- int i;
-
// Allocate space for the hash table
if((nodes = calloc(size, sizeof(mhashlistnode_t))) == NULL) {
printf("Calloc error %s %d\n", __FILE__, __LINE__);
--- /dev/null
+#include "threadnotify.h"
+
+notifyhashtable_t nlookup; //Global hash table
+
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid) {
+ threadlist_t *ptr;
+ if(head == NULL) {
+ if((head = calloc(1, sizeof(threadlist_t))) == NULL) {
+ printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
+ return;
+ }
+ head->threadid = threadid;
+ head->mid = mid;
+ head->next = NULL;
+ } else {
+ if((ptr = calloc(1, sizeof(threadlist_t))) == NULL) {
+ printf("Calloc Error %s, %d,\n", __FILE__, __LINE__);
+ return;
+ }
+ ptr->threadid = threadid;
+ ptr->mid = mid;
+ ptr->next = head;
+ head = ptr;
+ }
+}
+
+void display(threadlist_t *head) {
+ threadlist_t *ptr;
+ if(head == NULL) {
+ printf("No thread is waiting\n");
+ return;
+ } else {
+ while(head != NULL) {
+ ptr = head;
+ printf("The threadid waiting is = %d\n", ptr->threadid);
+ printf("The mid on which thread present = %d\n", ptr->mid);
+ head = ptr->next;
+ }
+ }
+}
+
+unsigned int notifyhashCreate(unsigned int size, float loadfactor) {
+ notifylistnode_t *nodes;
+
+ // Allocate space for the hash table
+ if((nodes = calloc(size, sizeof(notifylistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ nlookup.table = nodes;
+ nlookup.size = size;
+ nlookup.numelements = 0; // Initial number of elements in the hash
+ nlookup.loadfactor = loadfactor;
+ //Initialize the pthread_mutex variable
+ pthread_mutex_init(&nlookup.locktable, NULL);
+ return 0;
+}
+
+// Assign to tids to bins inside hash table
+unsigned int notifyhashFunction(unsigned int tid) {
+ return( tid % (nlookup.size));
+}
+
+// Insert threadcond and threadid mapping into the hash table
+unsigned int notifyhashInsert(unsigned int tid, pthread_cond_t threadcond) {
+ unsigned int newsize;
+ int index;
+ notifylistnode_t *ptr, *node;
+
+ if (nlookup.numelements > (nlookup.loadfactor * nlookup.size)) {
+ //Resize Table
+ newsize = 2 * nlookup.size + 1;
+ pthread_mutex_lock(&nlookup.locktable);
+ notifyhashResize(newsize);
+ pthread_mutex_unlock(&nlookup.locktable);
+ }
+ ptr = nlookup.table;
+ nlookup.numelements++;
+
+ index = notifyhashFunction(tid);
+#ifdef DEBUG
+ printf("DEBUG -> index = %d, threadid = %d\n", index, tid);
+#endif
+ pthread_mutex_lock(&nlookup.locktable);
+ if(ptr[index].next == NULL && ptr[index].threadid == 0) { // Insert at the first position in the hashtable
+ ptr[index].threadid = tid;
+ ptr[index].threadcond = threadcond;
+ } else { // Insert in the beginning of linked list
+ if ((node = calloc(1, sizeof(notifylistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&nlookup.locktable);
+ return 1;
+ }
+ node->threadid = tid;
+ node->threadcond = threadcond;
+ node->next = ptr[index].next;
+ ptr[index].next = node;
+ }
+ pthread_mutex_unlock(&nlookup.locktable);
+ return 0;
+}
+
+// Return pthread_cond_t threadcond for a given threadid in the hash table
+pthread_cond_t notifyhashSearch(unsigned int tid) {
+ int index;
+ notifylistnode_t *ptr, *node;
+ pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
+
+ ptr = nlookup.table; // Address of the beginning of hash table
+ index = notifyhashFunction(tid);
+ node = &ptr[index];
+ pthread_mutex_lock(&nlookup.locktable);
+ while(node != NULL) {
+ if(node->threadid == tid) {
+ pthread_mutex_unlock(&nlookup.locktable);
+ return node->threadcond;
+ }
+ node = node->next;
+ }
+ pthread_mutex_unlock(&nlookup.locktable);
+ return tmp;
+}
+
+// Remove an entry from the hash table
+unsigned int notifyhashRemove(unsigned int tid) {
+ int index;
+ notifylistnode_t *curr, *prev;
+ notifylistnode_t *ptr, *node;
+
+ ptr = nlookup.table;
+ index = notifyhashFunction(tid);
+ curr = &ptr[index];
+
+ pthread_cond_t tmp = PTHREAD_COND_INITIALIZER;
+ pthread_mutex_lock(&nlookup.locktable);
+ for (; curr != NULL; curr = curr->next) {
+ if (curr->threadid == tid) { // Find a match in the hash table
+ nlookup.numelements--; // Decrement the number of elements in the global hashtable
+ if ((curr == &ptr[index]) && (curr->next == NULL)) { // Delete the first item inside the hashtable with no linked list of notifylistnode_t
+ curr->threadid = 0;
+ curr->threadcond = tmp;
+ } else if ((curr == &ptr[index]) && (curr->next != NULL)) { //Delete the first bin item with a linked list of notifylistnode_t connected
+ curr->threadid = curr->next->threadid;
+ curr->threadcond = curr->next->threadcond;
+ node = curr->next;
+ curr->next = curr->next->next;
+ free(node);
+ } else { // Regular delete from linked listed
+ prev->next = curr->next;
+ free(curr);
+ }
+ pthread_mutex_unlock(&nlookup.locktable);
+ return 0;
+ }
+ prev = curr;
+ }
+ pthread_mutex_unlock(&nlookup.locktable);
+ return 1;
+}
+
+// Resize table
+unsigned int notifyhashResize(unsigned int newsize) {
+ notifylistnode_t *node, *ptr, *curr, *next; // curr and next keep track of the current and the next notifyhashlistnodes in a linked list
+ unsigned int oldsize;
+ int isfirst; // Keeps track of the first element in the notifylistnode_t for each bin in hashtable
+ int i,index;
+ notifylistnode_t *newnode;
+
+ ptr = nlookup.table;
+ oldsize = nlookup.size;
+
+ if((node = calloc(newsize, sizeof(notifylistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
+ nlookup.table = node; //Update the global hashtable upon resize()
+ nlookup.size = newsize;
+ nlookup.numelements = 0;
+
+ for(i = 0; i < oldsize; i++) { //Outer loop for each bin in hash table
+ curr = &ptr[i];
+ isfirst = 1;
+ while (curr != NULL) { //Inner loop to go through linked lists
+ if (curr->threadid == 0) { //Exit inner loop if there the first element for a given bin/index is NULL
+ break; //threadid = threadcond =0 for element if not present within the hash table
+ }
+ next = curr->next;
+ index = notifyhashFunction(curr->threadid);
+#ifdef DEBUG
+ printf("DEBUG(resize) -> index = %d, threadid = %d\n", index, curr->threadid);
+#endif
+ // Insert into the new table
+ if(nlookup.table[index].next == NULL && nlookup.table[index].threadid == 0) {
+ nlookup.table[index].threadid = curr->threadid;
+ nlookup.table[index].threadcond = curr->threadcond;
+ nlookup.numelements++;
+ }else {
+ if((newnode = calloc(1, sizeof(notifylistnode_t))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+ newnode->threadid = curr->threadid;
+ newnode->threadcond = curr->threadcond;
+ newnode->next = nlookup.table[index].next;
+ nlookup.table[index].next = newnode;
+ nlookup.numelements++;
+ }
+
+ //free the linked list of notifylistnode_t if not the first element in the hash table
+ if (isfirst != 1) {
+ free(curr);
+ }
+
+ isfirst = 0;
+ curr = next;
+ }
+ }
+
+ free(ptr); //Free the memory of the old hash table
+ return 0;
+}
--- /dev/null
+#ifndef _THREADNOTIFY_H_
+#define _THREADNOTIFY_H_
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+
+#define N_LOADFACTOR 0.75
+#define N_HASH_SIZE 20
+
+//Structure to notify object of which other objects/threads are waiting on it
+typedef struct threadlist {
+ unsigned int threadid;
+ unsigned int mid;
+ struct threadlist *next;
+} threadlist_t;
+
+typedef struct notifylistnode {
+ unsigned int threadid;
+ pthread_cond_t threadcond;
+ struct notifylistnode *next;
+} notifylistnode_t;
+
+typedef struct notifyhashtable {
+ notifylistnode_t *table; //points to beginning of hash table
+ unsigned int size;
+ unsigned int numelements;
+ float loadfactor;
+ pthread_mutex_t locktable;
+} notifyhashtable_t;
+
+void insNode(threadlist_t *head, unsigned int threadid, unsigned int mid);
+void display(threadlist_t *head);
+unsigned int notifyhashCreate(unsigned int size, float loadfactor);
+unsigned int notifyhashFunction(unsigned int tid);
+unsigned notifyhashInsert(unsigned int tid, pthread_cond_t threadcond);
+pthread_cond_t notifyhashSearch(unsigned int tid); //returns val, NULL if not found
+unsigned int notifyhashRemove(unsigned int tid); //returns -1 if not found
+unsigned int notifyhashResize(unsigned int newsize);
+
+#endif
#include "llookup.h"
#include "plookup.h"
#include "prelookup.h"
+#include "threadnotify.h"
#include "queue.h"
#include <pthread.h>
#include <sys/types.h>
#include <errno.h>
#include <time.h>
#include <string.h>
-#include <pthread.h>
#define LISTEN_PORT 2156
#define RECEIVE_BUFFER_SIZE 2048
}
if(success == 0) {
- printf("DEBUG-> Unable to insert object in Prefetch cache\n");
return NULL;
}
}
/* This functions inserts randowm wait delays in the order of msec
* Mostly used when transaction commits retry*/
-void randomdelay(void)
+void randomdelay()
{
struct timespec req;
time_t t;
/* Get the object from the remote location */
machinenumber = lhashSearch(oid);
+ char* ipaddr;
+ midtoIP(machinenumber, ipaddr);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
printf("Object not found in Remote location %s, %d\n", __FILE__, __LINE__);
objheader_t *transCreateObj(transrecord_t *record, unsigned int size)
{
objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size));
+ tmp->notifylist = NULL;
OID(tmp) = getNewOID();
tmp->version = 1;
tmp->rcount = 1;
/* Create a list of machine ids(Participants) involved in transaction */
if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
- free(record);
+ //free(record);
return 1;
}
pListMid(pile, listmid);
pthread_mutex_destroy(&tlock);
pDelete(pile_ptr);
free(listmid);
- free(record);
+ //free(record);
return 1;
}
pDelete(pile_ptr);
free(listmid);
free(thread_data_array);
- free(record);
+ //free(record);
return 1;
}
free(listmid);
free(thread_data_array);
free(ltdata);
- free(record);
+ //free(record);
return 1;
}
tosend->f.control = TRANS_REQUEST;
free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- free(record);
+ //free(record);
return 1;
}
} else { /*Local*/
free(thread_data_array[i].buffer);
free(thread_data_array);
free(ltdata);
- free(record);
+ //free(record);
return 1;
}
}
free(thread_data_array[j].buffer);
free(thread_data_array);
free(ltdata);
- free(record);
+ //free(record);
return 1;
}
free(thread_data_array[i].buffer);
pthread_mutex_lock(&mainobjstore_mutex);
memcpy(header, tcptr, tmpsize + sizeof(objheader_t));
header->version += 1;
+ /* If threads are waiting on this object to be updated, notify them */
+ if(header->notifylist != NULL) {
+ notifyAll(&header->notifylist, OID(header), header->version);
+ }
+
pthread_mutex_unlock(&mainobjstore_mutex);
}
/* If object is newly created inside transaction then commit it */
return 1;
}
GETSIZE(tmpsize, header);
+ tmpsize += sizeof(objheader_t);
pthread_mutex_lock(&mainobjstore_mutex);
if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) {
printf("Error: transComProcess() failed objstrAlloc\n");
}
pthread_mutex_unlock(&mainobjstore_mutex);
memcpy(ptrcreate, header, tmpsize + sizeof(objheader_t));
-
mhashInsert(oidcreated[i], ptrcreate);
lhashInsert(oidcreated[i], myIpAddr);
}
oid = GET_PTR_OID(ptr);
endoffsets = GET_PTR_EOFF(ptr, ntuples);
arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+
/* Find offset length for each tuple */
int numoffset[ntuples];
numoffset[0] = endoffsets[0];
}
if(isArray == 1) {
int elementsize = classsize[TYPE(objheader)];
- struct ArrayObject *ao = (struct ArrayObject *) (tmp + sizeof(objheader_t));
objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
} else {
- objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]);
+ objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
}
//Update numoffset array
numoffset[i] = numoffset[i] - 1;
int elementsize = classsize[TYPE(header)];
objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex])));
} else {
- objoid = *(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]);
+ objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex]));
}
//Update numoffset array
numoffset[index] = numoffset[index] - 1;
/* Increment it to get the object */
/* TODO: For each object not found query DHT for new location and retrieve the object */
index += sizeof(char);
- memcpy(&oid, buffer + index, sizeof(unsigned int));
+ //memcpy(&oid, buffer + index, sizeof(unsigned int));
+ oid = *((unsigned int *)(buffer + index));
index += sizeof(unsigned int);
/* Throw an error */
printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n");
{
if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
{
- //prefetch(1, &oid, &numoffsets, NULL);
prefetch(1, &oid, numoffset, fieldoffset);
pthread_mutex_lock(&pflookup.lock);
while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
//not found
return -1;
}
+
+/* This function sends notification request per thread waiting on object(s) whose version
+ * changes */
+void reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int mid, unsigned int numoid) {
+ int sock,i;
+ objheader_t *objheader;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + sizeof(unsigned int) * 3];
+ char *ptr;
+ int bytesSent;
+ int status, size;
+ unsigned short version;
+ unsigned int oid, threadid;
+ pthread_mutex_t threadnotify; //Lock and condition var for threadjoin and notification
+ pthread_cond_t threadcond;
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("reqNotify():socket()");
+ return;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ /* Generate unique threadid */
+ threadid = (unsigned int) pthread_self();
+ if((status = notifyhashInsert(threadid, threadcond)) != 0) {
+ printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+
+ /* Save data that is sent for later processing */
+ //Save threadid, numoid, oidarray, versionarray, also the pthread_cond_variable in a linked list
+ //TODO
+
+ /* Send oidarry, version array, threadid and machine id */
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("reqNotify():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ } else {
+ msg[0] = THREAD_NOTIFY_REQUEST;
+ msg[1] = numoid;
+ /* Send array of oids */
+ size = sizeof(unsigned int);
+ {
+ i = 0;
+ while(i < numoid) {
+ oid = oidarry[i];
+ *((unsigned int *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned int);
+ i++;
+ }
+ }
+
+ /* Send array of version */
+ {
+ i = 0;
+ while(i < numoid) {
+ version = versionarry[i];
+ *((unsigned short *)(&msg[1] + size)) = oid;
+ size += sizeof(unsigned short);
+ i++;
+ }
+ }
+
+ *((unsigned int *)(&msg[1] + size)) = myIpAddr;
+ size += sizeof(unsigned int);
+ *((unsigned int *)(&msg[1] + size)) = threadid;
+
+ pthread_mutex_lock(&threadnotify);
+ bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 2 * sizeof(unsigned int) , 0);
+ if (bytesSent < 0){
+ perror("reqNotify():send()");
+ status = -1;
+ } else if (bytesSent != 1 + 5*sizeof(unsigned int)){
+ printf("reNotify(): error, sent %d bytes\n", bytesSent);
+ status = -1;
+ } else {
+ status = 0;
+ }
+ pthread_cond_wait(&threadcond, &threadnotify);
+ pthread_mutex_unlock(&threadnotify);
+ }
+
+ close(sock);
+}
+
+void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) {
+ pthread_cond_t ret;
+ //Look up the tid and call the corresponding pthread_cond_signal
+ ret = notifyhashSearch(tid);
+ pthread_cond_signal(&ret);
+ //TODO process oid and version
+}
+
+int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) {
+ threadlist_t *ptr;
+ unsigned int mid;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)];
+ int sock, status, size, bytesSent;
+ while(*head != NULL) {
+ ptr = *head;
+ mid = ptr->mid;
+ //create a socket connection to that machine
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
+ perror("notifyAll():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+ //send Thread Notify response and threadid to that machine
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
+ printf("notifyAll():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ } else {
+ msg[0] = THREAD_NOTIFY_RESPONSE;
+ msg[1] = oid;
+ size = sizeof(unsigned int);
+ *(&msg[1]+ size) = version;
+ size+= sizeof(unsigned short);
+ *(&msg[1]+ size) = ptr->threadid;
+
+ bytesSent = send(sock, msg, 1 + 2*sizeof(unsigned int) + sizeof(unsigned short), 0);
+ if (bytesSent < 0){
+ perror("notifyAll():send()");
+ status = -1;
+ } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){
+ printf("notifyAll(): error, sent %d bytes\n", bytesSent);
+ status = -1;
+ } else {
+ status = 0;
+ }
+ }
+ //close socket
+ close(sock);
+ // Update head
+ *head = ptr->next;
+ free(ptr);
+ }
+}
+
+void transAbort(transrecord_t *trans) {
+ objstrDelete(trans->cache);
+ chashDelete(trans->lookupTable);
+ free(trans);
+}
#include "option.h"
#include <signal.h>
#include <DSTM/interface/dstm.h>
+#include <DSTM/interface/llookup.h>
#include <stdio.h>
int threadcount;
pthread_mutex_t threadnotifylock;
pthread_cond_t threadnotifycond;
transrecord_t * trans;
+pthread_key_t oid;
void threadexit() {
- void *ptr;
+ objheader_t* ptr;
+ void *value;
+ unsigned int oidvalue;
+
#ifdef THREADS
struct ___Object___ *ll=pthread_getspecific(threadlocks);
while(ll!=NULL) {
threadcount--;
pthread_cond_signal(&gccond);
pthread_mutex_unlock(&gclistlock);
+#ifdef DSTM
/* Add transaction to check if thread finished for join operation */
+ value = pthread_getspecific(oid);
+ oidvalue = *((unsigned int *)value);
goto transstart;
-transretry:
-
transstart:
- trans = transStart();
- ptr = (void *)transRead(trans, (unsigned int) a);
- struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t));
- tmp->___threadDone___ = 1;
- if(transCommit(trans)) {
- goto transretry;
- } else {
- COMMIT_OBJ();
+ trans = transStart();
+ ptr = transRead(trans, oidvalue);
+ struct ___Thread___ *p = (struct ___Thread___ *) ptr;
+ p->___threadDone___ = 1;
+ while(!transCommit(trans)) {
+ printf("DEBUG-> Trans not committed yet\n");
+ transAbort(trans);
+ goto transstart;
}
+#endif
pthread_exit(NULL);
}
}
/* Add thread join capability */
-#ifdef DSTM
void CALL01(___Thread______join____, struct ___Thread___ * ___this___) {
- pthread_t thread;
printf("DEBUG -> Inside thread join\n");
- int status;
- if(VAR(___this___)->___threadDone___) {
+#ifdef DSTM
+ pthread_t thread;
+ unsigned int *oidarray, mid;
+ unsigned short *versionarray, version;
+ transrecord_t *trans;
+ objheader_t *ptr;
+ /* Add transaction to check if thread finished for join operation */
+transstart:
+ trans = transStart();
+ ptr = transRead(trans, (unsigned int) VAR(___this___));
+ struct ___Thread___ *p = (struct ___Thread___ *) ptr;
+ if(p->___threadDone___ == 1) {
+ transAbort(trans);
return;
} else {
- /* Request Notification */
- pthread_cond_broadcast(&objcond);
- pthread_mutex_unlock(&objlock);
- pthread_mutex_lock(&threadnotifylock);//wake everyone up
- status = reqNotify((unsigned int)VAR(___this___));
-
- if((status = reqNotify((unsigned int)VAR(___this___))) != 0) {
- printf("No notification is sent %s, %d\n", __FILE__, __LINE__);
- } else {
+ version = (ptr-1)->version;
+ if((oidarray = calloc(1, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return;
+ }
+
+ oidarray[0] = (unsigned int) VAR(___this___);
+
+ if((versionarray = calloc(1, sizeof(unsigned short))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ free(oidarray);
return;
}
+ versionarray[0] = version;
+ mid = lhashSearch((unsigned int) VAR(___this___));
+ /* Request Notification */
+ reqNotify(oidarray, versionarray, mid, 1);
+ free(oidarray);
+ free(versionarray);
+ transAbort(trans);
+ goto transstart;
}
-}
+ return;
#endif
+}
#ifdef THREADS
void CALL01(___Thread______nativeCreate____, struct ___Thread___ * ___this___) {
#endif
#ifdef DSTM
+void globalDestructor(void *value) {
+ free(value);
+ pthread_setspecific(oid, NULL);
+}
+
void initDSMthread(int *ptr) {
+ objheader_t *tmp;
+ void *threadData;
int oid=ptr[0];
int type=ptr[1];
free(ptr);
#else
((void (*)(void *))virtualtable[type*MAXCOUNT+RUNMETHOD])(oid);
#endif
+ threadData = calloc(1, sizeof(unsigned int));
+ *((unsigned int *) threadData) = oid;
+ pthread_setspecific(oid, threadData);
pthread_mutex_lock(&gclistlock);
- threadcount--;
- pthread_cond_signal(&gccond);
- pthread_mutex_unlock(&gclistlock);
- /* Add transaction to check if thread finished for join operation */
- goto transstart;
-transretry:
- //TODO
-
+ threadcount--;
+ pthread_cond_signal(&gccond);
+ pthread_mutex_unlock(&gclistlock);
+ /* Add transaction to check if thread finished for join operation */
+ goto transstart;
transstart:
- trans = transStart();
- ptr = (void *)transRead(trans, (unsigned int) oid);
- struct ___Thread___ *tmp = ((char *) ptr + sizeof(objheader_t));
- tmp->___threadDone___ = 1;
- if(transCommit(trans)) {
- goto transretry;
- } else {
- //TODO
- }
+ trans = transStart();
+ tmp = transRead(trans, (unsigned int) oid);
+ struct ___Thread___ *t = (struct ___Thread___ *) tmp;
+ t->___threadDone___ = 1;
+ while(!transCommit(trans)) {
+ printf("DEBUG-> Trans not committed yet\n");
+ transAbort(trans);
+ goto transstart;
+ }
+ pthread_exit(NULL);
}
void startDSMthread(int oid, int objType) {
int * ptr=malloc(sizeof(int)*2);
ptr[0]=oid;
ptr[1]=objType;
+ pthread_key_create(&oid, globalDestructor);
do {
retval=pthread_create(&thread, &nattr, (void * (*)(void *)) &initDSMthread, ptr);
if (retval!=0)
public class Atomic3 extends Thread {
public Atomic3() {
}
- static Tree root;
+ Tree root;
Integer count;
public static void main(String[] st) {
int mid = (128<<24)|(195<<16)|(175<<8)|70;
int b;
Atomic3 at3 = null;
- Integer z;
+ Integer y,z;
atomic {
at3 = global new Atomic3();
z = global new Integer(300);
at3.root.insert(z);
b = at3.root.value.intValue();
}
+ System.printString("b is ");
+ System.printInt(b);
atomic{
at3.root.item = 2445;
+ y = global new Integer(400);
+ at3.root.value = y;
+ b = at3.root.value.intValue();
}
+ System.printString("b is ");
System.printInt(b);
System.printString("\n");
System.printString("Starting\n");
public int run() {
int a;
atomic {
- //FIXME a bug value of trans commit is not saved
a = root.value.intValue();
- //a = root.item;
}
+ System.printString("a is ");
System.printInt(a);
System.printString("\n");
}
public boolean isSenior() {
if(this.getAge() > 65)
return true;
- return false;;
+ return false;
}
}
if $DSMFLAG
then
EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
fi
if $RECOVERFLAG