X-Git-Url: http://demsky.eecs.uci.edu/git/?a=blobdiff_plain;f=Robust%2Fsrc%2FRuntime%2FDSTM%2Finterface%2Ftrans.c;h=d678293433e7fc2701dcadb7d6aeeeb10c5d13f0;hb=cdcf09c40af1419fa42932aae249cb79b69b5daf;hp=881baf1fba251acfcc921d89ac8fbc4a5dd4db8e;hpb=7cc26734a218bb53fad56f09220be58d2509ec39;p=IRC.git diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 881baf1f..d6782934 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -6,6 +6,7 @@ #include "llookup.h" #include "plookup.h" #include "prelookup.h" +#include "threadnotify.h" #include "queue.h" #include #include @@ -17,25 +18,56 @@ #include #include #include -#include +#ifdef COMPILER +#include "thread.h" +#endif #define LISTEN_PORT 2156 #define RECEIVE_BUFFER_SIZE 2048 #define NUM_THREADS 10 #define PREFETCH_CACHE_SIZE 1048576 //1MB +#define CONFIG_FILENAME "dstm.conf" /* Global Variables */ extern int classsize[]; -extern primarypfq_t pqueue; // shared prefetch queue +extern primarypfq_t pqueue; //Shared prefetch queue extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids -objstr_t *prefetchcache; //Global Prefetch cache +objstr_t *prefetchcache; //Global Prefetch cache +pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache +pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */ +extern pthread_mutex_t mainobjstore_mutex;// Mutex to lock main Object store extern prehashtable_t pflookup; //Global Prefetch cache's lookup table pthread_t wthreads[NUM_THREADS]; //Worker threads for working on the prefetch queue -pthread_t tPrefetch; +pthread_t tPrefetch; /* Primary Prefetch thread that processes the prefetch queue */ extern objstr_t *mainobjstore; unsigned int myIpAddr; - +unsigned int *hostIpAddrs; +int sizeOfHostArray; +int numHostsInSystem; +int myIndexInHostArray; +unsigned int oidsPerBlock; +unsigned int oidMin; +unsigned int oidMax; +void *mlist[10000]; +pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER; + +void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); + +void printhex(unsigned char *ptr, int numBytes) +{ + int i; + for (i = 0; i < numBytes; i++) + { + if (ptr[i] < 16) + printf("0%x ", ptr[i]); + else + printf("%x ", ptr[i]); + } + printf("\n"); + return; +} + inline int arrayLength(int *array) { int i; for(i=0 ;array[i] != -1; i++) @@ -54,56 +86,87 @@ inline int findmax(int *array, int arraylength) { } /* This function is a prefetch call generated by the compiler that * populates the shared primary prefetch queue*/ -void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfields) { +void prefetch(int ntuples, unsigned int *oids, unsigned short *endoffsets, short *arrayfields) { int qnodesize; int len = 0; - + int i, rc; + /* Allocate for the queue node*/ char *node; - qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); - if((node = calloc(1, qnodesize)) == NULL) { - printf("Calloc Error %s, %d\n", __FILE__, __LINE__); - return; + if(ntuples > 0) { + qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(unsigned short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(unsigned short); + if((node = calloc(1, qnodesize)) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return; + } + /* Set queue node values */ + len = sizeof(prefetchqelem_t); + memcpy(node + len, &ntuples, sizeof(int)); + len += sizeof(int); + memcpy(node + len, oids, ntuples*sizeof(unsigned int)); + len += ntuples * sizeof(unsigned int); + memcpy(node + len, endoffsets, ntuples*sizeof(unsigned short)); + len += ntuples * sizeof(unsigned short); + memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(unsigned short)); + /* Lock and insert into primary prefetch queue */ + pthread_mutex_lock(&pqueue.qlock); + pre_enqueue((prefetchqelem_t *)node); + pthread_cond_signal(&pqueue.qcond); + pthread_mutex_unlock(&pqueue.qlock); } - /* Set queue node values */ - len = sizeof(prefetchqelem_t); - memcpy(node + len, &ntuples, sizeof(int)); - len += sizeof(int); - memcpy(node + len, oids, ntuples*sizeof(unsigned int)); - len += ntuples * sizeof(unsigned int); - memcpy(node + len, endoffsets, ntuples*sizeof(short)); - len += ntuples * sizeof(short); - memcpy(node + len, arrayfields, endoffsets[ntuples-1]*sizeof(short)); - /* Lock and insert into primary prefetch queue */ - pthread_mutex_lock(&pqueue.qlock); - pre_enqueue((prefetchqelem_t *)node); - pthread_cond_signal(&pqueue.qcond); - pthread_mutex_unlock(&pqueue.qlock); } /* This function starts up the transaction runtime. */ int dstmStartup(const char * option) { - pthread_t thread_Listen; - pthread_attr_t attr; - int master=strcmp(option, "master")==0; + pthread_t thread_Listen; + pthread_attr_t attr; + int master=option!=NULL && strcmp(option, "master")==0; - myIpAddr = getMyIpAddr("eth0"); + if (processConfigFile() != 0) + return 0; //TODO: return error value, cause main program to exit +#ifdef COMPILER + if (!master) + threadcount--; +#endif - dstmInit(); - transInit(); + dstmInit(); + transInit(); - if (master) { - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_create(&thread_Listen, &attr, dstmListen, NULL); - return 1; - } else { - dstmListen(); - return 0; - } + if (master) { + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + pthread_create(&thread_Listen, &attr, dstmListen, NULL); + return 1; + } else { + dstmListen(); + return 0; + } } +//TODO Use this later +void *pCacheAlloc(objstr_t *store, unsigned int size) { + void *tmp; + objstr_t *ptr; + ptr = store; + int success = 0; + + while(ptr->next != NULL) { + /* check if store is empty */ + if(((unsigned int)ptr->top - (unsigned int)ptr - sizeof(objstr_t) + size) <= ptr->size) { + tmp = ptr->top; + ptr->top += size; + success = 1; + return tmp; + } else { + ptr = ptr-> next; + } + } + + if(success == 0) { + return NULL; + } +} /* This function initiates the prefetch thread * A queue is shared between the main thread of execution @@ -112,8 +175,18 @@ int dstmStartup(const char * option) { * processes the prefetch requests */ void transInit() { int t, rc; + int retval; //Create and initialize prefetch cache structure prefetchcache = objstrCreate(PREFETCH_CACHE_SIZE); + //prefetchcache->next = objstrCreate(PREFETCH_CACHE_SIZE); + //prefetchcache->next->next = objstrCreate(PREFETCH_CACHE_SIZE); + + /* Initialize attributes for mutex */ + pthread_mutexattr_init(&prefetchcache_mutex_attr); + pthread_mutexattr_settype(&prefetchcache_mutex_attr, PTHREAD_MUTEX_RECURSIVE_NP); + + pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr); + //Create prefetch cache lookup table if(prehashCreate(HASH_SIZE, LOADFACTOR)) return; //Failure @@ -121,16 +194,20 @@ void transInit() { queueInit(); //Initialize machine pile w/prefetch oids and offsets shared queue mcpileqInit(); + //Create the primary prefetch thread - pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + do { + retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); + } while(retval!=0); + pthread_detach(tPrefetch); + //Create and Initialize a pool of threads /* Threads are active for the entire period runtime is running */ for(t = 0; t< NUM_THREADS; t++) { + do { rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t); - if (rc) { - printf("Thread create error %s, %d\n", __FILE__, __LINE__); - return; - } + } while(rc!=0); + pthread_detach(wthreads[t]); } } @@ -146,22 +223,22 @@ void transExit() { /* This functions inserts randowm wait delays in the order of msec * Mostly used when transaction commits retry*/ -void randomdelay(void) +void randomdelay() { - struct timespec req, rem; + struct timespec req; time_t t; t = time(NULL); req.tv_sec = 0; req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec - nanosleep(&req, &rem); + nanosleep(&req, NULL); return; } /* This function initializes things required in the transaction start*/ transrecord_t *transStart() { - transrecord_t *tmp = malloc(sizeof(transrecord_t)); + transrecord_t *tmp = calloc(1, sizeof(transrecord_t)); tmp->cache = objstrCreate(1048576); tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR); #ifdef COMPILER @@ -175,85 +252,118 @@ transrecord_t *transStart() objheader_t *transRead(transrecord_t *record, unsigned int oid) { unsigned int machinenumber; objheader_t *tmp, *objheader; - void *objcopy; + objheader_t *objcopy; int size, rc, found = 0; void *buf; struct timespec ts; struct timeval tp; + + if(oid == 0) { + return NULL; + } rc = gettimeofday(&tp, NULL); + /* 1ms delay */ + tp.tv_usec += 1000; + if (tp.tv_usec >= 1000000) + { + tp.tv_usec -= 1000000; + tp.tv_sec += 1; + } /* Convert from timeval to timespec */ + ts.tv_sec = tp.tv_sec; ts.tv_nsec = tp.tv_usec * 1000; /* Search local transaction cache */ if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){ - return(objheader); +#ifdef COMPILER + return &objheader[1]; +#else + return objheader; +#endif } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { /* Look up in machine lookup table and copy into cache*/ - tmp = mhashSearch(oid); - size = sizeof(objheader_t)+classsize[TYPE(tmp)]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)objheader, size); + GETSIZE(size, objheader); + size += sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, objheader, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, OID(objheader), objcopy); - return(objcopy); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ - found = 1; - size = sizeof(objheader_t)+classsize[TYPE(tmp)]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); + GETSIZE(size, tmp); + size+=sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, tmp, size); /* Insert into cache's lookup table */ chashInsert(record->lookupTable, OID(tmp), objcopy); - return(objcopy); - } else { /* If not found anywhere, then block until object appears in prefetch cache */ -#if 0 - printf("Inside remote machine\n"); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else { + /*If object not found in prefetch cache then block until object appears in the prefetch cache */ pthread_mutex_lock(&pflookup.lock); while(!found) { rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts); - if(rc == ETIMEDOUT) { - printf("Wait timed out\n"); - /* Check Prefetch cache again */ - if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */ - found = 1; - size = sizeof(objheader_t)+classsize[TYPE(tmp)]; - objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)tmp, size); - /* Insert into cache's lookup table */ - chashInsert(record->lookupTable, OID(tmp), objcopy); - return(objcopy); - } else { - pthread_mutex_unlock(&pflookup.lock); - break; - } + /* Check Prefetch cache again */ + if((tmp =(objheader_t *) prehashSearch(oid)) != NULL) { + found = 1; + GETSIZE(size,tmp); + size+=sizeof(objheader_t); + objcopy = (objheader_t *) objstrAlloc(record->cache, size); + memcpy(objcopy, tmp, size); + chashInsert(record->lookupTable, OID(tmp), objcopy); + pthread_mutex_unlock(&pflookup.lock); +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif + } else if (rc == ETIMEDOUT) { pthread_mutex_unlock(&pflookup.lock); + break; } } -#endif + /* Get the object from the remote location */ machinenumber = lhashSearch(oid); objcopy = getRemoteObj(record, machinenumber, oid); if(objcopy == NULL) { - //If object is not found in Remote location + printf("Error: Object not found in Remote location %s, %d\n", __FILE__, __LINE__); return NULL; + } else { +#ifdef COMPILER + return &objcopy[1]; +#else + return objcopy; +#endif } - else { - return(objcopy); - } - } + } } /* This function creates objects in the transaction record */ objheader_t *transCreateObj(transrecord_t *record, unsigned int size) { - objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); - OID(tmp) = getNewOID(); - tmp->version = 1; - tmp->rcount = 1; - STATUS(tmp) = NEW; - chashInsert(record->lookupTable, OID(tmp), tmp); - return tmp; + objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + size)); + tmp->notifylist = NULL; + OID(tmp) = getNewOID(); + tmp->version = 1; + tmp->rcount = 1; + STATUS(tmp) = NEW; + chashInsert(record->lookupTable, OID(tmp), tmp); + +#ifdef COMPILER + return &tmp[1]; //want space after object header +#else + return tmp; +#endif } /* This function creates machine piles based on all machines involved in a @@ -266,7 +376,7 @@ plistnode_t *createPiles(transrecord_t *record) { unsigned int machinenum; void *localmachinenum; objheader_t *headeraddr; - + ptr = record->lookupTable->table; size = record->lookupTable->size; @@ -286,7 +396,7 @@ plistnode_t *createPiles(transrecord_t *record) { } //Get machine location for object id (and whether local or not) - if (STATUS(headeraddr) & NEW || mhashSearch(curr->key) != NULL) { + if (STATUS(headeraddr) & NEW || (mhashSearch(curr->key) != NULL)) { machinenum = myIpAddr; } else if ((machinenum = lhashSearch(curr->key)) == 0) { printf("Error: No such machine %s, %d\n", __FILE__, __LINE__); @@ -313,8 +423,8 @@ plistnode_t *createPiles(transrecord_t *record) { int transCommit(transrecord_t *record) { unsigned int tot_bytes_mod, *listmid; plistnode_t *pile, *pile_ptr; - int i, rc, val; - int pilecount = 0, offset, threadnum = 0, trecvcount = 0, tmachcount = 0; + int i, j, rc, val; + int pilecount, offset, threadnum = 0, trecvcount = 0; char buffer[RECEIVE_BUFFER_SIZE],control; char transid[TID_LEN]; trans_req_data_t *tosend; @@ -323,125 +433,186 @@ int transCommit(transrecord_t *record) { char treplyctrl = 0, treplyretry = 0; /* keeps track of the common response that needs to be sent */ char localstat = 0; - do { - /* Look through all the objects in the transaction record and make piles - * for each machine involved in the transaction*/ - pile_ptr = pile = createPiles(record); - /* Create the packet to be sent in TRANS_REQUEST */ + /* Look through all the objects in the transaction record and make piles + * for each machine involved in the transaction*/ + pile_ptr = pile = createPiles(record); - /* Count the number of participants */ - pilecount = pCount(pile); + /* Create the packet to be sent in TRANS_REQUEST */ - /* Create a list of machine ids(Participants) involved in transaction */ - if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); - return 1; - } - pListMid(pile, listmid); - - - /* Initialize thread variables, - * Spawn a thread for each Participant involved in a transaction */ - pthread_t thread[pilecount]; - pthread_attr_t attr; - pthread_cond_t tcond; - pthread_mutex_t tlock; - pthread_mutex_t tlshrd; - - thread_data_array_t *thread_data_array; - thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount); - local_thread_data_array_t *ltdata; - if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { + /* Count the number of participants */ + pilecount = pCount(pile); + + /* Create a list of machine ids(Participants) involved in transaction */ + if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + return 1; + } + pListMid(pile, listmid); + + + /* Initialize thread variables, + * Spawn a thread for each Participant involved in a transaction */ + pthread_t thread[pilecount]; + pthread_attr_t attr; + pthread_cond_t tcond; + pthread_mutex_t tlock; + pthread_mutex_t tlshrd; + + thread_data_array_t *thread_data_array; + if((thread_data_array = (thread_data_array_t *) calloc(pilecount, sizeof(thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + return 1; + } + + local_thread_data_array_t *ltdata; + if((ltdata = calloc(1, sizeof(local_thread_data_array_t))) == NULL) { + printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + return 1; + } + + thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ + + /* Initialize and set thread detach attribute */ + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_mutex_init(&tlock, NULL); + pthread_cond_init(&tcond, NULL); + + /* Process each machine pile */ + while(pile != NULL) { + //Create transaction id + newtid++; + if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + free(thread_data_array); + free(ltdata); return 1; } - - thread_response_t rcvd_control_msg[pilecount]; /* Shared thread array that keeps track of responses of participants */ - - /* Initialize and set thread detach attribute */ - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); - pthread_mutex_init(&tlock, NULL); - pthread_cond_init(&tcond, NULL); - - /* Process each machine pile */ - while(pile != NULL) { - //Create transaction id - newtid++; - if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) { - printf("Calloc error %s, %d\n", __FILE__, __LINE__); + tosend->f.control = TRANS_REQUEST; + sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); + tosend->f.mcount = pilecount; + tosend->f.numread = pile->numread; + tosend->f.nummod = pile->nummod; + tosend->f.numcreated = pile->numcreated; + tosend->f.sum_bytes = pile->sum_bytes; + tosend->listmid = listmid; + tosend->objread = pile->objread; + tosend->oidmod = pile->oidmod; + tosend->oidcreated = pile->oidcreated; + thread_data_array[threadnum].thread_id = threadnum; + thread_data_array[threadnum].mid = pile->mid; + thread_data_array[threadnum].buffer = tosend; + thread_data_array[threadnum].recvmsg = rcvd_control_msg; + thread_data_array[threadnum].threshold = &tcond; + thread_data_array[threadnum].lock = &tlock; + thread_data_array[threadnum].count = &trecvcount; + thread_data_array[threadnum].replyctrl = &treplyctrl; + thread_data_array[threadnum].replyretry = &treplyretry; + thread_data_array[threadnum].rec = record; + /* If local do not create any extra connection */ + if(pile->mid != myIpAddr) { /* Not local */ + do { + rc = pthread_create(&thread[threadnum], &attr, transRequest, (void *) &thread_data_array[threadnum]); + } while(rc!=0); + if(rc) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); return 1; } - tosend->f.control = TRANS_REQUEST; - sprintf(tosend->f.trans_id, "%x_%d", pile->mid, newtid); - tosend->f.mcount = pilecount; - tosend->f.numread = pile->numread; - tosend->f.nummod = pile->nummod; - tosend->f.numcreated = pile->numcreated; - tosend->f.sum_bytes = pile->sum_bytes; - tosend->listmid = listmid; - tosend->objread = pile->objread; - tosend->oidmod = pile->oidmod; - tosend->oidcreated = pile->oidcreated; - thread_data_array[threadnum].thread_id = threadnum; - thread_data_array[threadnum].mid = pile->mid; - thread_data_array[threadnum].pilecount = pilecount; - thread_data_array[threadnum].buffer = tosend; - thread_data_array[threadnum].recvmsg = rcvd_control_msg; - thread_data_array[threadnum].threshold = &tcond; - thread_data_array[threadnum].lock = &tlock; - thread_data_array[threadnum].count = &trecvcount; - thread_data_array[threadnum].replyctrl = &treplyctrl; - thread_data_array[threadnum].replyretry = &treplyretry; - thread_data_array[threadnum].rec = record; - /* If local do not create any extra connection */ - if(pile->mid != myIpAddr) { /* Not local */ - rc = pthread_create(&thread[threadnum], NULL, transRequest, (void *) &thread_data_array[threadnum]); - if (rc) { - perror("Error in pthread create\n"); - return 1; - } - } else { /*Local*/ - ltdata->tdata = &thread_data_array[threadnum]; - ltdata->transinfo = &transinfo; - val = pthread_create(&thread[threadnum], NULL, handleLocalReq, (void *) ltdata); - if (val) { - perror("Error in pthread create\n"); - return 1; - } + } else { /*Local*/ + ltdata->tdata = &thread_data_array[threadnum]; + ltdata->transinfo = &transinfo; + do { + val = pthread_create(&thread[threadnum], &attr, handleLocalReq, (void *) ltdata); + } while(val!=0); + if(val) { + perror("Error in pthread create\n"); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (i = 0; i < threadnum; i++) + free(thread_data_array[i].buffer); + free(thread_data_array); + free(ltdata); + return 1; } - threadnum++; - pile = pile->next; } - /* Free attribute and wait for the other threads */ - pthread_attr_destroy(&attr); - for (i = 0 ;i < pilecount ; i++) { - rc = pthread_join(thread[i], NULL); - if (rc) - { - printf("ERROR return code from pthread_join() is %d\n", rc); - return 1; + threadnum++; + pile = pile->next; + } + + /* Free attribute and wait for the other threads */ + pthread_attr_destroy(&attr); + + for (i = 0; i < threadnum; i++) { + rc = pthread_join(thread[i], NULL); + if(rc) + { + printf("Error: return code from pthread_join() is %d\n", rc); + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + pDelete(pile_ptr); + free(listmid); + for (j = i; j < threadnum; j++) { + free(thread_data_array[j].buffer); } - free(thread_data_array[i].buffer); + return 1; } + free(thread_data_array[i].buffer); + } - /* Free resources */ - pthread_cond_destroy(&tcond); - pthread_mutex_destroy(&tlock); - free(listmid); - pDelete(pile_ptr); + /* Free resources */ + pthread_cond_destroy(&tcond); + pthread_mutex_destroy(&tlock); + free(listmid); + pDelete(pile_ptr); + + + if(treplyctrl == TRANS_ABORT) { + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); free(thread_data_array); free(ltdata); - - /* wait a random amount of time */ - if (treplyretry == 1) - randomdelay(); - - /* Retry trans commit procedure if not sucessful in the first try */ - } while (treplyretry == 1); + return TRANS_ABORT; + } else if(treplyctrl == TRANS_COMMIT) { + /* Free Resources */ + objstrDelete(record->cache); + chashDelete(record->lookupTable); + free(record); + free(thread_data_array); + free(ltdata); + return 0; + } else { + //TODO Add other cases + printf("DEBUG-> THIS SHOULD NOT HAPPEN.....EXIT PROGRAM\n"); + exit(-1); + } return 0; } @@ -453,18 +624,18 @@ int transCommit(transrecord_t *record) { void *transRequest(void *threadarg) { int sd, i, n; struct sockaddr_in serv_addr; - struct hostent *server; thread_data_array_t *tdata; objheader_t *headeraddr; char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol; char machineip[16], retval; + tdata = (thread_data_array_t *) threadarg; /* Send Trans Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket for TRANS_REQUEST\n"); - return NULL; + pthread_exit(NULL); } bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; @@ -475,21 +646,23 @@ void *transRequest(void *threadarg) { /* Open Connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { perror("Error in connect for TRANS_REQUEST\n"); - return NULL; + close(sd); + pthread_exit(NULL); } - printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip); /* Send bytes of data with TRANS_REQUEST control message */ if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) { perror("Error sending fixed bytes for thread\n"); - return NULL; + close(sd); + pthread_exit(NULL); } /* Send list of machines involved in the transaction */ { - int size=sizeof(unsigned int)*tdata->pilecount; + int size=sizeof(unsigned int)*tdata->buffer->f.mcount; if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) { perror("Error sending list of machines for thread\n"); - return NULL; + close(sd); + pthread_exit(NULL); } } /* Send oids and version number tuples for objects that are read */ @@ -497,25 +670,30 @@ void *transRequest(void *threadarg) { int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread; if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) { perror("Error sending tuples for thread\n"); - return NULL; + close(sd); + pthread_exit(NULL); } } /* Send objects that are modified */ for(i = 0; i < tdata->buffer->f.nummod ; i++) { int size; headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]); - size=sizeof(objheader_t)+classsize[TYPE(headeraddr)]; + GETSIZE(size,headeraddr); + size+=sizeof(objheader_t); if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) { perror("Error sending obj modified for thread\n"); - return NULL; + close(sd); + pthread_exit(NULL); } } /* Read control message from Participant */ if((n = read(sd, &control, sizeof(char))) <= 0) { perror("Error in reading control message from Participant\n"); - return NULL; + close(sd); + pthread_exit(NULL); } + recvcontrol = control; /* Update common data structure and increment count */ @@ -528,13 +706,8 @@ void *transRequest(void *threadarg) { (*(tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ - if(*(tdata->count) == tdata->pilecount) { - if (decideResponse(tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); - close(sd); - return NULL; - } + if(*(tdata->count) == tdata->buffer->f.mcount) { + decideResponse(tdata); pthread_cond_broadcast(tdata->threshold); } else { pthread_cond_wait(tdata->threshold, tdata->lock); @@ -545,9 +718,20 @@ void *transRequest(void *threadarg) { * to all participants in their respective socket */ if (sendResponse(tdata, sd) == 0) { printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(tdata->lock); close(sd); - return NULL; + pthread_exit(NULL); + } + + do { + retval = recv((int)sd, &control, sizeof(char), 0); + } while (retval < sizeof(char)); + + if(control == TRANS_UNSUCESSFUL) { + //printf("DEBUG-> TRANS_ABORTED\n"); + } else if(control == TRANS_SUCESSFUL) { + //printf("DEBUG-> TRANS_SUCCESSFUL\n"); + } else { + //printf("DEBUG-> Error: Incorrect Transaction End Message %d\n", control); } /* Close connection */ @@ -557,15 +741,18 @@ void *transRequest(void *threadarg) { /* This function decides the reponse that needs to be sent to * all Participant machines after the TRANS_REQUEST protocol */ -int decideResponse(thread_data_array_t *tdata) { +void decideResponse(thread_data_array_t *tdata) { char control; int i, transagree = 0, transdisagree = 0, transsoftabort = 0; /* Counters to formulate decision of what message to send */ - for (i = 0 ; i < tdata->pilecount ; i++) { + for (i = 0 ; i < tdata->buffer->f.mcount; i++) { control = tdata->recvmsg[i].rcv_status; /* tdata: keeps track of all participant responses written onto the shared array */ switch(control) { + default: + printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); + /* treat as disagree, pass thru */ case TRANS_DISAGREE: transdisagree++; break; @@ -577,45 +764,49 @@ int decideResponse(thread_data_array_t *tdata) { case TRANS_SOFT_ABORT: transsoftabort++; break; - default: - printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__); - return -1; } } - /* Send Abort */ if(transdisagree > 0) { + /* Send Abort */ *(tdata->replyctrl) = TRANS_ABORT; - /* Free resources */ - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transagree == tdata->pilecount){ + *(tdata->replyretry) = 0; + /* clear objects from prefetch cache */ + for (i = 0; i < tdata->buffer->f.numread; i++) + prehashRemove(*((unsigned int *)(((char *)tdata->buffer->objread) + (sizeof(unsigned int) + sizeof(unsigned short))*i))); + for (i = 0; i < tdata->buffer->f.nummod; i++) + prehashRemove(tdata->buffer->oidmod[i]); + } else if(transagree == tdata->buffer->f.mcount){ /* Send Commit */ *(tdata->replyctrl) = TRANS_COMMIT; - /* Free resources */ - objstrDelete(tdata->rec->cache); - chashDelete(tdata->rec->lookupTable); - free(tdata->rec); - } else if(transsoftabort > 0 && transdisagree == 0) { + *(tdata->replyretry) = 0; + } else { /* Send Abort in soft abort case followed by retry commiting transaction again*/ *(tdata->replyctrl) = TRANS_ABORT; *(tdata->replyretry) = 1; - } else { - return -1; } - return 0; + return; } -/* This function sends the final response to remote machines per thread in their respective socket id */ +/* This function sends the final response to remote machines per thread in their respective socket id + * It returns a char that is only needed to check the correctness of execution of this function inside + * transRequest()*/ char sendResponse(thread_data_array_t *tdata, int sd) { - int n, N, sum, oidcount = 0; + int n, N, sum, oidcount = 0, control; char *ptr, retval = 0; unsigned int *oidnotfound; + control = *(tdata->replyctrl); + if (send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { + perror("Error sending ctrl message for participant\n"); + return 0; + } + + //FIXME read missing objects /* If the decided response is due to a soft abort and missing objects at the Participant's side */ + /* if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { - /* Read list of objects missing */ + // Read list of objects missing if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) { N = oidcount * sizeof(unsigned int); if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) { @@ -630,61 +821,57 @@ char sendResponse(thread_data_array_t *tdata, int sd) { } retval = TRANS_SOFT_ABORT; } + */ + /* If the decided response is TRANS_ABORT */ if(*(tdata->replyctrl) == TRANS_ABORT) { retval = TRANS_ABORT; - } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ + } else if(*(tdata->replyctrl) == TRANS_COMMIT) { /* If the decided response is TRANS_COMMIT */ retval = TRANS_COMMIT; } - - if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) { - perror("Error sending ctrl message for participant\n"); - } - + return retval; } /* This function opens a connection, places an object read request to the * remote machine, reads the control message and object if available and * copies the object and its header to the local cache. - * TODO replace mnum and midtoIP() with MACHINE_IP address later */ + * */ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int sd, size, val; struct sockaddr_in serv_addr; - struct hostent *server; char control; char machineip[16]; objheader_t *h; void *objcopy; + if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { perror("Error in socket\n"); return NULL; } + bzero((char*) &serv_addr, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); midtoIP(mnum,machineip); machineip[15] = '\0'; serv_addr.sin_addr.s_addr = inet_addr(machineip); + /* Open connection */ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect\n"); + perror("getRemoteObj() Error in connect\n"); return NULL; } char readrequest[sizeof(char)+sizeof(unsigned int)]; readrequest[0] = READ_REQUEST; *((unsigned int *)(&readrequest[1])) = oid; - if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { + if (send(sd, readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) { perror("Error sending message\n"); return NULL; } -#ifdef DEBUG1 - printf("DEBUG -> ready to rcv ...\n"); -#endif /* Read response from the Participant */ if((val = read(sd, &control, sizeof(char))) <= 0) { perror("No control response for getRemoteObj sent\n"); @@ -700,7 +887,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } objcopy = objstrAlloc(record->cache, size); - if((val = read(sd, objcopy, size)) <= 0) { + if((val = read(sd, (char *)objcopy, size)) <= 0) { perror("No objects are read from the remote participant\n"); return NULL; } @@ -723,73 +910,51 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { * based on common agreement it either commits or aborts the transaction. * It also frees the memory resources */ void *handleLocalReq(void *threadarg) { - int val, i = 0, size, offset = 0; - short version; - char control = 0, *ptr; - unsigned int oid; unsigned int *oidnotfound = NULL, *oidlocked = NULL; - void *mobj, *modptr; - objheader_t *headptr, *headeraddr; local_thread_data_array_t *localtdata; + int objnotfound = 0, objlocked = 0; + int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; + int numread, i; + unsigned int oid; + unsigned short version; + void *mobj; + objheader_t *headptr; localtdata = (local_thread_data_array_t *) threadarg; /* Counters and arrays to formulate decision on control message to be sent */ oidnotfound = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); oidlocked = (unsigned int *) calloc((localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod), sizeof(unsigned int)); - int objnotfound = 0, objlocked = 0; - int v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0; - - /* modptr points to the beginning of the object store - * created at the Pariticipant */ - if ((modptr = objstrAlloc(mainobjstore, localtdata->tdata->buffer->f.sum_bytes)) == NULL) { - printf("objstrAlloc error for modified objects %s, %d\n", __FILE__, __LINE__); - return NULL; - } - /* Write modified objects into the mainobject store */ - for(i = 0; i< localtdata->tdata->buffer->f.nummod; i++) { - headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i]); - size = sizeof(objheader_t) + classsize[TYPE(headeraddr)]; - memcpy(modptr+offset, headeraddr, size); - offset += size; - } - /* Write new objects into the mainobject store */ - for(i = 0; i< localtdata->tdata->buffer->f.numcreated; i++) { - headeraddr = chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidcreated[i]); - size = sizeof(objheader_t) + classsize[TYPE(headeraddr)]; - memcpy(modptr+offset, headeraddr, size); - offset += size; - } - - ptr = modptr; - offset = 0; //Reset + numread = localtdata->tdata->buffer->f.numread; /* Process each oid in the machine pile/ group per thread */ for (i = 0; i < localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod; i++) { - if (i < localtdata->tdata->buffer->f.numread) {//Objs only read and not modified + if (i < localtdata->tdata->buffer->f.numread) { int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array incr *= i; - oid = *((unsigned int *)(localtdata->tdata->buffer->objread + incr)); - incr += sizeof(unsigned int); - version = *((short *)(localtdata->tdata->buffer->objread + incr)); - } else {//Objs modified - headptr = (objheader_t *)ptr; + oid = *((unsigned int *)(((char *)localtdata->tdata->buffer->objread) + incr)); + version = *((short *)(((char *)localtdata->tdata->buffer->objread) + incr + sizeof(unsigned int))); + } else { // Objects Modified + int tmpsize; + headptr = (objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, localtdata->tdata->buffer->oidmod[i-numread]); + if (headptr == NULL) { + printf("Error: handleLocalReq() returning NULL %s, %d\n", __FILE__, __LINE__); + return NULL; + } oid = OID(headptr); version = headptr->version; - ptr += sizeof(objheader_t) + classsize[TYPE(headptr)]; } - /* Check if object is still present in the machine since the beginning of TRANS_REQUEST */ /* Save the oids not found and number of oids not found for later use */ - if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */ + if ((mobj = mhashSearch(oid)) == NULL) { /* Obj not found */ /* Save the oids not found and number of oids not found for later use */ oidnotfound[objnotfound] = oid; objnotfound++; } else { /* If Obj found in machine (i.e. has not moved) */ /* Check if Obj is locked by any previous transaction */ - if (STATUS(((objheader_t *)mobj)) & LOCK) { - if (version == ((objheader_t *)mobj)->version) { /* If not locked then match versions */ + if ((STATUS((objheader_t *)mobj) & LOCK) == LOCK) { + if (version == ((objheader_t *)mobj)->version) { /* If locked then match versions */ v_matchlock++; } else {/* If versions don't match ...HARD ABORT */ v_nomatch++; @@ -798,9 +963,6 @@ void *handleLocalReq(void *threadarg) { } } else {/* If Obj is not locked then lock object */ STATUS(((objheader_t *)mobj)) |= LOCK; - //TODO Remove this for Testing - randomdelay(); - /* Save all object oids that are locked on this machine during this transaction request call */ oidlocked[objlocked] = OID(((objheader_t *)mobj)); objlocked++; @@ -813,8 +975,7 @@ void *handleLocalReq(void *threadarg) { } } } - } - + } // End for /* Condition to send TRANS_AGREE */ if(v_matchnolock == localtdata->tdata->buffer->f.numread + localtdata->tdata->buffer->f.nummod) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_AGREE; @@ -822,86 +983,60 @@ void *handleLocalReq(void *threadarg) { /* Condition to send TRANS_SOFT_ABORT */ if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) { localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT; - //TODO currently the only soft abort case that is supported is when object locked by previous - //transaction => v_matchlock > 0 - //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported - /* Send number of oids not found and the missing oids if objects are missing in the machine */ - /* TODO Remember to store the oidnotfound for later use - if(objnotfound != 0) { - int size = sizeof(unsigned int)* objnotfound; - } - */ } /* Fill out the trans_commit_data_t data structure. This is required for a trans commit process * if Participant receives a TRANS_COMMIT */ localtdata->transinfo->objlocked = oidlocked; localtdata->transinfo->objnotfound = oidnotfound; - localtdata->transinfo->modptr = modptr; + localtdata->transinfo->modptr = NULL; localtdata->transinfo->numlocked = objlocked; localtdata->transinfo->numnotfound = objnotfound; - /* Lock and update count */ //Thread sleeps until all messages from pariticipants are received by coordinator pthread_mutex_lock(localtdata->tdata->lock); (*(localtdata->tdata->count))++; /* keeps track of no of messages received by the coordinator */ /* Wake up the threads and invoke decideResponse (once) */ - if(*(localtdata->tdata->count) == localtdata->tdata->pilecount) { - if (decideResponse(localtdata->tdata) != 0) { - printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__); - pthread_mutex_unlock(localtdata->tdata->lock); - return NULL; - } + if(*(localtdata->tdata->count) == localtdata->tdata->buffer->f.mcount) { + decideResponse(localtdata->tdata); pthread_cond_broadcast(localtdata->tdata->threshold); } else { pthread_cond_wait(localtdata->tdata->threshold, localtdata->tdata->lock); } pthread_mutex_unlock(localtdata->tdata->lock); - - /*Based on DecideResponse(), Either COMMIT or ABORT the operation*/ if(*(localtdata->tdata->replyctrl) == TRANS_ABORT){ - if(transAbortProcess(modptr,oidlocked, localtdata->transinfo->numlocked, localtdata->tdata->buffer->f.nummod) != 0) { + if(transAbortProcess(localtdata) != 0) { printf("Error in transAbortProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } - }else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT){ - if(transComProcess(modptr, localtdata->tdata->buffer->oidmod, localtdata->tdata->buffer->oidcreated, oidlocked, localtdata->tdata->buffer->f.nummod, localtdata->tdata->buffer->f.numcreated, localtdata->transinfo->numlocked) != 0) { + } else if(*(localtdata->tdata->replyctrl) == TRANS_COMMIT) { + if(transComProcess(localtdata) != 0) { printf("Error in transComProcess() %s,%d\n", __FILE__, __LINE__); - return NULL; + pthread_exit(NULL); } } - /* Free memory */ - if (localtdata->transinfo->objlocked != NULL) { free(localtdata->transinfo->objlocked); - localtdata->transinfo->objlocked = NULL; } if (localtdata->transinfo->objnotfound != NULL) { free(localtdata->transinfo->objnotfound); - localtdata->transinfo->objnotfound = NULL; } pthread_exit(NULL); } -/* This function completes the ABORT process if the transaction is aborting -*/ -int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int nummod) { - char *ptr; - int i; - objheader_t *tmp_header; + +/* This function completes the ABORT process if the transaction is aborting */ +int transAbortProcess(local_thread_data_array_t *localtdata) { + int i, numlocked; + unsigned int *objlocked; void *header; - /* Set all ref counts as 1 and do garbage collection */ - ptr = (char *)modptr; - for(i = 0; i< nummod; i++) { - tmp_header = (objheader_t *)ptr; - tmp_header->rcount = 0; - ptr += sizeof(objheader_t) + classsize[TYPE(tmp_header)]; - } - /* Unlock objects that was locked due to this transaction */ - for(i = 0; i< numlocked; i++) { + numlocked = localtdata->transinfo->numlocked; + objlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < numlocked; i++) { if((header = mhashSearch(objlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; @@ -909,60 +1044,70 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int STATUS(((objheader_t *)header)) &= ~(LOCK); } - /* Send ack to Coordinator */ - - /*Free the pointer */ - ptr = NULL; return 0; } -/*This function completes the COMMIT process is the transaction is commiting -*/ -int transComProcess(void *modptr, unsigned int *oidmod, unsigned int *oidcreated, unsigned int *objlocked, int nummod, int numcreated, int numlocked) { - objheader_t *header; - int i = 0, offset = 0; - char control; - - /* Process each modified object saved in the mainobject store */ - for(i = 0; i < nummod; i++) { +/*This function completes the COMMIT process is the transaction is commiting*/ +int transComProcess(local_thread_data_array_t *localtdata) { + objheader_t *header, *tcptr; + int i, nummod, tmpsize, numcreated, numlocked; + unsigned int *oidmod, *oidcreated, *oidlocked; + void *ptrcreate; + + nummod = localtdata->tdata->buffer->f.nummod; + oidmod = localtdata->tdata->buffer->oidmod; + numcreated = localtdata->tdata->buffer->f.numcreated; + oidcreated = localtdata->tdata->buffer->oidcreated; + numlocked = localtdata->transinfo->numlocked; + oidlocked = localtdata->transinfo->objlocked; + + for (i = 0; i < nummod; i++) { if((header = (objheader_t *) mhashSearch(oidmod[i])) == NULL) { - printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); + printf("Error: transComProcess() mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } - /* Change reference count of older address and free space in objstr ?? */ - header->rcount = 0; - - /* Change ptr address in mhash table */ - mhashRemove(oidmod[i]); //TODO: this shouldn't be necessary - mhashInsert(oidmod[i], (((char *)modptr) + offset)); - offset += sizeof(objheader_t) + classsize[TYPE(header)]; - - /* Update object version number */ - header = (objheader_t *) mhashSearch(oidmod[i]); + /* Copy from transaction cache -> main object store */ + if ((tcptr = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidmod[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + pthread_mutex_lock(&mainobjstore_mutex); + memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize); header->version += 1; + if(header->notifylist != NULL) { + notifyAll(&header->notifylist, OID(header), header->version); + } + pthread_mutex_unlock(&mainobjstore_mutex); } - - for (i = 0; i < numcreated; i++) - { - header = (objheader_t *)(((char *)modptr) + offset); - mhashInsert(oidcreated[i], (((char *)modptr) + offset)); - offset += sizeof(objheader_t) + classsize[TYPE(header)]; - + /* If object is newly created inside transaction then commit it */ + for (i = 0; i < numcreated; i++) { + if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) { + printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__); + return 1; + } + GETSIZE(tmpsize, header); + tmpsize += sizeof(objheader_t); + pthread_mutex_lock(&mainobjstore_mutex); + if ((ptrcreate = objstrAlloc(mainobjstore, tmpsize)) == NULL) { + printf("Error: transComProcess() failed objstrAlloc %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mainobjstore_mutex); + return 1; + } + pthread_mutex_unlock(&mainobjstore_mutex); + memcpy(ptrcreate, header, tmpsize); + mhashInsert(oidcreated[i], ptrcreate); lhashInsert(oidcreated[i], myIpAddr); } - /* Unlock locked objects */ for(i = 0; i < numlocked; i++) { - if((header = (objheader_t *) mhashSearch(objlocked[i])) == NULL) { + if((header = (objheader_t *) mhashSearch(oidlocked[i])) == NULL) { printf("mhashsearch returns NULL at %s, %d\n", __FILE__, __LINE__); return 1; } STATUS(header) &= ~(LOCK); } - //TODO Update location lookup table - - /* Send ack to Coordinator */ return 0; } @@ -984,6 +1129,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) { oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ int numoffset[ntuples]; numoffset[0] = endoffsets[0]; @@ -992,10 +1138,10 @@ void checkPrefetchTuples(prefetchqelem_t *node) { } /* Check for redundant tuples by comparing oids of each tuple */ for(i = 0; i < ntuples; i++) { - if(oid[i] == -1) + if(oid[i] == 0) continue; for(j = i+1 ; j < ntuples; j++) { - if(oid[j] == -1) + if(oid[j] == 0) continue; /*If oids of tuples match */ if (oid[i] == oid[j]) { @@ -1015,102 +1161,31 @@ void checkPrefetchTuples(prefetchqelem_t *node) { if(i == 0) { k = 0; - index = endoffsets[j -1]; - for(count = 0; count < slength; count ++) { - if (arryfields[k] != arryfields[index]) { - break; - } - index++; - k++; - } } else { k = endoffsets[i-1]; - index = endoffsets[j-1]; - printf("Value of slength = %d\n", slength); - for(count = 0; count < slength; count++) { - if(arryfields[k] != arryfields[index]) { - break; - } - index++; - k++; - } } - + index = endoffsets[j -1]; + for(count = 0; count < slength; count ++) { + if (arryfields[k] != arryfields[index]) { + break; + } + index++; + k++; + } if(slength == count) { - oid[sindex] = -1; + oid[sindex] = 0; } } } } } - -void checkPreCache(prefetchqelem_t *node, int *numoffset, int counter, int loopcount, unsigned int objoid, int index, int iter, int oidnfound) { - char *ptr, *tmp; - int ntuples, i, k, flag; - unsigned int * oid; - short *endoffsets, *arryfields; - objheader_t *header; - - ptr = (char *) node; - ntuples = *(GET_NTUPLES(ptr)); - oid = GET_PTR_OID(ptr); - endoffsets = GET_PTR_EOFF(ptr, ntuples); - arryfields = GET_PTR_ARRYFLD(ptr, ntuples); - - if(oidnfound == 1) { - if((header = (objheader_t *) prehashSearch(objoid)) == NULL) { - return; - } else { //Found in Prefetch Cache - //TODO Decide if object is too old, if old remove from cache - tmp = (char *) header; - /* Check if any of the offset oid is available in the Prefetch cache */ - for(i = counter; i < loopcount; i++) { - objoid = *(tmp + sizeof(objheader_t) + arryfields[counter]); - if((header = (objheader_t *)prehashSearch(objoid)) != NULL) { - flag = 0; - } else { - flag = 1; - break; - } - } - } - } else { - for(i = counter; inext != NULL) { + if((tmp = (prefetchpile_t *) calloc(1, sizeof(prefetchpile_t))) == NULL) { + printf("Calloc error: %s %d\n", __FILE__, __LINE__); + return NULL; + } + tmp->mid = myIpAddr; + tmp->next = head; + head = tmp; + } else { + head->mid = myIpAddr; + } continue; + } /* For each tuple make piles */ if ((machinenum = lhashSearch(oid[i])) == 0) { printf("Error: No such Machine %s, %d\n", __FILE__, __LINE__); return NULL; } /* Insert into machine pile */ - offset = &arryfields[endoffsets[i-1]]; - insertPile(machinenum, oid[i], numoffset[i], offset, head); + if(i == 0){ + offset = &arryfields[0]; + } else { + offset = &arryfields[endoffsets[i-1]]; + } + + if((head = insertPile(machinenum, oid[i], numoffset[i], offset, head)) == NULL){ + printf("Error: Couldn't create a pile %s, %d\n", __FILE__, __LINE__); + return NULL; + } } return head; } - -/* This function checks if the oids within the prefetch tuples are available locally. - * If yes then makes the tuple invalid. If no then rearranges oid and offset values in - * the prefetchqelem_t node to represent a new prefetch tuple */ prefetchpile_t *foundLocal(prefetchqelem_t *node) { - int ntuples,i, j, k, oidnfound = 0, index, flag; + int ntuples,i, j, k, oidnfound = 0, arryfieldindex,nextarryfieldindex, flag = 0, val; unsigned int *oid; unsigned int objoid; + int isArray = 0; char *ptr, *tmp; objheader_t *objheader; short *endoffsets, *arryfields; @@ -1154,6 +1251,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + /* Find offset length for each tuple */ int numoffset[ntuples];//Number of offsets for each tuple numoffset[0] = endoffsets[0]; @@ -1161,57 +1259,159 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { numoffset[i] = endoffsets[i] - endoffsets[i-1]; } for(i = 0; i < ntuples; i++) { - if(oid[i] == -1) + if(oid[i] == 0){ + if(i == 0) { + arryfieldindex = 0; + nextarryfieldindex = endoffsets[0]; + }else { + arryfieldindex = endoffsets[i-1]; + nextarryfieldindex = endoffsets[i]; + } + numoffset[i] = 0; + endoffsets[0] = val = numoffset[0]; + for(k = 1; k < ntuples; k++) { + val = val + numoffset[k]; + endoffsets[k] = val; + } + + for(k = 0; k NUMCLASSES) { + isArray = 1; + } + if(isArray == 1) { + int elementsize = classsize[TYPE(objheader)]; + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); + } else { + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); + } + //Update numoffset array + numoffset[i] = numoffset[i] - 1; + //Update oid array oid[i] = objoid; - numoffset[i] = numoffset[i] - (j+1); - for(k = 0; k < numoffset[i]; k++) - arryfields[endoffsets[j]+ k] = arryfields[endoffsets[j]+k+1]; - index++; - /*New offset oid not found */ - if((objheader = (objheader_t*) mhashSearch(objoid)) == NULL) { + //Update endoffset array + endoffsets[0] = val = numoffset[0]; + for(k = 1; k < ntuples; k++) { + val = val + numoffset[k]; + endoffsets[k] = val; + } + //Update arrayfields array + for(k = 0; k < endoffsets[ntuples-1]; k++) { + arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1]; + } + if((objheader = (objheader_t*) mhashSearch(oid[i])) == NULL) { flag = 1; - checkPreCache(node, numoffset, j, numoffset[i], objoid, index, i, oidnfound); + checkPreCache(node, numoffset, oid[i], i); break; - } else - flag = 0; + } + tmp = (char *) objheader; + isArray = 0; } - /*If all offset oids are found locally,make the prefetch tuple invalid */ if(flag == 0) { - oid[i] = -1; - numoffset[i] = 0; + oid[i] = 0; } } else { - oidnfound = 1; /* Look in Prefetch cache */ - checkPreCache(node, numoffset, 0, numoffset[i], oid[i], 0, i, oidnfound); + checkPreCache(node, numoffset, oid[i],i); } - } + /* Make machine groups */ - head = makePreGroups(node, numoffset); + if((head = makePreGroups(node, numoffset)) == NULL) { + printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__); + return NULL; + } + return head; } +void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, int index) { + char *ptr, *tmp; + int ntuples, i, k, flag=0, isArray =0, arryfieldindex, val; + unsigned int * oid; + short *endoffsets, *arryfields; + objheader_t *header; + + ptr = (char *) node; + ntuples = *(GET_NTUPLES(ptr)); + oid = GET_PTR_OID(ptr); + endoffsets = GET_PTR_EOFF(ptr, ntuples); + arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + + if((header = (objheader_t *) prehashSearch(objoid)) == NULL) { + return; + } else { //Found in Prefetch Cache + //TODO Decide if object is too old, if old remove from cache + tmp = (char *) header; + int loopcount = numoffset[index]; + if(index == 0) + arryfieldindex = 0; + else + arryfieldindex = endoffsets[(index - 1)]; + // Check if any of the offset oid is available in the Prefetch cache + for(i = 0; i < loopcount; i++) { + /* Check for arrays */ + if(TYPE(header) > NUMCLASSES) { + isArray = 1; + } + if(isArray == 1) { + int elementsize = classsize[TYPE(header)]; + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + sizeof(struct ArrayObject) + (elementsize*arryfields[arryfieldindex]))); + } else { + objoid = *((unsigned int *)(tmp + sizeof(objheader_t) + arryfields[arryfieldindex])); + } + //Update numoffset array + numoffset[index] = numoffset[index] - 1; + //Update oid array + oid[index] = objoid; + //Update endoffset array + endoffsets[0] = val = numoffset[0]; + for(k = 1; k < ntuples; k++) { + val = val + numoffset[k]; + endoffsets[k] = val; + } + //Update arrayfields array + for(k = 0; k < endoffsets[ntuples-1]; k++) { + arryfields[arryfieldindex+k] = arryfields[arryfieldindex+k+1]; + } + if((header = (objheader_t *)prehashSearch(oid[index])) != NULL) { + tmp = (char *) header; + isArray = 0; + } else { + flag = 1; + break; + } + } + } + //Found in the prefetch cache + if(flag == 0 && (numoffset[index] == 0)) { + oid[index] = 0; + } +} + + + /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { prefetchqelem_t *qnode; prefetchpile_t *pilehead = NULL; + prefetchpile_t *ptr = NULL, *piletail = NULL; while(1) { /* lock mutex of primary prefetch queue */ @@ -1224,26 +1424,38 @@ void *transPrefetch(void *t) { /* dequeue node to create a machine piles and finally unlock mutex */ if((qnode = pre_dequeue()) == NULL) { printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); - return NULL; + pthread_mutex_unlock(&pqueue.qlock); + pthread_exit(NULL); } pthread_mutex_unlock(&pqueue.qlock); + /* Reduce redundant prefetch requests */ checkPrefetchTuples(qnode); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ - pilehead = foundLocal(qnode); + if((pilehead = foundLocal(qnode)) == NULL) { + printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__); + pthread_exit(NULL); + } + + ptr = pilehead; + while(ptr != NULL) { + if(ptr->next == NULL) { + piletail = ptr; + } + ptr = ptr->next; + } /* Lock mutex of pool queue */ pthread_mutex_lock(&mcqueue.qlock); /* Update the pool queue with the new remote machine piles generated per prefetch call */ - mcpileenqueue(pilehead); + mcpileenqueue(pilehead, piletail); /* Broadcast signal on machine pile queue */ pthread_cond_broadcast(&mcqueue.qcond); /* Unlock mutex of machine pile queue */ pthread_mutex_unlock(&mcqueue.qlock); /* Deallocate the prefetch queue pile node */ predealloc(qnode); - } } @@ -1267,44 +1479,47 @@ void *mcqProcess(void *threadid) { /* Dequeue node to send remote machine connections*/ if((mcpilenode = mcpiledequeue()) == NULL) { printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); - return NULL; + pthread_mutex_unlock(&mcqueue.qlock); + pthread_exit(NULL); } /* Unlock mutex */ pthread_mutex_unlock(&mcqueue.qlock); /*Initiate connection to remote host and send request */ /* Process Request */ - sendPrefetchReq(mcpilenode, tid); + if(mcpilenode->mid != myIpAddr) + sendPrefetchReq(mcpilenode); /* Deallocate the machine queue pile node */ mcdealloc(mcpilenode); } } -void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { - int sd, i, offset, off, len, endpair, numoffsets, count = 0; - struct sockaddr_in serv_addr; - struct hostent *server; +void sendPrefetchReq(prefetchpile_t *mcpilenode) { + int sd, i, off, len, endpair, count = 0; + struct sockaddr_in remoteAddr; char machineip[16], control; objpile_t *tmp; - + unsigned int mid; /* Send Trans Prefetch Request */ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket for TRANS_REQUEST\n"); + perror("Error in socket for SEND_PREFETCH_REQUEST\n"); return; } - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP); - midtoIP(mcpilenode->mid ,machineip); - machineip[15] = '\0'; - serv_addr.sin_addr.s_addr = inet_addr(machineip); + + mid = mcpilenode->mid; + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); /* Open Connection */ - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("Error in connect for TRANS_REQUEST\n"); + if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s():error %d connecting to %s:%d\n", __func__, errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); return; } @@ -1312,28 +1527,33 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { control = TRANS_PREFETCH; if(send(sd, &control, sizeof(char), MSG_NOSIGNAL) < sizeof(char)) { perror("Error in sending prefetch control\n"); + close(sd); return; } /* Send Oids and offsets in pairs */ tmp = mcpilenode->objpiles; while(tmp != NULL) { - off = offset = 0; + off = 0; count++; /* Keeps track of the number of oid and offset tuples sent per remote machine */ - len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(short)); + len = sizeof(int) + sizeof(unsigned int) + ((tmp->numoffset) * sizeof(unsigned short)); char oidnoffset[len]; - memcpy(oidnoffset, &len, sizeof(int)); + bzero(oidnoffset, len); + *((unsigned int*)oidnoffset) = len; off = sizeof(int); - memcpy(oidnoffset + off, &tmp->oid, sizeof(unsigned int)); + *((unsigned int *)((char *)oidnoffset + off)) = tmp->oid; off += sizeof(unsigned int); - for(i = 0; i < numoffsets; i++) { - offset = off + (i * sizeof(short)); - memcpy(oidnoffset + offset, tmp->offset, sizeof(short)); + for(i = 0; i < tmp->numoffset; i++) { + *((unsigned short*)((char *)oidnoffset + off)) = tmp->offset[i]; + off+=sizeof(unsigned short); } - if (send(sd, &oidnoffset, sizeof(oidnoffset),MSG_NOSIGNAL) < sizeof(oidnoffset)) { + + if (send(sd, oidnoffset, len , MSG_NOSIGNAL) < len) { perror("Error sending fixed bytes for thread\n"); + close(sd); return; } + tmp = tmp->next; } @@ -1341,6 +1561,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { endpair = -1; if (send(sd, &endpair, sizeof(int), MSG_NOSIGNAL) < sizeof(int)) { perror("Error sending fixed bytes for thread\n"); + close(sd); return; } @@ -1353,7 +1574,8 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int threadid) { void getPrefetchResponse(int count, int sd) { int i = 0, val, n, N, sum, index, objsize; unsigned int bufsize,oid; - char buffer[RECEIVE_BUFFER_SIZE], control; + char *buffer; + char control; char *ptr; void *modptr, *oldptr; @@ -1365,41 +1587,40 @@ void getPrefetchResponse(int count, int sd) { if(control == TRANS_PREFETCH_RESPONSE) { /*For each oid and offset tuple sent as prefetch request to remote machine*/ - while(i < count) { - /* Clear contents of buffer */ - memset(buffer, 0, RECEIVE_BUFFER_SIZE); - sum = 0; - index = 0; - /* Read the size of buffer to be received */ - if((N = read(sd, buffer, sizeof(unsigned int))) <= 0) { - perror("Size of buffer not recv\n"); + while(N = recv((int)sd, &bufsize, sizeof(unsigned int), 0) != 0) { + if((buffer = calloc(1, bufsize)) == NULL) { + printf("Calloc Error in %s() at %s, %d\n", __func__, __FILE__, __LINE__); return; } - memcpy(&bufsize, buffer, sizeof(unsigned int)); - ptr = buffer + sizeof(unsigned int); + sum = 0; + index = 0; + ptr = buffer; /* Keep receiving the buffer containing oid info */ do { n = recv((int)sd, (void *)ptr+sum, bufsize-sum, 0); sum +=n; } while(sum < bufsize && n != 0); + /* Decode the contents of the buffer */ - index = sizeof(unsigned int); - while(index < (bufsize - sizeof(unsigned int))) { + while(index < bufsize ) { if(buffer[index] == OBJECT_FOUND) { /* Increment it to get the object */ index += sizeof(char); - memcpy(&oid, buffer + index, sizeof(unsigned int)); + oid = *((unsigned int *)(buffer+index)); index += sizeof(unsigned int); - /* Lock the Prefetch Cache look up table*/ - pthread_mutex_lock(&pflookup.lock); /* For each object found add to Prefetch Cache */ - memcpy(&objsize, buffer + index, sizeof(int)); + objsize = *((int *)(buffer+index)); + index+=sizeof(int); + pthread_mutex_lock(&prefetchcache_mutex); if ((modptr = objstrAlloc(prefetchcache, objsize)) == NULL) { - printf("objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + printf("Error: objstrAlloc error for copying into prefetch cache %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&prefetchcache_mutex); + free(buffer); return; } + pthread_mutex_unlock(&prefetchcache_mutex); memcpy(modptr, buffer+index, objsize); - index += sizeof(int); + index += objsize; /* Insert the oid and its address into the prefetch hash lookup table */ /* Do a version comparison if the oid exists */ if((oldptr = prehashSearch(oid)) != NULL) { @@ -1409,13 +1630,16 @@ void getPrefetchResponse(int count, int sd) { prehashInsert(oid, modptr); } else if(((objheader_t *)oldptr)->version == ((objheader_t *)modptr)->version) { /* Add the new object ptr to hash table */ + prehashRemove(oid); prehashInsert(oid, modptr); - } else { /* Do nothing */ + } else { /* Do nothing: TODO modptr should be reference counted */ ; } } else {/*If doesn't no match found in hashtable, add the object ptr to hash table*/ prehashInsert(oid, modptr); } + /* Lock the Prefetch Cache look up table*/ + pthread_mutex_lock(&pflookup.lock); /* Broadcast signal on prefetch cache condition variable */ pthread_cond_broadcast(&pflookup.cond); /* Unlock the Prefetch Cache look up table*/ @@ -1424,16 +1648,18 @@ void getPrefetchResponse(int count, int sd) { /* Increment it to get the object */ /* TODO: For each object not found query DHT for new location and retrieve the object */ index += sizeof(char); - memcpy(&oid, buffer + index, sizeof(unsigned int)); + oid = *((unsigned int *)(buffer + index)); index += sizeof(unsigned int); /* Throw an error */ printf("OBJECT NOT FOUND.... THIS SHOULD NOT HAPPEN...TERMINATE PROGRAM\n"); exit(-1); - } else - printf("Error in decoding the index value %s, %d\n",__FILE__, __LINE__); + } else { + printf("Error in decoding the index value %d, %s, %d\n",index, __FILE__, __LINE__); + free(buffer); + return; + } } - - i++; + free(buffer); } } else printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__); @@ -1443,16 +1669,19 @@ void getPrefetchResponse(int count, int sd) { unsigned short getObjType(unsigned int oid) { objheader_t *objheader; - unsigned short numoffsets = 0; + unsigned short numoffset[] ={0}; + short fieldoffset[] ={}; if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL) { if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) { - prefetch(1, &oid, &numoffsets, NULL); + prefetch(1, &oid, numoffset, fieldoffset); pthread_mutex_lock(&pflookup.lock); while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL) + { pthread_cond_wait(&pflookup.cond, &pflookup.lock); + } pthread_mutex_unlock(&pflookup.lock); } } @@ -1511,3 +1740,322 @@ int startRemoteThread(unsigned int oid, unsigned int mid) return status; } +//TODO: when reusing oids, make sure they are not already in use! +unsigned int getNewOID(void) { + static unsigned int id = 0xFFFFFFFF; + + id += 2; + if (id > oidMax || id < oidMin) + { + id = (oidMin | 1); + } + return id; +} + +int processConfigFile() +{ + FILE *configFile; + const int maxLineLength = 200; + char lineBuffer[maxLineLength]; + char *token; + const char *delimiters = " \t\n"; + char *commentBegin; + in_addr_t tmpAddr; + + configFile = fopen(CONFIG_FILENAME, "r"); + if (configFile == NULL) + { + printf("error opening %s:\n", CONFIG_FILENAME); + perror(""); + return -1; + } + + numHostsInSystem = 0; + sizeOfHostArray = 8; + hostIpAddrs = calloc(sizeOfHostArray, sizeof(unsigned int)); + + while(fgets(lineBuffer, maxLineLength, configFile) != NULL) + { + commentBegin = strchr(lineBuffer, '#'); + if (commentBegin != NULL) + *commentBegin = '\0'; + token = strtok(lineBuffer, delimiters); + while (token != NULL) + { + tmpAddr = inet_addr(token); + if ((int)tmpAddr == -1) + { + printf("error in %s: bad token:%s\n", CONFIG_FILENAME, token); + fclose(configFile); + return -1; + } + else + addHost(htonl(tmpAddr)); + token = strtok(NULL, delimiters); + } + } + + fclose(configFile); + + if (numHostsInSystem < 1) + { + printf("error in %s: no IP Adresses found\n", CONFIG_FILENAME); + return -1; + } +#ifdef MAC + myIpAddr = getMyIpAddr("en1"); +#else + myIpAddr = getMyIpAddr("eth0"); +#endif + myIndexInHostArray = findHost(myIpAddr); + if (myIndexInHostArray == -1) + { + printf("error in %s: IP Address of eth0 not found\n", CONFIG_FILENAME); + return -1; + } + oidsPerBlock = (0xFFFFFFFF / numHostsInSystem) + 1; + oidMin = oidsPerBlock * myIndexInHostArray; + if (myIndexInHostArray == numHostsInSystem - 1) + oidMax = 0xFFFFFFFF; + else + oidMax = oidsPerBlock * (myIndexInHostArray + 1) - 1; + + return 0; +} + +void addHost(unsigned int hostIp) +{ + unsigned int *tmpArray; + + if (findHost(hostIp) != -1) + return; + + if (numHostsInSystem == sizeOfHostArray) + { + tmpArray = calloc(sizeOfHostArray * 2, sizeof(unsigned int)); + memcpy(tmpArray, hostIpAddrs, sizeof(unsigned int) * numHostsInSystem); + free(hostIpAddrs); + hostIpAddrs = tmpArray; + } + + hostIpAddrs[numHostsInSystem++] = hostIp; + + return; +} + +int findHost(unsigned int hostIp) +{ + int i; + for (i = 0; i < numHostsInSystem; i++) + if (hostIpAddrs[i] == hostIp) + return i; + + //not found + return -1; +} + +/* This function sends notification request per thread waiting on object(s) whose version + * changes */ +int reqNotify(unsigned int *oidarry, unsigned short *versionarry, unsigned int numoid) { + int sock,i; + objheader_t *objheader; + struct sockaddr_in remoteAddr; + char msg[1 + numoid * (sizeof(short) + sizeof(unsigned int)) + 3 * sizeof(unsigned int)]; + char *ptr; + int bytesSent; + int status, size; + unsigned short version; + unsigned int oid,mid; + static unsigned int threadid = 0; + pthread_mutex_t threadnotify = PTHREAD_MUTEX_INITIALIZER; //Lock and condition var for threadjoin and notification + pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER; + notifydata_t *ndata; + + //FIXME currently all oids belong to one machine + oid = oidarry[0]; + mid = lhashSearch(oid); + + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("reqNotify():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + /* Generate unique threadid */ + threadid++; + + /* Save threadid, numoid, oidarray, versionarray, pthread_cond_variable for later processing */ + if((ndata = calloc(1, sizeof(notifydata_t))) == NULL) { + printf("Calloc Error %s, %d\n", __FILE__, __LINE__); + return -1; + } + ndata->numoid = numoid; + ndata->threadid = threadid; + ndata->oidarry = oidarry; + ndata->versionarry = versionarry; + ndata->threadcond = threadcond; + ndata->threadnotify = threadnotify; + if((status = notifyhashInsert(threadid, ndata)) != 0) { + printf("reqNotify(): Insert into notify hash table not successful %s, %d\n", __FILE__, __LINE__); + free(ndata); + return -1; + } + + /* Send number of oids, oidarry, version array, machine id and threadid */ + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("reqNotify():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + free(ndata); + return -1; + } else { + msg[0] = THREAD_NOTIFY_REQUEST; + *((unsigned int *)(&msg[1])) = numoid; + /* Send array of oids */ + size = sizeof(unsigned int); + { + i = 0; + while(i < numoid) { + oid = oidarry[i]; + *((unsigned int *)(&msg[1] + size)) = oid; + size += sizeof(unsigned int); + i++; + } + } + + /* Send array of version */ + { + i = 0; + while(i < numoid) { + version = versionarry[i]; + *((unsigned short *)(&msg[1] + size)) = version; + size += sizeof(unsigned short); + i++; + } + } + + *((unsigned int *)(&msg[1] + size)) = myIpAddr; + size += sizeof(unsigned int); + *((unsigned int *)(&msg[1] + size)) = threadid; + + pthread_mutex_lock(&(ndata->threadnotify)); + bytesSent = send(sock, msg, 1 + numoid * (sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int) , 0); + if (bytesSent < 0){ + perror("reqNotify():send()"); + status = -1; + } else if (bytesSent != 1 + numoid*(sizeof(unsigned int) + sizeof(unsigned short)) + 3 * sizeof(unsigned int)){ + printf("reNotify(): error, sent %d bytes %s, %d\n", bytesSent, __FILE__, __LINE__); + status = -1; + } else { + status = 0; + } + + pthread_cond_wait(&(ndata->threadcond), &(ndata->threadnotify)); + pthread_mutex_unlock(&(ndata->threadnotify)); + } + + pthread_cond_destroy(&threadcond); + pthread_mutex_destroy(&threadnotify); + free(ndata); + close(sock); + return status; +} + +void threadNotify(unsigned int oid, unsigned short version, unsigned int tid) { + notifydata_t *ndata; + int i, objIsFound = 0, index; + void *ptr; + + //Look up the tid and call the corresponding pthread_cond_signal + if((ndata = notifyhashSearch(tid)) == NULL) { + printf("threadnotify(): No such threadid is present %s, %d\n", __FILE__, __LINE__); + return; + } else { + for(i = 0; i < ndata->numoid; i++) { + if(ndata->oidarry[i] == oid){ + objIsFound = 1; + index = i; + } + } + if(objIsFound == 0){ + printf("threadNotify(): Oid not found %s, %d\n", __FILE__, __LINE__); + return; + } else { + if(version <= ndata->versionarry[index]){ + printf("threadNotify(): New version %d has not changed since last version %s, %d\n", version, __FILE__, __LINE__); + return; + } else { + /* Clear from prefetch cache and free thread related data structure */ + if((ptr = prehashSearch(oid)) != NULL) { + prehashRemove(oid); + } + pthread_cond_signal(&(ndata->threadcond)); + } + } + } + return; +} + +int notifyAll(threadlist_t **head, unsigned int oid, unsigned int version) { + threadlist_t *ptr; + unsigned int mid; + struct sockaddr_in remoteAddr; + char msg[1 + sizeof(unsigned short) + 2*sizeof(unsigned int)]; + int sock, status, size, bytesSent; + + while(*head != NULL) { + ptr = *head; + mid = ptr->mid; + //create a socket connection to that machine + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){ + perror("notifyAll():socket()"); + return -1; + } + + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + //send Thread Notify response and threadid to that machine + if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ + printf("notifyAll():error %d connecting to %s:%d\n", errno, + inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + status = -1; + } else { + bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int))); + msg[0] = THREAD_NOTIFY_RESPONSE; + *((unsigned int *)&msg[1]) = oid; + size = sizeof(unsigned int); + *((unsigned short *)(&msg[1]+ size)) = version; + size+= sizeof(unsigned short); + *((unsigned int *)(&msg[1]+ size)) = ptr->threadid; + + bytesSent = send(sock, msg, (1 + 2*sizeof(unsigned int) + sizeof(unsigned short)), 0); + if (bytesSent < 0){ + perror("notifyAll():send()"); + status = -1; + } else if (bytesSent != 1 + 2*sizeof(unsigned int) + sizeof(unsigned short)){ + printf("notifyAll(): error, sent %d bytes %s, %d\n", + bytesSent, __FILE__, __LINE__); + status = -1; + } else { + status = 0; + } + } + //close socket + close(sock); + // Update head + *head = ptr->next; + free(ptr); + } + return status; +} + +void transAbort(transrecord_t *trans) { + objstrDelete(trans->cache); + chashDelete(trans->lookupTable); + free(trans); +}