From: adash Date: Mon, 7 Apr 2008 21:26:26 +0000 (+0000) Subject: complete hashtable implementation with separate socket pools for read and X-Git-Tag: preEdgeChange~180 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=6334efaf8ede651a85a0e541e6382b1ed3299408;p=IRC.git complete hashtable implementation with separate socket pools for read and prefetch. commented out array implementation of socket pools --- diff --git a/Robust/src/Runtime/DSTM/interface/ISSUESTOADDRESS b/Robust/src/Runtime/DSTM/interface/ISSUESTOADDRESS index d9040032..f008b349 100644 --- a/Robust/src/Runtime/DSTM/interface/ISSUESTOADDRESS +++ b/Robust/src/Runtime/DSTM/interface/ISSUESTOADDRESS @@ -4,20 +4,30 @@ High priority list A) allocations always have to traverse to end of list B) do we need to zero first?? -- need to check about this one, it may be okay +Status:Verified + 1) Wrap all receive calls in loops A) Perhaps the best way is to just define a macro or function call that does this. Look at GETSIZE macro for example... +Status:DONE + 2) Check locking... There is likely a race condition on getObjType(). +Status:DONE + 3) Receiving object code assume a maximum object size. It is probably better to: A) read size in. B) allocate space for object at its final destination C) read into the space +Status:DONE + Low priority list --------------------------------- 1) We shouldn't call memcopy for copying fixed-sized structs or primitive values...just use = + +Status: DONE in most places diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index f7478881..8a86ea2f 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -48,9 +48,11 @@ #define MAX_OBJECTS 20 //Max remote-machine connections #define NUM_MACHINES 2 +#define LOADFACTOR 0.5 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB //Transaction id per machine #define TID_LEN 20 +#define LISTEN_PORT 2156 #include @@ -61,7 +63,15 @@ #include "queue.h" #include "mcpileq.h" #include "threadnotify.h" - +#include +#include +#include +#include +#include +#include +#include +#include +#include "sockpool.h" //bit designations for status field of objheader #define DIRTY 0x01 @@ -124,6 +134,11 @@ typedef struct objstr { struct objstr *next; } objstr_t; +typedef struct oidmidpair { + unsigned int oid; + unsigned int mid; +} oidmidpair_t; + typedef struct transrecord { objstr_t *cache; chashtable_t *lookupTable; diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index 043fbb88..1f892e35 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -1,14 +1,6 @@ /* Coordinator => Machine that initiates the transaction request call for commiting a transaction * Participant => Machines that host the objects involved in a transaction commit */ -#include -#include -#include -#include -#include -#include -#include -#include #include "dstm.h" #include "mlookup.h" #include "llookup.h" @@ -17,24 +9,17 @@ #include "thread.h" #endif - -#define LISTEN_PORT 2156 #define BACKLOG 10 //max pending connections #define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; +extern int numHostsInSystem; objstr_t *mainobjstore; pthread_mutex_t mainobjstore_mutex; pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */ -/********************************************************** - * Global variables to map socketid and remote mid - * to resuse sockets - **************************************************/ -midSocketInfo_t sockArray[NUM_MACHINES]; -int sockCount; //number of connections with all remote machines(one socket per mc) -int sockIdFound; //track if socket file descriptor is already established -pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent + +sockPoolHashTable_t *transPResponseSocketPool; /* This function initializes the main objects store and creates the * global machine and location lookup table */ @@ -54,14 +39,12 @@ int dstmInit(void) if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR)) return 1; //failure - - //Initialize mid to socketid mapping array - int t; - sockCount = 0; - for(t = 0; t < NUM_MACHINES; t++) { - sockArray[t].mid = 0; - sockArray[t].sockid = 0; - } + + //Initialize socket pool + if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } return 0; } @@ -138,11 +121,13 @@ void *dstmAccept(void *acceptfd) { unsigned short objType, *versionarry, version; unsigned int *oidarry, numoid, mid, threadid; + /* transinfo.objlocked = NULL; transinfo.objnotfound = NULL; transinfo.modptr = NULL; transinfo.numlocked = 0; transinfo.numnotfound = 0; + */ /* Receive control messages from other machines */ while(1) { @@ -185,6 +170,11 @@ void *dstmAccept(void *acceptfd) { case TRANS_REQUEST: /* Read transaction request */ + transinfo.objlocked = NULL; + transinfo.objnotfound = NULL; + transinfo.modptr = NULL; + transinfo.numlocked = 0; + transinfo.numnotfound = 0; if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) { printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__); pthread_exit(NULL); @@ -559,7 +549,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int transinfo->modptr = modptr; transinfo->numlocked = *(objlocked); transinfo->numnotfound = *(objnotfound); - + return control; } @@ -617,77 +607,29 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock int prefetchReq(int acceptfd) { int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0; - int length, sd = -1; + int length; char *recvbuffer, *sendbuffer, control; unsigned int oid, mid; - short *offsetarry; objheader_t *header; struct sockaddr_in remoteAddr; + oidmidpair_t oidmid; do { recv_data((int)acceptfd, &length, sizeof(int)); if(length != -1) { - size = length - sizeof(int); - if((recvbuffer = calloc(1, size)) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - return -1; - } - recv_data((int)acceptfd, recvbuffer, size); - oid = *((unsigned int *) recvbuffer); - mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int))); - size = size - (2 * sizeof(unsigned int)); - numoffset = size / sizeof(short); - if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) { - printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(recvbuffer); - return -1; - } - memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size); - free(recvbuffer); - pthread_mutex_lock(&sockLock); - sockIdFound = 0; - pthread_mutex_unlock(&sockLock); - /* If socket is already established then send data reusing socket */ - for(i = 0; i < NUM_MACHINES; i++) { - if(sockArray[i].mid == mid) { - sd = sockArray[i].sockid; - pthread_mutex_lock(&sockLock); - sockIdFound = 1; - pthread_mutex_unlock(&sockLock); - break; - } - } - - if(sockIdFound == 0) { - if(sockCount < NUM_MACHINES) { - /* Create socket to send information */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){ - perror("prefetchReq():socket()"); - return -1; - } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mid); - - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){ - perror("connect"); - printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return -1; - } - sockArray[sockCount].mid = mid; - sockArray[sockCount].sockid = sd; - pthread_mutex_lock(&sockLock); - sockCount++; - pthread_mutex_unlock(&sockLock); - } else { - //TODO Fix for connecting to more than 2 machines && close socket - printf("%s(): Error: Currently works for only 2 machines\n", __func__); - return -1; - } - } + recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int)); + oid = oidmid.oid; + mid = oidmid.mid; + size = length - sizeof(int) - (2 * sizeof(unsigned int)); + numoffset = size/sizeof(short); + short offsetarry[numoffset]; + recv_data((int) acceptfd, offsetarry, size); + + int sd = -1; + if((sd = getSock(transPResponseSocketPool, mid)) == -1) { + printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); + exit(-1); + } /*Process each oid */ if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */ @@ -695,7 +637,6 @@ int prefetchReq(int acceptfd) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; if((sendbuffer = calloc(1, size)) == NULL) { printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); close(sd); return -1; } @@ -705,7 +646,6 @@ int prefetchReq(int acceptfd) { control = TRANS_PREFETCH_RESPONSE; if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); printf("Error: %s() in sending prefetch response at %s, %d\n", __func__, __FILE__, __LINE__); close(sd); @@ -717,7 +657,6 @@ int prefetchReq(int acceptfd) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; if((sendbuffer = calloc(1, size)) == NULL) { printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); close(sd); return -1; } @@ -731,7 +670,6 @@ int prefetchReq(int acceptfd) { control = TRANS_PREFETCH_RESPONSE; if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); printf("Error: %s() in sending prefetch response at %s, %d\n", __func__, __FILE__, __LINE__); close(sd); @@ -761,7 +699,6 @@ int prefetchReq(int acceptfd) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ; if((sendbuffer = calloc(1, size)) == NULL) { printf("Calloc error at %s,%d\n", __FILE__, __LINE__); - free(offsetarry); close(sd); return -1; } @@ -771,7 +708,6 @@ int prefetchReq(int acceptfd) { control = TRANS_PREFETCH_RESPONSE; if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); printf("Error: %s() in sending prefetch response at %s, %d\n", __FILE__, __LINE__); close(sd); @@ -784,7 +720,6 @@ int prefetchReq(int acceptfd) { size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize; if((sendbuffer = calloc(1, size)) == NULL) { printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__); - free(offsetarry); close(sd); return -1; } @@ -798,7 +733,6 @@ int prefetchReq(int acceptfd) { control = TRANS_PREFETCH_RESPONSE; if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) { - free(offsetarry); printf("Error: %s() in sending prefetch response at %s, %d\n", __func__, __FILE__, __LINE__); close(sd); @@ -807,8 +741,14 @@ int prefetchReq(int acceptfd) { } isArray = 0; } - free(offsetarry); } + + //Release socket + int status; + if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) { + printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__); + return -1; + } } } while (length != -1); return 0; diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.c b/Robust/src/Runtime/DSTM/interface/sockpool.c index 1a0ef84a..78a4e226 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.c +++ b/Robust/src/Runtime/DSTM/interface/sockpool.c @@ -1,7 +1,5 @@ #include "sockpool.h" -sockPoolHashTable_t sockhash; - inline int CompareAndSwap(int *a, int oldval, int newval) { int temp = *a; if (temp == oldval) { @@ -25,19 +23,27 @@ inline void UnLock(SpinLock *s) { *s = 0; } -int createSockPool(unsigned int size, float loadfactor) { +sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size, float loadfactor) { + if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) { + printf("Calloc error at %s line %d\n", __FILE__, __LINE__); + return NULL; + } + socknode_t **nodelist; if ((nodelist = calloc(size, sizeof(socknode_t *))) < 0) { printf("Calloc error at %s line %d\n", __FILE__, __LINE__); - return -1; + free(sockhash); + return NULL; } - sockhash.table = nodelist; - sockhash.inuse = NULL; - sockhash.size = size; - sockhash.numelements = 0; - sockhash.loadfactor = loadfactor; - InitLock(&sockhash.mylock); - return 0; + + sockhash->table = nodelist; + sockhash->inuse = NULL; + sockhash->size = size; + sockhash->numelements = 0; + sockhash->loadfactor = loadfactor; + InitLock(&sockhash->mylock); + + return sockhash; } int createNewSocket(unsigned int mid) { @@ -59,48 +65,95 @@ int createNewSocket(unsigned int mid) { return sd; } -int getSock(unsigned int mid) { - int key = mid%(sockhash.size); +int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) { + int key = mid%(sockhash->size); + + if (sockhash->table[key] == NULL) { + int sd; + if((sd = createNewSocket(mid)) != -1) { + socknode_t *inusenode = calloc(1, sizeof(socknode_t)); + inusenode->mid = mid; + inusenode->sd = sd; + insToList(sockhash, inusenode); + return sd; + } else { + return -1; + } + } + + int midFound = 0; + socknode_t *ptr = sockhash->table[key]; + socknode_t *prev = (socknode_t *) &(sockhash->table[key]); + while (ptr != NULL) { + if (mid == ptr->mid) { + midFound = 1; + int sd = ptr->sd; + prev = ptr->next; + insToList(sockhash, ptr); + return sd; + } + prev = ptr; + ptr = ptr->next; + } + + if(midFound == 0) { + int sd; + if((sd = createNewSocket(mid)) != -1) { + socknode_t *inusenode = calloc(1, sizeof(socknode_t)); + inusenode->mid = mid; + inusenode->sd = sd; + insToList(sockhash, inusenode); + return sd; + } else { + return -1; + } + } + return -1; +} + +int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) { + int key = mid%(sockhash->size); - Lock(&sockhash.mylock); - if (sockhash.table[key] == NULL) { - UnLock(&sockhash.mylock); + Lock(&sockhash->mylock); + if (sockhash->table[key] == NULL) { + UnLock(&sockhash->mylock); int sd; if((sd = createNewSocket(mid)) != -1) { socknode_t *inusenode = calloc(1, sizeof(socknode_t)); inusenode->mid = mid; inusenode->sd = sd; - insToList(inusenode); + insToListWithLock(sockhash, inusenode); return sd; } else { return -1; } } - UnLock(&sockhash.mylock); + UnLock(&sockhash->mylock); int midFound = 0; - Lock(&sockhash.mylock); - socknode_t *ptr = sockhash.table[key]; - socknode_t *prev = (socknode_t *) &(sockhash.table[key]); + Lock(&sockhash->mylock); + socknode_t *ptr = sockhash->table[key]; + socknode_t *prev = (socknode_t *) &(sockhash->table[key]); while (ptr != NULL) { if (mid == ptr->mid) { midFound = 1; int sd = ptr->sd; prev = ptr->next; - UnLock(&sockhash.mylock); - insToList(ptr); + UnLock(&sockhash->mylock); + insToListWithLock(sockhash, ptr); return sd; } prev = ptr; ptr = ptr->next; } - UnLock(&sockhash.mylock); + UnLock(&sockhash->mylock); + if(midFound == 0) { int sd; if((sd = createNewSocket(mid)) != -1) { socknode_t *inusenode = calloc(1, sizeof(socknode_t)); inusenode->mid = mid; inusenode->sd = sd; - insToList(inusenode); + insToListWithLock(sockhash, inusenode); return sd; } else { return -1; @@ -109,25 +162,138 @@ int getSock(unsigned int mid) { return -1; } -void insToList(socknode_t *inusenode) { - Lock(&sockhash.mylock); - inusenode->next = sockhash.inuse; - sockhash.inuse = inusenode; - UnLock(&sockhash.mylock); +void insToList(sockPoolHashTable_t *sockhash, socknode_t *inusenode) { + inusenode->next = sockhash->inuse; + sockhash->inuse = inusenode; +} + +void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) { + Lock(&sockhash->mylock); + inusenode->next = sockhash->inuse; + sockhash->inuse = inusenode; + UnLock(&sockhash->mylock); } -int freeSock(unsigned int mid, int sd) { - if(sockhash.inuse != NULL) { - Lock(&sockhash.mylock); - socknode_t *ptr = sockhash.inuse; +int freeSock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) { + if(sockhash->inuse != NULL) { + socknode_t *ptr = sockhash->inuse; + ptr->mid = mid; + ptr->sd = sd; + sockhash->inuse = ptr->next; + int key = mid%(sockhash->size); + ptr->next = sockhash->table[key]; + sockhash->table[key] = ptr; + return 0; + } + return -1; +} + +int freeSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) { + if(sockhash->inuse != NULL) { + Lock(&sockhash->mylock); + socknode_t *ptr = sockhash->inuse; ptr->mid = mid; ptr->sd = sd; - sockhash.inuse = ptr->next; - int key = mid%(sockhash.size); - ptr->next = sockhash.table[key]; - sockhash.table[key] = ptr; - UnLock(&sockhash.mylock); + sockhash->inuse = ptr->next; + int key = mid%(sockhash->size); + ptr->next = sockhash->table[key]; + sockhash->table[key] = ptr; + UnLock(&sockhash->mylock); return 0; } return -1; } + +#if 0 +/ ***************************************/ +* Array Implementation for socket resuse +* ***************************************/ + +int num_machines; + +sock_pool_t *initSockPool(unsigned int *mid, int machines) { + sock_pool_t *sockpool; + num_machines = machines; + if ((sockpool = calloc(num_machines, sizeof(sock_pool_t))) < 0) { + printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__); + return NULL; + } + int i; + for (i = 0; i < num_machines; i++) { + if ((sockpool[i].sd = calloc(MAX_CONN_PER_MACHINE, sizeof(int))) < 0) { + printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__); + return NULL; + } + if ((sockpool[i].inuse = calloc(MAX_CONN_PER_MACHINE, sizeof(char))) < 0) { + printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__); + return NULL; + } + sockpool[i].mid = mid[i]; + int j; + for(j = 0; j < MAX_CONN_PER_MACHINE; j++) { + sockpool[i].sd[j] = -1; + } + } + + return sockpool; +} + +int getSock(sock_pool_t *sockpool, unsigned int mid) { + int i; + for (i = 0; i < num_machines; i++) { + if (sockpool[i].mid == mid) { + int j; + for (j = 0; j < MAX_CONN_PER_MACHINE; j++) { + if (sockpool[i].sd[j] != -1 && (sockpool[i].inuse[j] == 0)) { + sockpool[i].inuse[j] = 1; + return sockpool[i].sd[j]; + } + if (sockpool[i].sd[j] == -1) { + //Open Connection + int sd; + if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__); + return -1; + } + struct sockaddr_in remoteAddr; + bzero(&remoteAddr, sizeof(remoteAddr)); + remoteAddr.sin_family = AF_INET; + remoteAddr.sin_port = htons(LISTEN_PORT); + remoteAddr.sin_addr.s_addr = htonl(mid); + + if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { + printf("%s(): Error %d connecting to %s:%d\n", __func__, errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); + close(sd); + return -1; + } + sockpool[i].sd[j] = sd; + sockpool[i].inuse[j] = 1; + return sockpool[i].sd[j]; + } + } + printf("%s()->Error: Less number of MAX_CONN_PER_MACHINE\n", __func__); + return -1; + } + } + printf("%s()-> Error: Machine id not found\n", __func__); + + return -1; +} + +int freeSock(sock_pool_t *sockpool, int sd) { + int i; + for (i = 0; i < num_machines; i++) { + int j; + for (j = 0; j < MAX_CONN_PER_MACHINE; j++) { + if (sockpool[i].sd[j] == sd) { + sockpool[i].inuse[j] = 0; + return 0; + } + } + } + printf("%s() Error: Illegal socket descriptor %d\n", __func__, sd); + + return -1; +} + +#endif diff --git a/Robust/src/Runtime/DSTM/interface/sockpool.h b/Robust/src/Runtime/DSTM/interface/sockpool.h index 654d03f9..1e868f3d 100644 --- a/Robust/src/Runtime/DSTM/interface/sockpool.h +++ b/Robust/src/Runtime/DSTM/interface/sockpool.h @@ -3,10 +3,7 @@ #include "dstm.h" -#define LOADFACTOR 0.5 - typedef int SpinLock; - typedef struct socknode { int sd; unsigned int mid; @@ -22,15 +19,33 @@ typedef struct sockPoolHashTable { SpinLock mylock; } sockPoolHashTable_t; -int createSockPool(unsigned int, float); -int getSock(unsigned int); -int freeSock(unsigned int, int); -int deleteSockpool(sockPoolHashTable_t *); -void insToList(socknode_t *); +sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float); +int getSock(sockPoolHashTable_t *, unsigned int); +int getSockWithLock(sockPoolHashTable_t *, unsigned int); +int freeSock(sockPoolHashTable_t *, unsigned int, int); +int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int); +void insToList(sockPoolHashTable_t *, socknode_t *); +void insToListWithLock(sockPoolHashTable_t *, socknode_t *); int createNewSocket(unsigned int); int CompareAndSwap(int *, int, int); void InitLock(SpinLock *); void Lock (SpinLock *); void UnLock (SpinLock *); +#if 0 +/************************************************ + * Array Implementation data structures + ***********************************************/ +#define MAX_CONN_PER_MACHINE 10 +typedef struct sock_pool { + unsigned int mid; + int *sd; + char *inuse; +} sock_pool_t; + +sock_pool_t *initSockPool(unsigned int *, int); +int getSock(sock_pool_t *, unsigned int); +int freeSock(sock_pool_t *, int); +#endif + #endif diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 66e524ba..99e63561 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -8,21 +8,10 @@ #include "prelookup.h" #include "threadnotify.h" #include "queue.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #ifdef COMPILER #include "thread.h" #endif -#define LISTEN_PORT 2156 #define NUM_THREADS 1 #define PREFETCH_CACHE_SIZE 1048576 //1MB #define CONFIG_FILENAME "dstm.conf" @@ -30,7 +19,6 @@ /* Global Variables */ extern int classsize[]; extern primarypfq_t pqueue; //Shared prefetch queue -extern mcpileq_t mcqueue; //Shared queue containing prefetch requests sorted by remote machineids objstr_t *prefetchcache; //Global Prefetch cache pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */ @@ -48,16 +36,8 @@ unsigned int oidsPerBlock; unsigned int oidMin; unsigned int oidMax; -/************************************************************************ - * Global variables to map socketid and remote mid to - * reuse sockets for sending prefetches and making remote read requests - ************************************************************************/ -midSocketInfo_t midSocketArray[NUM_MACHINES]; -int sockCount; //number of connections with all remote machines(one socket per mc) -int sockIdFound; //track if socket file descriptor is already established -midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES]; -int sockCountRemoteRead; //number of connections with all remote machines(one socket per mc) -int sockIdFoundRemoteRead; //track if socket file descriptor is already established +sockPoolHashTable_t *transReadSockPool; +sockPoolHashTable_t *transPrefetchSockPool; void printhex(unsigned char *, int); plistnode_t *createPiles(transrecord_t *); @@ -194,11 +174,19 @@ int dstmStartup(const char * option) { threadcount--; #endif + //Initialize socket pool + if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } + if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) { + printf("Error in creating new socket pool at %s line %d\n", __FILE__, __LINE__); + return 0; + } + dstmInit(); transInit(); - - if (master) { pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); @@ -268,22 +256,6 @@ void transInit() { retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL); } while(retval!=0); pthread_detach(tPrefetch); - - //Initialize mid to socketid mapping array - sockCount = 0; - for(t = 0; t < NUM_MACHINES; t++) { - midSocketArray[t].mid = 0; - midSocketArray[t].sockid = 0; - } - - //Create and Initialize a pool of threads - /* Threads are active for the entire period runtime is running */ - for(t = 0; t< NUM_THREADS; t++) { - do { - rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t); - } while(rc!=0); - pthread_detach(wthreads[t]); - } } /* This function stops the threads spawned */ @@ -833,8 +805,8 @@ char sendResponse(thread_data_array_t *tdata, int sd) { control = *(tdata->replyctrl); send_data(sd, &control, sizeof(char)); - //TODO read missing objects to be used during object migration - /* If the decided response is due to a soft abort and missing objects at the Participant's side */ + //TODO read missing objects during object migration + /* If response is a soft abort due to missing objects at the Participant's side */ /* if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) { // Read list of objects missing @@ -869,48 +841,17 @@ char sendResponse(thread_data_array_t *tdata, int sd) { * */ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { - int sd, size, val; + int size, val; struct sockaddr_in serv_addr; char machineip[16]; char control; objheader_t *h; - void *objcopy; + void *objcopy = NULL; - int i; - for(i = 0; i < NUM_MACHINES; i++) { - if(sockArrayRemoteRead[i].mid == mnum) { - sd = sockArrayRemoteRead[i].sockid; - sockIdFoundRemoteRead = 1; - break; - } - } - - if(sockIdFoundRemoteRead == 0) { - if(sockCountRemoteRead < NUM_MACHINES) { - /* Create socket */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - perror("Error in socket\n"); - return NULL; - } - - bzero((char*) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(LISTEN_PORT); - serv_addr.sin_addr.s_addr = htonl(mnum); - // Open connection - if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) { - perror("getRemoteObj() Error in connect\n"); - close(sd); - return NULL; - } - sockArrayRemoteRead[sockCountRemoteRead].mid = mnum; - sockArrayRemoteRead[sockCountRemoteRead].sockid = sd; - sockCountRemoteRead++; - } else { - //TODO Fix for connecting to more than 2 machines && close socket - printf("%s(): Error: Currently works for two remote machines\n", __func__); - return NULL; - } + int sd; + if((sd = getSock(transReadSockPool, mnum)) == -1) { + printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__); + return NULL; } char readrequest[sizeof(char)+sizeof(unsigned int)]; @@ -923,7 +864,8 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { switch(control) { case OBJECT_NOT_FOUND: - return NULL; + objcopy = NULL; + break; case OBJECT_FOUND: /* Read object if found into local cache */ recv_data(sd, &size, sizeof(int)); @@ -935,9 +877,15 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { break; default: printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__); - return NULL; + break; } + int status; + if((status = freeSock(transReadSockPool, mnum, sd)) == -1) { + printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__); + return NULL; + } + return objcopy; } @@ -1160,14 +1108,14 @@ void checkPrefetchTuples(prefetchqelem_t *node) { int ntuples, slength; unsigned int *oid; unsigned short *endoffsets; - short *arryfields; + short *offsets; /* Check for the case x.y.z and a.b.c are same oids */ ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); oid = GET_PTR_OID(ptr); endoffsets = GET_PTR_EOFF(ptr, ntuples); - arryfields = GET_PTR_ARRYFLD(ptr, ntuples); + offsets = GET_PTR_ARRYFLD(ptr, ntuples); /* Find offset length for each tuple */ int numoffset[ntuples]; @@ -1205,7 +1153,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) { } index = endoffsets[j -1]; for(count = 0; count < slength; count ++) { - if (arryfields[k] != arryfields[index]) { + if (offsets[k] != offsets[index]) { break; } index++; @@ -1284,7 +1232,6 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { objheader_t *objheader; unsigned short *endoffsets; short *arryfields; - prefetchpile_t *head = NULL; ptr = (char *) node; ntuples = *(GET_NTUPLES(ptr)); @@ -1384,6 +1331,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) { } /* Make machine groups */ + prefetchpile_t *head = NULL; if((head = makePreGroups(node, numoffset)) == NULL) { printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__); return NULL; @@ -1468,10 +1416,6 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i /* This function is called by the thread calling transPrefetch */ void *transPrefetch(void *t) { - prefetchqelem_t *qnode; - prefetchpile_t *pilehead = NULL; - prefetchpile_t *ptr = NULL, *piletail = NULL; - while(1) { /* lock mutex of primary prefetch queue */ pthread_mutex_lock(&pqueue.qlock); @@ -1481,6 +1425,7 @@ void *transPrefetch(void *t) { } /* dequeue node to create a machine piles and finally unlock mutex */ + prefetchqelem_t *qnode; if((qnode = pre_dequeue()) == NULL) { printf("Error: No node returned %s, %d\n", __FILE__, __LINE__); pthread_mutex_unlock(&pqueue.qlock); @@ -1492,115 +1437,45 @@ void *transPrefetch(void *t) { checkPrefetchTuples(qnode); /* Check if the tuples are found locally, if yes then reduce them further*/ /* and group requests by remote machine ids by calling the makePreGroups() */ + prefetchpile_t *pilehead = NULL; if((pilehead = foundLocal(qnode)) == NULL) { printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__); pre_enqueue(qnode); continue; } - ptr = pilehead; - while(ptr != NULL) { - if(ptr->next == NULL) { - piletail = ptr; - } - ptr = ptr->next; - } - - /* Lock mutex of pool queue */ - pthread_mutex_lock(&mcqueue.qlock); - /* Update the pool queue with the new remote machine piles generated per prefetch call */ - mcpileenqueue(pilehead, piletail); - /* Broadcast signal on machine pile queue */ - pthread_cond_broadcast(&mcqueue.qcond); - /* Unlock mutex of machine pile queue */ - pthread_mutex_unlock(&mcqueue.qlock); - /* Deallocate the prefetch queue pile node */ - predealloc(qnode); - } -} - -/* Each thread in the pool of threads calls this function to establish connection with - * remote machines, send the prefetch requests and process the reponses from - * the remote machines . - * The thread is active throughout the period of runtime */ - -void *mcqProcess(void *threadid) { - int tid, i; - prefetchpile_t *mcpilenode; - struct sockaddr_in remoteAddr; - int sd; - - tid = (int) threadid; - while(1) { - - sockIdFound = 0; - /* Lock mutex of mc pile queue */ - pthread_mutex_lock(&mcqueue.qlock); - /* When mc pile queue is empty, wait */ - while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) { - pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock); - } - /* Dequeue node to send remote machine connections*/ - if((mcpilenode = mcpiledequeue()) == NULL) { - printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__); - pthread_mutex_unlock(&mcqueue.qlock); - continue; - } - /* Unlock mutex */ - pthread_mutex_unlock(&mcqueue.qlock); - - /*Initiate connection to remote host and send prefetch request */ - if(mcpilenode->mid != myIpAddr) { - /* Check to see if socket exists */ - for(i = 0; i < NUM_MACHINES; i++) { - if(midSocketArray[i].mid == mcpilenode->mid) { - sendPrefetchReq(mcpilenode, midSocketArray[i].sockid); - sockIdFound = 1; - break; - } - } + // Get sock from shared pool + int sd = -1; + if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) { + printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__); + exit(-1); + } - if(sockIdFound == 0) { - if(sockCount < NUM_MACHINES) { - /* Open Socket */ - if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__); - return; - } + /* Send Prefetch Request */ + prefetchpile_t *ptr = pilehead; + while(ptr != NULL) { + sendPrefetchReq(ptr, sd); + ptr = ptr->next; + } - bzero(&remoteAddr, sizeof(remoteAddr)); - remoteAddr.sin_family = AF_INET; - remoteAddr.sin_port = htons(LISTEN_PORT); - remoteAddr.sin_addr.s_addr = htonl(mcpilenode->mid); - - /* Open Connection */ - if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) { - printf("%s():error %d connecting to %s:%d\n", __func__, errno, - inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT); - close(sd); - return; - } + /* Release socket */ + int status; + if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) { + printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__); + return; + } - midSocketArray[sockCount].mid = mcpilenode->mid; - midSocketArray[sockCount].sockid = sd; - sendPrefetchReq(mcpilenode, midSocketArray[sockCount].sockid); - sockCount++; - } else { - //TODO Fix for connecting to more than 2 machines && close socket - printf("%s(): Error: Currently works for only 2 machines\n", __func__); - return; - } - } - } + /* Deallocated pilehead */ + mcdealloc(pilehead); - /* Deallocate the machine queue pile node */ - mcdealloc(mcpilenode); + // Deallocate the prefetch queue pile node + predealloc(qnode); } } void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { - int i, off, len, endpair, count = 0; - char machineip[16], control; + int off, len, endpair, count = 0; + char control; objpile_t *tmp; /* Send TRANS_PREFETCH control message */ @@ -1621,6 +1496,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) { off += sizeof(unsigned int); *((unsigned int *)(oidnoffset + off)) = myIpAddr; off += sizeof(unsigned int); + int i; for(i = 0; i < tmp->numoffset; i++) { *((short*)(oidnoffset + off)) = tmp->offset[i]; off+=sizeof(short); @@ -1752,7 +1628,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid) else { msg[0] = START_REMOTE_THREAD; - memcpy(&msg[1], &oid, sizeof(unsigned int)); + *((unsigned int *) &msg[1]) = oid; send_data(sock, msg, 1 + sizeof(unsigned int)); } diff --git a/Robust/src/buildscript b/Robust/src/buildscript index 7ad7dcd5..e9e5565a 100755 --- a/Robust/src/buildscript +++ b/Robust/src/buildscript @@ -238,7 +238,7 @@ $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c" if $DSMFLAG then EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME" -FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c" +FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c" fi if $RECOVERFLAG