forgot file
authoradash <adash>
Tue, 7 Aug 2007 21:59:15 +0000 (21:59 +0000)
committeradash <adash>
Tue, 7 Aug 2007 21:59:15 +0000 (21:59 +0000)
add deallocation of node code
complete transRead calls
TODO: modify memcpy and take care of buffer overflow

Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/machinepile.c
Robust/src/Runtime/DSTM/interface/machinepile.h
Robust/src/Runtime/DSTM/interface/mcpileq.c
Robust/src/Runtime/DSTM/interface/mcpileq.h
Robust/src/Runtime/DSTM/interface/prefetchpile.c [new file with mode: 0644]
Robust/src/Runtime/DSTM/interface/queue.c
Robust/src/Runtime/DSTM/interface/queue.h
Robust/src/Runtime/DSTM/interface/trans.c

index 33cbd581ba2fcdca9f80b273716bfbf8bd1be1b6..7dee7f7379216f89ba546ace59f6024d5e659d64 100644 (file)
@@ -619,7 +619,7 @@ int prefetchReq(int acceptfd) {
                        }
                }
 
-               /* Send the buffer size */
+               /* Add the buffer size into buffer as a parameter */
                memcpy(buffer, &index, sizeof(unsigned int));
                /* Send the entire buffer with its size and oids found and not found */
                if(send((int)acceptfd, &buffer, sizeof(index - 1), MSG_NOSIGNAL) < sizeof(index -1)) {
index 4e8dc6a54a6dc7ec103d04031afc924bdfbfa413..9b5c416f4975d0b5720bfabff8a39ac5b778177b 100644 (file)
@@ -46,6 +46,8 @@ int insertPile(int mid, unsigned int oid, short numoffset, short *offset, prefet
        return 0;
 }
 
+//TODO
 int deletePile() {
 
+       return 0;
 }
index b8ca3d690c87223c33b5a3ba9b628e4d64646e68..70fd47fb331083f2f2f14fa86b44cc11cf0ac690 100644 (file)
@@ -6,5 +6,6 @@
 #include <stdlib.h>
 
 int insertPile(int, unsigned int, short, short *, prefetchpile_t *);
+int deletePile();
 
 #endif
index e5ec4a84d7c27c337e10937d4e32212d84b9bb02..27081a0890fa302f89492ba384e2e23753e87c08 100644 (file)
@@ -92,9 +92,16 @@ void mcpiledisplay() {
        }
 }
 
-
-
-
-
-
-
+void mcdealloc(prefetchpile_t *node) {
+       /* Remove the offset ptr and linked lists of objpile_t */
+       objpile_t *delnode;
+       while(node->objpiles != NULL) {
+               node->objpiles->offset = NULL;
+               delnode = node->objpiles;
+               node->objpiles = node->objpiles->next;
+               free(delnode);
+               node->objpiles->next = NULL;
+       }
+       free(node);
+       node->next = NULL;
+}
index 7add8debc3ba7c524d3b36476c940ad935df2900..93e206301e7c3a9925628db8eb07e60fcc400117 100644 (file)
@@ -33,5 +33,6 @@ prefetchpile_t *mcpiledequeue(void);
 void delnode();
 void mcpiledelete();
 void mcpiledisplay();
+void mcdealloc(prefetchpile_t *);
 
 #endif
diff --git a/Robust/src/Runtime/DSTM/interface/prefetchpile.c b/Robust/src/Runtime/DSTM/interface/prefetchpile.c
new file mode 100644 (file)
index 0000000..9f87bdb
--- /dev/null
@@ -0,0 +1,99 @@
+#include "dstm.h"
+
+/* Make a queue of prefetchpile_t type */
+prefetchpile_t poolqueue; //Global queue for machine piles
+
+/* Create new machine group */
+prefetchpile_t *createPile(int numoffsets) {
+       prefetchpile_t *pile;
+       if((pile = calloc(1, sizeof(prefetchpile_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+
+       /* Create a new object pile */
+       if((pile->objpiles = calloc(1, sizeof(objpile_t))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+
+       /* Create a ptr to the offset array for a given prefetch oid tuple */
+       if((pile->objpiles->offset = calloc(numoffsets, sizeof(short))) == NULL) {
+               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+               return NULL;
+       }
+       
+       pile->next = NULL;
+
+       return pile;
+}
+
+/* Into into prefetch pile*/
+void pileIns(prefetchpile_t *pile, short *endoffsets, short* arryfields, unsigned int *oid,int mnum,int noffsets, int index) {
+       prefetchpile_t *tmp, *ptr;
+       objpile_t *opile;
+       short *offsetarry;
+       int found = 0, k;
+
+       tmp = pile;
+       while(tmp != NULL) {
+               //Check if mnum already exists in the pile
+               if(tmp->mid == mnum) {
+                       /* Create a new object pile */
+                       if((opile = calloc(1, sizeof(objpile_t))) == NULL) {
+                               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+                       opile->next = tmp->objpiles;
+                       tmp->objpiles = opile;
+
+                       tmp->objpiles->oid = oid[index];
+                       if(index == 0)
+                               k = 0;
+                       else
+                               k = endoffsets[index -1];
+                       //Copy the offset values into objpile
+                       for(i = 0; i < numoffsets[i]; i++) {
+                               ptr->objpile->offsets[i] = arryfields[k];
+                               k++;
+                       }
+                       /* Create a ptr to the offset array for a given prefetch oid tuple */
+                       if((offsetarry = calloc(numoffsets, sizeof(short))) == NULL) {
+                               printf("Calloc error %s %d\n", __FILE__, __LINE__);
+                               return NULL;
+                       }
+
+
+                       found = 1;
+                       break;
+               }
+               tmp = tmp->next;
+       }
+
+       //Add New machine pile to the linked list
+       if(!found) {
+               if((ptr = createPile(noffsets)) == NULL) {
+                       printf("No new pile created %s %d\n", __FILE__, __LINE__);
+                       return;
+               }
+               ptr->mid = mnum;
+               ptr->objpile->oid = oid[index];
+               if(index == 0)
+                       k = 0;
+               else
+                       k = endoffsets[index -1];
+               //Copy the offset values into objpile
+               for(i = 0; i < numoffsets[i]; i++) {
+                       ptr->objpile->offsets[i] = arryfields[k];
+                       k++;
+               }
+               ptr->next = pile;
+               pile = ptr;
+       }
+
+       return pile;
+}
+
+/* Insert into object pile */
+
+
index 298e0d115b250a9320afa7a7b44a9c43bf250147..ce82aa7ec71a578d98d25db3361fce100b9b69fd 100644 (file)
@@ -73,6 +73,11 @@ void queueDisplay() {
        }
 }
 
+void predealloc(prefetchqelem_t *node) {
+       free(node);
+       node->next = NULL;
+}
+
 
 #if 0
 main() {
index ea7dba2d91a3d15a283e3aa47fde3a022bb90c55..ff614903566fb69e5f4d91482de39f0bd86e1f60 100644 (file)
@@ -21,7 +21,8 @@ typedef struct primarypfq {
 void queueInit(void);
 void delqnode(); 
 void queueDelete(void);
-void enqueue(prefetchqelem_t *qnode);
+void enqueue(prefetchqelem_t *);
 prefetchqelem_t *dequeue(void);
 void queueDisplay();
+void predealloc(prefetchqelem_t *);
 #endif
index fdbccf86a6faaa38fa75c5f48ecaee4b03383b27..f2efc67d9f9b1ae1b62f6f542290c6b9bd318f97 100644 (file)
@@ -14,6 +14,7 @@
 #include <netinet/in.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <errno.h>
 #include <time.h>
 #include <string.h>
 #include <pthread.h>
@@ -59,7 +60,7 @@ void prefetch(int ntuples, unsigned int *oids, short *endoffsets, short *arrayfi
        /* Allocate for the queue node*/
        char *node;
        qnodesize = sizeof(prefetchqelem_t) + sizeof(int) + ntuples * (sizeof(short) + sizeof(unsigned int)) + endoffsets[ntuples - 1] * sizeof(short); 
-       if((node = calloc(1,qnodesize)) == NULL) {
+       if((node = calloc(1, qnodesize)) == NULL) {
                printf("Calloc Error %s, %d\n", __FILE__, __LINE__);
                return;
        }
@@ -105,6 +106,7 @@ void transInit() {
                        return;
                }
        }
+       //TODO when to deletethreads
 }
 
 /* This function stops the threads spawned */
@@ -148,21 +150,58 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid)
        unsigned int machinenumber;
        objheader_t *tmp, *objheader;
        void *objcopy;
-       int size;
+       int size, rc, found = 0;
        void *buf;
-       /* Search local cache */
+       struct timespec ts;
+       struct timeval tp;
+        
+       rc = gettimeofday(&tp, NULL);
+
+       /* Convert from timeval to timespec */
+       ts.tv_nsec = tp.tv_usec * 1000;
+
+       /* Search local transaction cache */
        if((objheader = (objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
                return(objheader);
        } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
                /* Look up in machine lookup table  and copy  into cache*/
-               //              tmp = mhashSearch(oid);
+               tmp = mhashSearch(oid);
                size = sizeof(objheader_t)+classsize[tmp->type];
                objcopy = objstrAlloc(record->cache, size);
                memcpy(objcopy, (void *)objheader, size);
                /* Insert into cache's lookup table */
                chashInsert(record->lookupTable, objheader->oid, objcopy); 
                return(objcopy);
-       } else { /* If not found in machine look up */
+       } else if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+               found = 1;
+               size = sizeof(objheader_t)+classsize[tmp->type];
+               objcopy = objstrAlloc(record->cache, size);
+               memcpy(objcopy, (void *)tmp, size);
+               /* Insert into cache's lookup table */
+               chashInsert(record->lookupTable, tmp->oid, objcopy); 
+               return(objcopy);
+       } else { /* If not found anywhere, then block until object appears in prefetch cache */
+               pthread_mutex_lock(&pflookup.lock);
+               while(!found) {
+                       rc = pthread_cond_timedwait(&pflookup.cond, &pflookup.lock, &ts);
+                       if(rc == ETIMEDOUT) {
+                               printf("Wait timed out\n");
+                               /* Check Prefetch cache again */
+                               if((tmp = (objheader_t *) prehashSearch(oid)) != NULL) { /* Look up in prefetch cache */
+                                       found = 1;
+                                       size = sizeof(objheader_t)+classsize[tmp->type];
+                                       objcopy = objstrAlloc(record->cache, size);
+                                       memcpy(objcopy, (void *)tmp, size);
+                                       /* Insert into cache's lookup table */
+                                       chashInsert(record->lookupTable, tmp->oid, objcopy); 
+                                       return(objcopy);
+                               } else {
+                                       pthread_mutex_unlock(&pflookup.lock);
+                                       break;
+                               }
+                               pthread_mutex_unlock(&pflookup.lock);
+                       }
+               }
                /* Get the object from the remote location */
                machinenumber = lhashSearch(oid);
                objcopy = getRemoteObj(record, machinenumber, oid);
@@ -234,6 +273,7 @@ plistnode_t *createPiles(transrecord_t *record) {
 
                        /* Check if local or not */
                        if((localmachinenum = mhashSearch(curr->key)) != NULL) { 
+                               /* Set the pile->local flag*/
                                pile->local = 1; //True i.e. local
                        }
 
@@ -769,6 +809,9 @@ void *handleLocalReq(void *threadarg) {
        if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
                localtdata->tdata->recvmsg[localtdata->tdata->thread_id].rcv_status = TRANS_SOFT_ABORT;
                printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
+               //TODO  currently the only soft abort case that is supported is when object locked by previous
+               //transaction => v_matchlock > 0 
+               //The other case for SOFT ABORT i.e. when object is not found but versions match is not supported 
                /* Send number of oids not found and the missing oids if objects are missing in the machine */
                /* TODO Remember to store the oidnotfound for later use
                   if(objnotfound != 0) {
@@ -860,7 +903,6 @@ int transAbortProcess(void *modptr, unsigned int *objlocked, int numlocked, int
                header = mhashSearch(objlocked[i]);// find the header address
                ((objheader_t *)header)->status &= ~(LOCK);
        }
-       //TODO/* Unset the bit for local objects */
 
        /* Send ack to Coordinator */
        printf("DEBUG-> TRANS_SUCCESSFUL\n");
@@ -903,7 +945,6 @@ int transComProcess(trans_commit_data_t *transinfo) {
        }
 
        //TODO Update location lookup table
-       //TODO/* Unset the bit for local objects */
 
        /* Send ack to Coordinator */
        printf("DEBUG-> TRANS_SUCESSFUL\n");
@@ -1184,8 +1225,11 @@ void *transPrefetch(void *t) {
                mcpileenqueue(pilehead);
                /* Broadcast signal on machine pile queue */
                pthread_cond_broadcast(&mcqueue.qcond);
-               /* Unlock mutex of  mcahine pile queue */
+               /* Unlock mutex of  machine pile queue */
                pthread_mutex_unlock(&mcqueue.qlock);
+               /* Deallocate the prefetch queue pile node */
+               predealloc(qnode);
+
        }
 }
 
@@ -1200,11 +1244,11 @@ void *mcqProcess(void *threadid) {
        while(1) {
                /* Lock mutex of mc pile queue */
                pthread_mutex_lock(&mcqueue.qlock);
-               /* while mc pile queue is empty, then wait */
+               /* When mc pile queue is empty, wait */
                while((mcqueue.front == NULL) && (mcqueue.rear == NULL)) {
                        pthread_cond_wait(&mcqueue.qcond, &mcqueue.qlock);
                }
-               /* dequeue node to send remote machine connections*/
+               /* Dequeue node to send remote machine connections*/
                if((mcpilenode = mcpiledequeue()) == NULL) {
                        printf("Dequeue Error: No node returned %s %d\n", __FILE__, __LINE__);
                        return NULL;
@@ -1217,7 +1261,8 @@ void *mcqProcess(void *threadid) {
                sendPrefetchReq(mcpilenode, tid);
                /* TODO: For each object not found query DHT for new location and retrieve the object */
 
-               /* Deallocate the dequeued node */
+               /* Deallocate the machine queue pile node */
+               mcdealloc(mcpilenode);
        }
 }
 
@@ -1392,7 +1437,7 @@ void getPrefetchResponse(int count, int sd) {
                                        memcpy(modptr, buffer+index, objsize);
                                        index += sizeof(int);
                                        /* Add pointer and oid to hash table */
-                                       //TODO Do we need a version comparison here??
+                                       //TODO Do we need a version comparison here??
                                        prehashInsert(oid, modptr);
                                        /* Broadcast signal on prefetch cache condition variable */ 
                                        pthread_cond_broadcast(&pflookup.cond);