complete hashtable implementation with separate socket pools for read and
authoradash <adash>
Mon, 7 Apr 2008 21:26:26 +0000 (21:26 +0000)
committeradash <adash>
Mon, 7 Apr 2008 21:26:26 +0000 (21:26 +0000)
prefetch.
commented out array implementation of socket pools

Robust/src/Runtime/DSTM/interface/ISSUESTOADDRESS
Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/sockpool.c
Robust/src/Runtime/DSTM/interface/sockpool.h
Robust/src/Runtime/DSTM/interface/trans.c
Robust/src/buildscript

index d90400327788e917c33dfa07387c55b3166278d3..f008b349675d87ce42477bfce4efe1c1deba1209 100644 (file)
@@ -4,20 +4,30 @@ High priority list
   A) allocations always have to traverse to end of list
   B) do we need to zero first?? -- need to check about this one, it may be okay
 
+Status:Verified
+
 1) Wrap all receive calls in loops
   A) Perhaps the best way is to just define a macro or function call that
 does this. Look at GETSIZE macro for example...
 
+Status:DONE
+
 2) Check locking... There is likely a race condition on getObjType().  
 
+Status:DONE
+
 3) Receiving object code assume a maximum object size.  It is probably
 better to:
   A) read size in.
   B) allocate space for object at its final destination
   C) read into the space
 
+Status:DONE
+
 Low priority list
 ---------------------------------
 
 1) We shouldn't call memcopy for copying fixed-sized structs or primitive
 values...just use =
+
+Status: DONE in most places
index f7478881487e7feea66a4704faa909049cf9e669..8a86ea2fb9b61101e22c0b0f64a6e52ab8a60847 100644 (file)
 #define MAX_OBJECTS  20
 //Max remote-machine connections
 #define NUM_MACHINES 2
+#define LOADFACTOR 0.5
 #define DEFAULT_OBJ_STORE_SIZE 1048510 //1MB
 //Transaction id per machine
 #define TID_LEN 20
+#define LISTEN_PORT 2156
 
 
 #include <stdlib.h>
 #include "queue.h"
 #include "mcpileq.h"
 #include "threadnotify.h"
-
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <errno.h>
+#include <time.h>
+#include "sockpool.h"
 
 //bit designations for status field of objheader
 #define DIRTY 0x01
@@ -124,6 +134,11 @@ typedef struct objstr {
        struct objstr *next;
 } objstr_t;
 
+typedef struct oidmidpair {
+    unsigned int oid;
+    unsigned int mid;
+} oidmidpair_t;
+
 typedef struct transrecord {
   objstr_t *cache;
   chashtable_t *lookupTable;
index 043fbb889549d389f0e9377fd61ee0c2b0e068d8..1f892e35f0a5fb84eb12e64d9f990b9af398e52f 100644 (file)
@@ -1,14 +1,6 @@
 /* Coordinator => Machine that initiates the transaction request call for commiting a transaction
  * Participant => Machines that host the objects involved in a transaction commit */
 
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <netdb.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <string.h>
 #include "dstm.h"
 #include "mlookup.h"
 #include "llookup.h"
 #include "thread.h"
 #endif
 
-
-#define LISTEN_PORT 2156
 #define BACKLOG 10 //max pending connections
 #define RECEIVE_BUFFER_SIZE 2048
 
 extern int classsize[];
+extern int numHostsInSystem;
 
 objstr_t *mainobjstore;
 pthread_mutex_t mainobjstore_mutex;
 pthread_mutexattr_t mainobjstore_mutex_attr; /* Attribute for lock to make it a recursive lock */
-/**********************************************************
- * Global variables to map socketid and remote mid
- * to  resuse sockets
- **************************************************/
-midSocketInfo_t sockArray[NUM_MACHINES];
-int sockCount; //number of connections with all remote machines(one socket per mc)
-int sockIdFound; //track if socket file descriptor is already established
-pthread_mutex_t sockLock = PTHREAD_MUTEX_INITIALIZER; //lock to prevent global sock variables to be inconsistent
+
+sockPoolHashTable_t *transPResponseSocketPool;
 
 /* This function initializes the main objects store and creates the 
  * global machine and location lookup table */
@@ -54,14 +39,12 @@ int dstmInit(void)
 
        if (notifyhashCreate(N_HASH_SIZE, N_LOADFACTOR))
                return 1; //failure
-       
-       //Initialize mid to socketid mapping array
-       int t;
-       sockCount = 0;
-       for(t = 0; t < NUM_MACHINES; t++) {
-               sockArray[t].mid = 0;
-               sockArray[t].sockid = 0;
-       }
+
+    //Initialize socket pool
+    if((transPResponseSocketPool = createSockPool(transPResponseSocketPool, 2*numHostsInSystem+1, LOADFACTOR)) == NULL) {
+        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
+        return 0;
+    }
 
        return 0;
 }
@@ -138,11 +121,13 @@ void *dstmAccept(void *acceptfd) {
   unsigned short objType, *versionarry, version;
   unsigned int *oidarry, numoid, mid, threadid;
   
+  /*
   transinfo.objlocked = NULL;
   transinfo.objnotfound = NULL;
   transinfo.modptr = NULL;
   transinfo.numlocked = 0;
   transinfo.numnotfound = 0;
+  */
   
   /* Receive control messages from other machines */
   while(1) {
@@ -185,6 +170,11 @@ void *dstmAccept(void *acceptfd) {
       
     case TRANS_REQUEST:
       /* Read transaction request */
+      transinfo.objlocked = NULL;
+      transinfo.objnotfound = NULL;
+      transinfo.modptr = NULL;
+      transinfo.numlocked = 0;
+      transinfo.numnotfound = 0;
       if((val = readClientReq(&transinfo, (int)acceptfd)) != 0) {
        printf("Error: In readClientReq() %s, %d\n", __FILE__, __LINE__);
        pthread_exit(NULL);
@@ -559,7 +549,7 @@ char decideCtrlMessage(fixed_data_t *fixed, trans_commit_data_t *transinfo, int
        transinfo->modptr = modptr;
        transinfo->numlocked = *(objlocked);
        transinfo->numnotfound = *(objnotfound);
-       
+
        return control;
 }
 
@@ -617,77 +607,29 @@ int transCommitProcess(void *modptr, unsigned int *oidmod, unsigned int *oidlock
 
 int prefetchReq(int acceptfd) {
        int i, size, objsize, numbytes = 0, isArray = 0, numoffset = 0;
-       int length, sd = -1;
+       int length;
        char *recvbuffer, *sendbuffer, control;
        unsigned int oid, mid;
-       short *offsetarry;
        objheader_t *header;
        struct sockaddr_in remoteAddr;
+    oidmidpair_t oidmid;
 
        do {
                recv_data((int)acceptfd, &length, sizeof(int));
                if(length != -1) {
-                       size = length - sizeof(int);
-                       if((recvbuffer = calloc(1, size)) == NULL) {
-                               printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                               return -1;
-                       }
-                       recv_data((int)acceptfd, recvbuffer, size);
-                       oid = *((unsigned int *) recvbuffer);
-                       mid = *((unsigned int *) (recvbuffer + sizeof(unsigned int)));
-                       size = size - (2 * sizeof(unsigned int));
-                       numoffset = size / sizeof(short);
-                       if((offsetarry = calloc(numoffset, sizeof(short))) == NULL) {
-                               printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                               free(recvbuffer);
-                               return -1;
-                       }
-                       memcpy(offsetarry, recvbuffer + (2 * sizeof(unsigned int)), size);
-                       free(recvbuffer);
-                       pthread_mutex_lock(&sockLock);
-                       sockIdFound = 0;
-                       pthread_mutex_unlock(&sockLock);
-            /* If socket is already established then send data reusing socket */
-                       for(i = 0; i < NUM_MACHINES; i++) {
-                               if(sockArray[i].mid == mid) {
-                                       sd = sockArray[i].sockid;
-                                       pthread_mutex_lock(&sockLock);
-                                       sockIdFound = 1;
-                                       pthread_mutex_unlock(&sockLock);
-                                       break;
-                               }
-                       }
-
-                       if(sockIdFound == 0) {
-                               if(sockCount < NUM_MACHINES) {
-                                       /* Create socket to send information */
-                                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
-                                               perror("prefetchReq():socket()");
-                                               return -1;
-                                       }
-                                       bzero(&remoteAddr, sizeof(remoteAddr));
-                                       remoteAddr.sin_family = AF_INET;
-                                       remoteAddr.sin_port = htons(LISTEN_PORT);
-                                       remoteAddr.sin_addr.s_addr = htonl(mid);
-
-                                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0){
-                                               perror("connect");
-                                               printf("Error: prefetchReq():error %d connecting to %s:%d\n", errno,
-                                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                                               close(sd);
-                                               return -1;
-                                       }
-                                       sockArray[sockCount].mid = mid;
-                                       sockArray[sockCount].sockid = sd;
-                                       pthread_mutex_lock(&sockLock);
-                                       sockCount++;
-                                       pthread_mutex_unlock(&sockLock);
-                               } else {
-                                       //TODO Fix for connecting to more than 2 machines && close socket
-                                       printf("%s(): Error: Currently works for only 2 machines\n", __func__);
-                                       return -1;
-                               }
-                       }
+            recv_data((int)acceptfd, &oidmid, 2*sizeof(unsigned int));
+            oid = oidmid.oid;
+            mid = oidmid.mid;
+            size = length - sizeof(int) - (2 * sizeof(unsigned int));
+            numoffset = size/sizeof(short);
+            short offsetarry[numoffset];
+            recv_data((int) acceptfd, offsetarry, size);
+
+            int sd = -1;
+            if((sd = getSock(transPResponseSocketPool, mid)) == -1) {
+                printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+                exit(-1);
+            }
 
                        /*Process each oid */
                        if ((header = mhashSearch(oid)) == NULL) {/* Obj not found */
@@ -695,7 +637,6 @@ int prefetchReq(int acceptfd) {
                                size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
                                if((sendbuffer = calloc(1, size)) == NULL) {
                                        printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                       free(offsetarry);
                                        close(sd);
                                        return -1;
                                }
@@ -705,7 +646,6 @@ int prefetchReq(int acceptfd) {
 
                                control = TRANS_PREFETCH_RESPONSE;
                                if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                       free(offsetarry);
                                        printf("Error: %s() in sending prefetch response at %s, %d\n",
                                                        __func__, __FILE__, __LINE__);
                                        close(sd);
@@ -717,7 +657,6 @@ int prefetchReq(int acceptfd) {
                                size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
                                if((sendbuffer = calloc(1, size)) == NULL) {
                                        printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                       free(offsetarry);
                                        close(sd);
                                        return -1;
                                }
@@ -731,7 +670,6 @@ int prefetchReq(int acceptfd) {
 
                                control = TRANS_PREFETCH_RESPONSE;
                                if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                       free(offsetarry);
                                        printf("Error: %s() in sending prefetch response at %s, %d\n",
                                                        __func__, __FILE__, __LINE__);
                                        close(sd);
@@ -761,7 +699,6 @@ int prefetchReq(int acceptfd) {
                                                size = sizeof(int) + sizeof(char) + sizeof(unsigned int) ;
                                                if((sendbuffer = calloc(1, size)) == NULL) {
                                                        printf("Calloc error at %s,%d\n", __FILE__, __LINE__);
-                                                       free(offsetarry);
                                                        close(sd);
                                                        return -1;
                                                }
@@ -771,7 +708,6 @@ int prefetchReq(int acceptfd) {
 
                                                control = TRANS_PREFETCH_RESPONSE;
                                                if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                                       free(offsetarry);
                                                        printf("Error: %s() in sending prefetch response at %s, %d\n",
                                                                        __FILE__, __LINE__);
                                                        close(sd);
@@ -784,7 +720,6 @@ int prefetchReq(int acceptfd) {
                                                size = sizeof(int) + sizeof(char) + sizeof(unsigned int) + sizeof(objheader_t) + objsize;
                                                if((sendbuffer = calloc(1, size)) == NULL) {
                                                        printf("Calloc error at %s,%d\n", __func__, __FILE__, __LINE__);
-                                                       free(offsetarry);
                                                        close(sd);
                                                        return -1;
                                                }
@@ -798,7 +733,6 @@ int prefetchReq(int acceptfd) {
 
                                                control = TRANS_PREFETCH_RESPONSE;
                                                if(sendPrefetchResponse(sd, &control, sendbuffer, &size) != 0) {
-                                                       free(offsetarry);
                                                        printf("Error: %s() in sending prefetch response at %s, %d\n",
                                                                        __func__, __FILE__, __LINE__);
                                                        close(sd);
@@ -807,8 +741,14 @@ int prefetchReq(int acceptfd) {
                                        }
                                        isArray = 0;
                                }
-                               free(offsetarry);
                        }
+
+            //Release socket
+            int status;
+            if((status = freeSock(transPResponseSocketPool, mid, sd)) == -1) {
+                printf("Error: in releasing socket at %s line %d\n", __FILE__, __LINE__);
+                return -1;
+            }
                }
        } while (length != -1);
        return 0;
index 1a0ef84ae5a9d4afba295f7f64989f443dbb9156..78a4e2264d9b127c16d4555ff85588ee1f9a65d2 100644 (file)
@@ -1,7 +1,5 @@
 #include "sockpool.h"
 
-sockPoolHashTable_t sockhash;
-
 inline int CompareAndSwap(int *a, int oldval, int newval) {
     int temp = *a;
     if (temp == oldval) {
@@ -25,19 +23,27 @@ inline void UnLock(SpinLock *s) {
     *s = 0;
 }
 
-int createSockPool(unsigned int size, float loadfactor) {
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t * sockhash, unsigned int size, float loadfactor) {
+    if((sockhash = calloc(1, sizeof(sockPoolHashTable_t))) == NULL) {
+        printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
+        return NULL;
+    }
+
     socknode_t **nodelist;
     if ((nodelist = calloc(size, sizeof(socknode_t *))) < 0) {
         printf("Calloc error at %s line %d\n", __FILE__, __LINE__);
-        return -1;
+        free(sockhash);
+        return NULL;
     }
-    sockhash.table = nodelist;
-    sockhash.inuse = NULL;
-    sockhash.size = size;
-    sockhash.numelements = 0;
-    sockhash.loadfactor = loadfactor;
-    InitLock(&sockhash.mylock);
-    return 0;
+
+    sockhash->table = nodelist;
+    sockhash->inuse = NULL;
+    sockhash->size = size;
+    sockhash->numelements = 0;
+    sockhash->loadfactor = loadfactor;
+    InitLock(&sockhash->mylock);
+
+    return sockhash;
 }
 
 int createNewSocket(unsigned int mid) {
@@ -59,48 +65,95 @@ int createNewSocket(unsigned int mid) {
     return sd;
 }
 
-int getSock(unsigned int mid) {
-    int key = mid%(sockhash.size);
+int getSock(sockPoolHashTable_t *sockhash, unsigned int mid) {
+    int key = mid%(sockhash->size);
+
+    if (sockhash->table[key] == NULL) {
+        int sd;
+        if((sd = createNewSocket(mid)) != -1) {
+            socknode_t *inusenode = calloc(1, sizeof(socknode_t));
+            inusenode->mid = mid; 
+            inusenode->sd = sd; 
+            insToList(sockhash, inusenode);
+            return sd;
+        } else {
+            return -1;
+        }
+    }
+
+    int midFound = 0;
+    socknode_t *ptr = sockhash->table[key];
+    socknode_t *prev = (socknode_t *) &(sockhash->table[key]);
+    while (ptr != NULL) {
+        if (mid == ptr->mid) {
+            midFound = 1;
+            int sd = ptr->sd;
+            prev = ptr->next;
+            insToList(sockhash, ptr);
+            return sd;
+        }
+        prev = ptr;
+        ptr = ptr->next;
+    }
+
+    if(midFound == 0) {
+        int sd;
+        if((sd = createNewSocket(mid)) != -1) {
+            socknode_t *inusenode = calloc(1, sizeof(socknode_t));
+            inusenode->mid = mid; 
+            inusenode->sd = sd; 
+            insToList(sockhash, inusenode);
+            return sd;
+        } else {
+            return -1;
+        }
+    }
+    return -1;
+}
+
+int getSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid) {
+    int key = mid%(sockhash->size);
 
-    Lock(&sockhash.mylock);
-    if (sockhash.table[key] == NULL) {
-        UnLock(&sockhash.mylock);
+    Lock(&sockhash->mylock);
+    if (sockhash->table[key] == NULL) {
+        UnLock(&sockhash->mylock);
         int sd;
         if((sd = createNewSocket(mid)) != -1) {
             socknode_t *inusenode = calloc(1, sizeof(socknode_t));
             inusenode->mid = mid; 
             inusenode->sd = sd; 
-            insToList(inusenode);
+            insToListWithLock(sockhash, inusenode);
             return sd;
         } else {
             return -1;
         }
     }
-    UnLock(&sockhash.mylock);
+    UnLock(&sockhash->mylock);
     int midFound = 0;
-    Lock(&sockhash.mylock);
-    socknode_t *ptr = sockhash.table[key];
-    socknode_t *prev = (socknode_t *) &(sockhash.table[key]);
+    Lock(&sockhash->mylock);
+    socknode_t *ptr = sockhash->table[key];
+    socknode_t *prev = (socknode_t *) &(sockhash->table[key]);
     while (ptr != NULL) {
         if (mid == ptr->mid) {
             midFound = 1;
             int sd = ptr->sd;
             prev = ptr->next;
-            UnLock(&sockhash.mylock);
-            insToList(ptr);
+            UnLock(&sockhash->mylock);
+            insToListWithLock(sockhash, ptr);
             return sd;
         }
         prev = ptr;
         ptr = ptr->next;
     }
-    UnLock(&sockhash.mylock);
+    UnLock(&sockhash->mylock);
+
     if(midFound == 0) {
         int sd;
         if((sd = createNewSocket(mid)) != -1) {
             socknode_t *inusenode = calloc(1, sizeof(socknode_t));
             inusenode->mid = mid; 
             inusenode->sd = sd; 
-            insToList(inusenode);
+            insToListWithLock(sockhash, inusenode);
             return sd;
         } else {
             return -1;
@@ -109,25 +162,138 @@ int getSock(unsigned int mid) {
     return -1;
 }
 
-void insToList(socknode_t *inusenode) {
-    Lock(&sockhash.mylock);
-    inusenode->next = sockhash.inuse;
-    sockhash.inuse = inusenode;
-    UnLock(&sockhash.mylock);
+void insToList(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
+    inusenode->next = sockhash->inuse;
+    sockhash->inuse = inusenode;
+} 
+
+void insToListWithLock(sockPoolHashTable_t *sockhash, socknode_t *inusenode) {
+    Lock(&sockhash->mylock);
+    inusenode->next = sockhash->inuse;
+    sockhash->inuse = inusenode;
+    UnLock(&sockhash->mylock);
 } 
 
-int freeSock(unsigned int mid, int sd) {
-    if(sockhash.inuse != NULL) {
-        Lock(&sockhash.mylock);
-        socknode_t *ptr = sockhash.inuse; 
+int freeSock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
+    if(sockhash->inuse != NULL) {
+        socknode_t *ptr = sockhash->inuse; 
+        ptr->mid = mid;
+        ptr->sd = sd;
+        sockhash->inuse = ptr->next;
+        int key = mid%(sockhash->size);
+        ptr->next = sockhash->table[key];
+        sockhash->table[key] = ptr;
+        return 0;
+    }
+    return -1;
+}
+
+int freeSockWithLock(sockPoolHashTable_t *sockhash, unsigned int mid, int sd) {
+    if(sockhash->inuse != NULL) {
+        Lock(&sockhash->mylock);
+        socknode_t *ptr = sockhash->inuse; 
         ptr->mid = mid;
         ptr->sd = sd;
-        sockhash.inuse = ptr->next;
-        int key = mid%(sockhash.size);
-        ptr->next = sockhash.table[key];
-        sockhash.table[key] = ptr;
-        UnLock(&sockhash.mylock);
+        sockhash->inuse = ptr->next;
+        int key = mid%(sockhash->size);
+        ptr->next = sockhash->table[key];
+        sockhash->table[key] = ptr;
+        UnLock(&sockhash->mylock);
         return 0;
     }
     return -1;
 }
+
+#if 0
+/ ***************************************/
+* Array Implementation for socket resuse 
+* ***************************************/
+
+int num_machines;
+
+sock_pool_t *initSockPool(unsigned int *mid, int machines) {
+    sock_pool_t *sockpool;
+    num_machines = machines;
+    if ((sockpool = calloc(num_machines, sizeof(sock_pool_t))) < 0) {
+        printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+        return NULL;
+    }
+    int i;
+    for (i = 0; i < num_machines; i++) {
+        if ((sockpool[i].sd = calloc(MAX_CONN_PER_MACHINE, sizeof(int))) < 0) {
+            printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+            return NULL;
+        }
+        if ((sockpool[i].inuse = calloc(MAX_CONN_PER_MACHINE, sizeof(char))) < 0) {
+            printf("%s(), Calloc error at %s, line %d\n", __func__, __FILE__, __LINE__);
+            return NULL;
+        }
+        sockpool[i].mid = mid[i];
+        int j;
+        for(j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+            sockpool[i].sd[j] = -1;
+        }
+    }
+
+    return sockpool;
+}
+
+int getSock(sock_pool_t *sockpool, unsigned int mid) {
+    int i;
+    for (i = 0; i < num_machines; i++) {
+        if (sockpool[i].mid == mid) {
+            int j;
+            for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+                if (sockpool[i].sd[j] != -1 && (sockpool[i].inuse[j] == 0)) {
+                    sockpool[i].inuse[j] = 1;
+                    return sockpool[i].sd[j];
+                }
+                if (sockpool[i].sd[j] == -1) {
+                    //Open Connection
+                    int sd;
+                    if((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+                        printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
+                        return -1;
+                    }
+                    struct sockaddr_in remoteAddr;
+                    bzero(&remoteAddr, sizeof(remoteAddr));
+                    remoteAddr.sin_family = AF_INET;
+                    remoteAddr.sin_port = htons(LISTEN_PORT);
+                    remoteAddr.sin_addr.s_addr = htonl(mid);
+
+                    if(connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
+                        printf("%s(): Error %d connecting to %s:%d\n", __func__, errno, inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+                        close(sd);
+                        return -1;
+                    }
+                    sockpool[i].sd[j] = sd;
+                    sockpool[i].inuse[j] = 1;
+                    return sockpool[i].sd[j];
+                }
+            }
+            printf("%s()->Error: Less number of MAX_CONN_PER_MACHINE\n", __func__);
+            return -1;
+        }
+    }
+    printf("%s()-> Error: Machine id not found\n", __func__);
+
+    return -1;
+}
+
+int freeSock(sock_pool_t *sockpool, int sd) {
+    int i;
+    for (i = 0; i < num_machines; i++) {
+        int j;
+        for (j = 0; j < MAX_CONN_PER_MACHINE; j++) {
+            if (sockpool[i].sd[j] == sd) {
+                sockpool[i].inuse[j] = 0;
+                return 0;
+            }
+        }
+    }
+    printf("%s() Error: Illegal socket descriptor %d\n", __func__, sd);
+
+    return -1;
+}
+
+#endif
index 654d03f901498e85528accdf8b4726320267f6c5..1e868f3dec3882c234c510c69bd6cc38f7fea96e 100644 (file)
@@ -3,10 +3,7 @@
 
 #include "dstm.h"
 
-#define LOADFACTOR 0.5
-
 typedef int SpinLock;
-
 typedef struct socknode {
     int sd;
     unsigned int mid;
@@ -22,15 +19,33 @@ typedef struct sockPoolHashTable {
     SpinLock mylock;
 } sockPoolHashTable_t;
 
-int createSockPool(unsigned int, float);
-int getSock(unsigned int);
-int freeSock(unsigned int, int);
-int deleteSockpool(sockPoolHashTable_t *);
-void insToList(socknode_t *);
+sockPoolHashTable_t *createSockPool(sockPoolHashTable_t *, unsigned int, float);
+int getSock(sockPoolHashTable_t *, unsigned int);
+int getSockWithLock(sockPoolHashTable_t *, unsigned int);
+int freeSock(sockPoolHashTable_t *, unsigned int, int);
+int freeSockWithLock(sockPoolHashTable_t *, unsigned int, int);
+void insToList(sockPoolHashTable_t *, socknode_t *);
+void insToListWithLock(sockPoolHashTable_t *, socknode_t *);
 int createNewSocket(unsigned int);
 int CompareAndSwap(int *, int, int);
 void InitLock(SpinLock *);
 void Lock (SpinLock *);
 void UnLock (SpinLock *);
 
+#if 0
+/************************************************
+ * Array Implementation data structures 
+ ***********************************************/
+#define MAX_CONN_PER_MACHINE    10
+typedef struct sock_pool {
+    unsigned int mid;
+    int *sd;
+    char *inuse;
+} sock_pool_t;
+
+sock_pool_t *initSockPool(unsigned int *, int);
+int getSock(sock_pool_t *, unsigned int);
+int freeSock(sock_pool_t *, int);
+#endif
+
 #endif
index 66e524baed591614eef369ca87a9912efb429cf2..99e6356117ef19920ba791337a4ee86e0f73e95a 100644 (file)
@@ -8,21 +8,10 @@
 #include "prelookup.h"
 #include "threadnotify.h"
 #include "queue.h"
-#include <pthread.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <sys/types.h>
-#include <unistd.h>
-#include <errno.h>
-#include <time.h>
-#include <string.h>
 #ifdef COMPILER
 #include "thread.h"
 #endif
 
-#define LISTEN_PORT 2156
 #define NUM_THREADS 1
 #define PREFETCH_CACHE_SIZE 1048576 //1MB
 #define CONFIG_FILENAME "dstm.conf"
@@ -30,7 +19,6 @@
 /* Global Variables */
 extern int classsize[];
 extern primarypfq_t pqueue; //Shared prefetch queue
-extern mcpileq_t mcqueue;  //Shared queue containing prefetch requests sorted by remote machineids 
 objstr_t *prefetchcache; //Global Prefetch cache
 pthread_mutex_t prefetchcache_mutex;// Mutex to lock Prefetch Cache
 pthread_mutexattr_t prefetchcache_mutex_attr; /* Attribute for lock to make it a recursive lock */
@@ -48,16 +36,8 @@ unsigned int oidsPerBlock;
 unsigned int oidMin;
 unsigned int oidMax;
 
-/************************************************************************
- * Global variables to map socketid and remote mid to
- * reuse sockets for sending prefetches and making remote read requests
- ************************************************************************/
-midSocketInfo_t midSocketArray[NUM_MACHINES];
-int sockCount;                 //number of connections with all remote machines(one socket per mc)
-int sockIdFound;       //track if socket file descriptor is already established 
-midSocketInfo_t sockArrayRemoteRead[NUM_MACHINES];
-int sockCountRemoteRead;               //number of connections with all remote machines(one socket per mc)
-int sockIdFoundRemoteRead;     //track if socket file descriptor is already established 
+sockPoolHashTable_t *transReadSockPool;
+sockPoolHashTable_t *transPrefetchSockPool;
 
 void printhex(unsigned char *, int);
 plistnode_t *createPiles(transrecord_t *);
@@ -194,11 +174,19 @@ int dstmStartup(const char * option) {
          threadcount--;
 #endif
 
+    //Initialize socket pool
+    if((transReadSockPool = createSockPool(transReadSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
+        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
+        return 0;
+    }
+    if((transPrefetchSockPool = createSockPool(transPrefetchSockPool, 2*numHostsInSystem+1, 0.5)) == NULL) {
+        printf("Error in creating new socket pool at  %s line %d\n", __FILE__, __LINE__);
+        return 0;
+    }
+
        dstmInit();
        transInit();
 
-
-
        if (master) {
                pthread_attr_init(&attr);
                pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
@@ -268,22 +256,6 @@ void transInit() {
          retval=pthread_create(&tPrefetch, NULL, transPrefetch, NULL);
        } while(retval!=0);
        pthread_detach(tPrefetch);
-
-       //Initialize mid to socketid mapping array
-       sockCount = 0;
-       for(t = 0; t < NUM_MACHINES; t++) {
-               midSocketArray[t].mid = 0;
-               midSocketArray[t].sockid = 0;
-       }
-
-       //Create and Initialize a pool of threads 
-       /* Threads are active for the entire period runtime is running */
-       for(t = 0; t< NUM_THREADS; t++) {
-         do {
-               rc = pthread_create(&wthreads[t], NULL, mcqProcess, (void *)t);
-         } while(rc!=0);
-         pthread_detach(wthreads[t]);
-       }
 }
 
 /* This function stops the threads spawned */
@@ -833,8 +805,8 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
        control = *(tdata->replyctrl);
        send_data(sd, &control, sizeof(char));
 
-       //TODO read missing objects to be used during object migration
-       /* If the decided response is due to a soft abort and missing objects at the Participant's side */
+       //TODO read missing objects during object migration
+       /* If response is a soft abort due to missing objects at the Participant's side */
        /*
        if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
                // Read list of objects missing  
@@ -869,48 +841,17 @@ char sendResponse(thread_data_array_t *tdata, int sd) {
  * */ 
 
 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
-       int sd, size, val;
+       int size, val;
        struct sockaddr_in serv_addr;
        char machineip[16];
        char control;
        objheader_t *h;
-       void *objcopy;
+       void *objcopy = NULL;
 
-    int i;
-    for(i = 0; i < NUM_MACHINES; i++) {
-        if(sockArrayRemoteRead[i].mid == mnum) {
-            sd = sockArrayRemoteRead[i].sockid;
-            sockIdFoundRemoteRead = 1;
-            break;
-        }
-    }
-    
-    if(sockIdFoundRemoteRead == 0) {
-        if(sockCountRemoteRead < NUM_MACHINES) {
-            /* Create socket */
-            if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-                perror("Error in socket\n");
-                return NULL;
-            }
-
-            bzero((char*) &serv_addr, sizeof(serv_addr));
-            serv_addr.sin_family = AF_INET;
-            serv_addr.sin_port = htons(LISTEN_PORT);
-            serv_addr.sin_addr.s_addr = htonl(mnum);
-            // Open connection 
-            if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
-                perror("getRemoteObj() Error in connect\n");
-                close(sd);
-                return NULL;
-            }
-            sockArrayRemoteRead[sockCountRemoteRead].mid = mnum;
-            sockArrayRemoteRead[sockCountRemoteRead].sockid = sd;
-            sockCountRemoteRead++;
-        } else {
-            //TODO Fix for connecting to more than 2 machines && close socket
-            printf("%s(): Error: Currently works for two remote machines\n", __func__);
-            return NULL;
-        }
+    int sd;
+    if((sd = getSock(transReadSockPool, mnum)) == -1) {
+        printf("%s(): Error: no socket id in the pool of sockets at %s, %d\n", __func__, __FILE__, __LINE__);
+        return NULL;
     }
     
        char readrequest[sizeof(char)+sizeof(unsigned int)];
@@ -923,7 +864,8 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
 
        switch(control) {
                case OBJECT_NOT_FOUND:
-                       return NULL;
+            objcopy = NULL;
+            break;
                case OBJECT_FOUND:
                        /* Read object if found into local cache */
                        recv_data(sd, &size, sizeof(int));
@@ -935,9 +877,15 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
                        break;
                default:
                        printf("Error: in recv response from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);
-                       return NULL;
+            break;
        }
 
+    int status;
+    if((status = freeSock(transReadSockPool, mnum, sd)) == -1) {
+        printf("Error in releasing socket at %s line %d\n", __FILE__, __LINE__);
+        return NULL;
+    }
+
        return objcopy;
 }
 
@@ -1160,14 +1108,14 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
        int ntuples, slength;
        unsigned int *oid;
        unsigned short *endoffsets;
-       short *arryfields; 
+       short *offsets; 
 
        /* Check for the case x.y.z and a.b.c are same oids */ 
        ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
        oid = GET_PTR_OID(ptr);
        endoffsets = GET_PTR_EOFF(ptr, ntuples); 
-       arryfields = GET_PTR_ARRYFLD(ptr, ntuples);
+       offsets = GET_PTR_ARRYFLD(ptr, ntuples);
        
        /* Find offset length for each tuple */
        int numoffset[ntuples];
@@ -1205,7 +1153,7 @@ void checkPrefetchTuples(prefetchqelem_t *node) {
                                }
                                index = endoffsets[j -1];
                                for(count = 0; count < slength; count ++) {
-                                       if (arryfields[k] != arryfields[index]) { 
+                                       if (offsets[k] != offsets[index]) { 
                                                break;
                                        }
                                        index++;
@@ -1284,7 +1232,6 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        objheader_t *objheader;
        unsigned short *endoffsets;
        short *arryfields; 
-       prefetchpile_t *head = NULL;
 
        ptr = (char *) node;
        ntuples = *(GET_NTUPLES(ptr));
@@ -1384,6 +1331,7 @@ prefetchpile_t *foundLocal(prefetchqelem_t *node) {
        }
        
        /* Make machine groups */
+       prefetchpile_t *head = NULL;
        if((head = makePreGroups(node, numoffset)) == NULL) {
                printf("Error in makePreGroups() %s %d\n", __FILE__, __LINE__);
                return NULL;
@@ -1468,10 +1416,6 @@ void checkPreCache(prefetchqelem_t *node, int *numoffset, unsigned int objoid, i
 
 /* This function is called by the thread calling transPrefetch */
 void *transPrefetch(void *t) {
-       prefetchqelem_t *qnode;
-       prefetchpile_t *pilehead = NULL;
-       prefetchpile_t *ptr = NULL, *piletail = NULL;
-
        while(1) {
                /* lock mutex of primary prefetch queue */
                pthread_mutex_lock(&pqueue.qlock);
@@ -1481,6 +1425,7 @@ void *transPrefetch(void *t) {
                }
 
                /* dequeue node to create a machine piles and  finally unlock mutex */
+        prefetchqelem_t *qnode;
                if((qnode = pre_dequeue()) == NULL) {
                        printf("Error: No node returned %s, %d\n", __FILE__, __LINE__);
                        pthread_mutex_unlock(&pqueue.qlock);
@@ -1492,115 +1437,45 @@ void *transPrefetch(void *t) {
                checkPrefetchTuples(qnode);
                /* Check if the tuples are found locally, if yes then reduce them further*/ 
                /* and group requests by remote machine ids by calling the makePreGroups() */
+        prefetchpile_t *pilehead = NULL;
                if((pilehead = foundLocal(qnode)) == NULL) {
                        printf("Error: No node created for serving prefetch request %s %d\n", __FILE__, __LINE__);
                        pre_enqueue(qnode);
                        continue;
                }
 
-               ptr = pilehead;
-               while(ptr != NULL) {
-                       if(ptr->next == NULL) {
-                               piletail = ptr;
-                       } 
-                       ptr = ptr->next;
-               }
-
-               /* Lock mutex of pool queue */
-               pthread_mutex_lock(&mcqueue.qlock);
-               /* Update the pool queue with the new remote machine piles generated per prefetch call */
-               mcpileenqueue(pilehead, piletail);
-               /* Broadcast signal on machine pile queue */
-               pthread_cond_broadcast(&mcqueue.qcond);
-               /* Unlock mutex of  machine pile queue */
-               pthread_mutex_unlock(&mcqueue.qlock);
-               /* Deallocate the prefetch queue pile node */
-               predealloc(qnode);
-       }
-}
-
-/* Each thread in the  pool of threads calls this function to establish connection with
- * remote machines, send the prefetch requests and process the reponses from
- * the remote machines .
- * The thread is active throughout the period of runtime */
-
-void *mcqProcess(void *threadid) {
-       int tid, i;
-       prefetchpile_t *mcpilenode;
-       struct sockaddr_in remoteAddr;
-       int sd;
-
-       tid = (int) threadid;
-       while(1) {
-
-               sockIdFound = 0;
-               /* Lock mutex of mc pile queue */
-               pthread_mutex_lock(&mcqueue.qlock);
-               /* When mc pile queue is empty, wait */
-               while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
-                       pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
-               }
-               /* Dequeue node to send remote machine connections*/
-               if((mcpilenode = mcpiledequeue()) == NULL) {
-                       printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
-                       pthread_mutex_unlock(&mcqueue.qlock);
-                       continue;
-               }
-               /* Unlock mutex */
-               pthread_mutex_unlock(&mcqueue.qlock);
-
-               /*Initiate connection to remote host and send prefetch request */ 
-               if(mcpilenode->mid != myIpAddr) {
-                       /* Check to see if socket exists */
-                       for(i = 0; i < NUM_MACHINES; i++) {
-                               if(midSocketArray[i].mid == mcpilenode->mid) {
-                                       sendPrefetchReq(mcpilenode, midSocketArray[i].sockid);
-                                       sockIdFound = 1;
-                                       break;
-                               }
-                       }
+        // Get sock from shared pool 
+        int sd = -1;
+        if((sd = getSock(transPrefetchSockPool, pilehead->mid)) == -1) {
+            printf("Error: No socket id in pool of sockets at %s line %d\n", __FILE__, __LINE__);
+            exit(-1);
+        }
 
-                       if(sockIdFound == 0) {
-                               if(sockCount < NUM_MACHINES) {
-                                       /* Open Socket */
-                                       if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
-                                               printf("%s() Error: In creating socket at %s, %d\n", __func__, __FILE__, __LINE__);
-                                               return;
-                                       }
+        /* Send  Prefetch Request */
+        prefetchpile_t *ptr = pilehead;
+        while(ptr != NULL) {
+            sendPrefetchReq(ptr, sd);
+            ptr = ptr->next; 
+        }
 
-                                       bzero(&remoteAddr, sizeof(remoteAddr));
-                                       remoteAddr.sin_family = AF_INET;
-                                       remoteAddr.sin_port = htons(LISTEN_PORT);
-                                       remoteAddr.sin_addr.s_addr = htonl(mcpilenode->mid);
-
-                                       /* Open Connection */
-                                       if (connect(sd, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0) {
-                                               printf("%s():error %d connecting to %s:%d\n", __func__, errno,
-                                                               inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
-                                               close(sd);
-                                               return;
-                                       }
+        /* Release socket */
+        int status;
+        if((status = freeSock(transPrefetchSockPool, pilehead->mid, sd)) == -1) {
+            printf("Error: In realeasing socket at %s line %d\n", __FILE__, __LINE__);
+            return;
+        }
 
-                                       midSocketArray[sockCount].mid = mcpilenode->mid;
-                                       midSocketArray[sockCount].sockid = sd;
-                                       sendPrefetchReq(mcpilenode, midSocketArray[sockCount].sockid);
-                                       sockCount++;
-                               } else {
-                                       //TODO Fix for connecting to more than 2 machines && close socket
-                                       printf("%s(): Error: Currently works for only 2 machines\n", __func__);
-                                       return;
-                               }
-                       }
-               }
+        /* Deallocated pilehead */
+        mcdealloc(pilehead);
 
-               /* Deallocate the machine queue pile node */
-               mcdealloc(mcpilenode);
+               // Deallocate the prefetch queue pile node
+               predealloc(qnode);
        }
 }
 
 void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
-       int i, off, len, endpair, count = 0;
-       char machineip[16], control;
+       int off, len, endpair, count = 0;
+       char control;
        objpile_t *tmp;
 
        /* Send TRANS_PREFETCH control message */
@@ -1621,6 +1496,7 @@ void sendPrefetchReq(prefetchpile_t *mcpilenode, int sd) {
                off += sizeof(unsigned int);
                *((unsigned int *)(oidnoffset + off)) = myIpAddr; 
                off += sizeof(unsigned int);
+        int i;
                for(i = 0; i < tmp->numoffset; i++) {
                        *((short*)(oidnoffset + off)) = tmp->offset[i];
                        off+=sizeof(short);
@@ -1752,7 +1628,7 @@ int startRemoteThread(unsigned int oid, unsigned int mid)
        else
        {
                msg[0] = START_REMOTE_THREAD;
-               memcpy(&msg[1], &oid, sizeof(unsigned int));
+        *((unsigned int *) &msg[1]) = oid;
                send_data(sock, msg, 1 + sizeof(unsigned int));
        }
 
index 7ad7dcd5c998f9782f6bd73d2b940fc443cc00bf..e9e5565a9130df4a5028e674f0492ea60992313d 100755 (executable)
@@ -238,7 +238,7 @@ $ROBUSTROOT/Runtime/GenericHashtable.c $ROBUSTROOT/Runtime/object.c"
 if $DSMFLAG
 then
 EXTRAOPTIONS="$EXTRAOPTIONS -lpthread -DCOMPILER -DDSTM -I$DSMRUNTIME"
-FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c"
+FILES="$FILES $DSMRUNTIME/trans.c $DSMRUNTIME/mcpileq.c $DSMRUNTIME/objstr.c $DSMRUNTIME/dstm.c $DSMRUNTIME/mlookup.c $DSMRUNTIME/clookup.c $DSMRUNTIME/llookup.c $DSMRUNTIME/threadnotify.c $DSMRUNTIME/dstmserver.c $DSMRUNTIME/plookup.c $DSMRUNTIME/ip.c $DSMRUNTIME/queue.c $DSMRUNTIME/prelookup.c $DSMRUNTIME/machinepile.c $DSMRUNTIME/localobjects.c $ROBUSTROOT/Runtime/thread.c $DSMRUNTIME/sockpool.c" 
 fi
 
 if $RECOVERFLAG