server portions done except a few things
authoradash <adash>
Sat, 31 Mar 2007 12:13:29 +0000 (12:13 +0000)
committeradash <adash>
Sat, 31 Mar 2007 12:13:29 +0000 (12:13 +0000)
added more testcases for debugging
fixed several bugs
TODO : Handle events at server side when it sees TRANS_COMMIT and send ACK to Coordinator
Handle special cases such as leader election

Robust/src/Runtime/DSTM/interface/dstm.h
Robust/src/Runtime/DSTM/interface/dstmserver.c
Robust/src/Runtime/DSTM/interface/mlookup.c
Robust/src/Runtime/DSTM/interface/plookup.c
Robust/src/Runtime/DSTM/interface/testclient.c
Robust/src/Runtime/DSTM/interface/testserver.c
Robust/src/Runtime/DSTM/interface/trans.c

index 047626fa0d312302138f8847d8c502dc316b0745..cb3a9bd003d849fd31c3099e345358ac6da04edd 100644 (file)
 #define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING   9
 
 //Participant Messages
-#define OBJECT_FOUND           10
-#define OBJECT_NOT_FOUND       11
-#define OBJECTS_FOUND          12
-#define OBJECTS_NOT_FOUND      13
-#define TRANS_AGREE            14
-#define TRANS_DISAGREE         15
-#define TRANS_AGREE_BUT_MISSING_OBJECTS        16
-#define TRANS_SOFT_ABORT       17
-#define TRANS_SUCESSFUL                18
+#define OBJECT_FOUND                   10
+#define OBJECT_NOT_FOUND               11
+#define OBJECTS_FOUND                  12
+#define OBJECTS_NOT_FOUND              13
+#define OBJ_LOCKED_BUT_VERSION_MATCH   14
+#define OBJ_UNLOCK_BUT_VERSION_MATCH   15
+#define VERSION_NO_MATCH               16
+#define TRANS_AGREE                    17
+#define TRANS_DISAGREE                 18
+#define TRANS_AGREE_BUT_MISSING_OBJECTS        19
+#define TRANS_SOFT_ABORT               20
+#define TRANS_SUCESSFUL                        21
 
 #include <stdlib.h>
 #include <stdio.h>
@@ -97,6 +100,12 @@ typedef struct thread_data_array {
        transrecord_t *rec;     // To send modified objects
 }thread_data_array_t;
 
+// Structure to save information about an oid necesaary for the decideControl()
+typedef struct objinfo {
+       unsigned int oid;
+       int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc) 
+}objinfo_t;
+
 /* Initialize main object store and lookup tables, start server thread. */
 int dstmInit(void);
 
@@ -114,15 +123,18 @@ void *objstrAlloc(objstr_t *store, unsigned int size); //size in bytes
 /* Prototypes for server portion */
 void *dstmListen();
 void *dstmAccept(void *);
+int readClientReq(int);
+int handleTransReq(int, fixed_data_t *, unsigned int *, char *, void *);
 /* end server portion */
 
 /* Prototypes for transactions */
 transrecord_t *transStart();
 objheader_t *transRead(transrecord_t *record, unsigned int oid);
 objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
+int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides what response to send to the participant
 void *transRequest(void *);    //the C routine that the thread will execute when TRANS_REQUEST begins
 int transCommit(transrecord_t *record); //return 0 if successful
-int decideResponse(thread_data_array_t *tdata, char *buffer, int sd);// Coordinator decides what response to send to the participant
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
 /* end transactions */
 
 void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
index efa0efb79d6b94c499cf7cf27100d3637ccc7886..b1a89d8e7ff98e63d9ba30a97562d4f6bb3fffc4 100644 (file)
@@ -78,254 +78,253 @@ void *dstmListen()
 
 void *dstmAccept(void *acceptfd)
 {
-       int numbytes,i,choice, oid;
+       int numbytes,i, val;
+       unsigned int oid;
        char buffer[RECEIVE_BUFFER_SIZE], control;
+       char *ptr;
        void *srcObj;
        objheader_t *h;
+       
        int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
 
        printf("Recieved connection: fd = %d\n", (int)acceptfd);
-       while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0) 
-       {
-               printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
-               control = buffer[0];
-               switch(control) {
-                       case READ_REQUEST:
-                               printf("DEBUG -> READ_REQUEST\n");
-                               oid = *((int *)(buffer+1));
-#ifdef DEBUG1
-                               printf("DEBUG -> Received oid is %d\n", oid);
-#endif
-                               srcObj = mhashSearch(oid);
-                               h = (objheader_t *) srcObj;
-                               if (h == NULL) {
-                                       buffer[0] = OBJECT_NOT_FOUND;
-                               } else {
-                                       buffer[0] = OBJECT_FOUND;
-                                       size = sizeof(objheader_t) + sizeof(classsize[h->type]);
-                                       memcpy(buffer+1, srcObj, size);
-                               }
-#ifdef DEBUG1
-                               printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
-#endif
+       recv((int)acceptfd, &control, sizeof(char), 0);
+       switch(control) {
+               case READ_REQUEST:
+                       recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+                       srcObj = mhashSearch(oid);
+                       h = (objheader_t *) srcObj;
+                       if (h == NULL) {
+                               buffer[0] = OBJECT_NOT_FOUND;
+                       } else {
+                               buffer[0] = OBJECT_FOUND;
+                               size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+                               memcpy(buffer+1, srcObj, size);
+                       }
+                       if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
+                               perror("");
+                       }
+                       break;
+               
+               case READ_MULT_REQUEST:
+                       printf("DEBUG-> READ_MULT_REQUEST\n");
+                       break;
+       
+               case MOVE_REQUEST:
+                       printf("DEBUG -> MOVE_REQUEST\n");
+                       break;
+
+               case MOVE_MULT_REQUEST:
+                       printf("DEBUG -> MOVE_MULT_REQUEST\n");
+                       break;
+
+               case TRANS_REQUEST:
+                       //printf("DEBUG -> TRANS_REQUEST\n");
+                       if((val = readClientReq((int)acceptfd)) == 1) {
+                               printf("Error in readClientReq\n");
+                       }
+                       break;
+
+               default:
+                       printf("Error receiving\n");
+       }
+       
+       //Read for new control message from Coordiator
+       recv((int)acceptfd, &control, sizeof(char), 0);
+       switch(control) {
+               case TRANS_ABORT:
+                       printf("DEBUG -> TRANS_ABORT\n");
+                       write((int)acceptfd, &control, sizeof(char));
+                       break;
+
+               case TRANS_COMMIT:
+                       printf("DEBUG -> TRANS_COMMIT\n");
+                       write((int)acceptfd, &control, sizeof(char));
+                       //TODO 
+                       //change ptr address in mhash table
+                       //unlock objects
+                       //update object version
+                       //change reference count of older address??
+                       //free space in objstr ??
+                       //Update location lookup table
+                       break;
+       }
 
-                               if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
-                                       perror("");
-                               }
-                               break;
-                       case READ_MULT_REQUEST:
-                               printf("DEBUG-> READ_MULT_REQUEST\n");
-                               break;
-                       case MOVE_REQUEST:
-                               printf("DEBUG -> MOVE_REQUEST\n");
-                               break;
-                       case MOVE_MULT_REQUEST:
-                               printf("DEBUG -> MOVE_MULT_REQUEST\n");
-                               break;
-                       case TRANS_REQUEST:
-                               printf("DEBUG -> TRANS_REQUEST\n");
-                               printf("Client sent %d\n",buffer[0]);
-       //                      handleTransReq(acceptfd, buffer);
-                               break;
-                       case TRANS_ABORT:
-                               printf("DEBUG -> TRANS_ABORT\n");
-                               break;
-                       case TRANS_COMMIT:
-                               printf("DEBUG -> TRANS_COMMIT\n");
-                               printf("Client sent %d\n",buffer[0]);
-                               //TODO copy the objects into the machine 
-                               /*copy the object into the object store from its old 
-                                 location in the objstore(pointer to its header is already stored before)*/
-                               break;
-                       default:
-                               printf("Error receiving");
-               }
-               //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
-               //printf("%s", buffer);
-       } 
        if (close((int)acceptfd) == -1)
        {
                perror("close");
        }
        else
                printf("Closed connection: fd = %d\n", (int)acceptfd);
+       
        pthread_exit(NULL);
 }
 
-//TOOD put __FILE__ __LINE__ for all error conditions
-#if 0
-int handleTransReq(int acceptfd, char *buf) {
-       short numread = 0, nummod = 0;
-       char control;
-       int offset = 0, size,i;
-       int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
-       objheader_t *headptr = NULL;
-       objstr_t *tmpholder;
-       void *top, *mobj;
+int readClientReq(int acceptfd) {
+       char *ptr, control;
+       void *modptr;
+       objheader_t *h, tmp_header;
+       fixed_data_t fixed;
+       int sum = 0, N, n;
+
+       // Read fixed_data
+       N = sizeof(fixed) - 1;
+       ptr = (char *)&fixed;;
+       fixed.control = TRANS_REQUEST;
+       do {
+               n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
+       //      printf("DEBUG -> 1. Reading %d bytes \n", n);
+               sum += n;
+       } while(sum < N && n != 0); 
+
+       //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
+       // Read list of mids
+       int mcount = fixed.mcount;
+       N = mcount * sizeof(unsigned int);
+       unsigned int listmid[mcount];
+       ptr = (char *) listmid;
+       sum = 0;
+       do {
+               n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
+       //      printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
+               sum += n;
+       } while(sum < N && n != 0);
+
+       // Read oid and version tuples
+       int numread = fixed.numread;
+       N = numread * (sizeof(unsigned int) + sizeof(short));
+       char objread[N];
+       sum = 0;
+       do {
+               n = recv((int)acceptfd, (void *) objread, N, 0);
+       //      printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
+               sum += n;
+       } while(sum < N && n != 0);
+       //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
+
+       // Read modified objects
+       if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
+       //      printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+               return 1;
+       }
+       sum = 0;
+       do {
+               n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+               //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
+               sum += n;
+       } while (sum < fixed.sum_bytes && n != 0);
+       //Send control message as per all votes from the particpants
+       handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
+
        
-       char sendbuf[RECEIVE_BUFFER_SIZE];
+       
+       return 0;
+}
 
-       control = buf[0];
-       offset = sizeof(fixed_data_t);
-       list = *((short *)(buf+offset));
-       offset += sizeof(short);
-       nummod = *((short *)(buf+offset));
-       offset += sizeof(short);
-       if (numread) {
-               //Make an array to store the object headers for all objects that are only read
-               if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
-                       perror("handleTransReq: Calloc error");
-                       return 1;
+//This function runs a decision after all objects are weighed under one of the 4 possibilities 
+//and returns the appropriate control message to the Ccordinator 
+int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
+       short version;
+       char control, *ptr;
+       int i;
+       unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
+       int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
+       void *mobj;
+       objheader_t *headptr;
+       objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
+       
+       //Process each object present in the pile 
+       ptr = modptr;
+       //Process each oid in the machine pile/ group
+       for (i = 0; i < fixed->numread + fixed->nummod; i++) {
+               if (i < fixed->numread) {//Object is read
+                       int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
+                       incr *= i;
+                       oid = *((unsigned int *)(objread + incr));
+                       incr += sizeof(unsigned int);
+                       version = *((short *)(objread + incr));
+               } else {//Obj is modified
+                       headptr = (objheader_t *) ptr;
+                       oid = headptr->oid;
+                       version = headptr->version;
+                       ptr += sizeof(objheader_t) + classsize[headptr->type];
                }
-               //Process each object id that is only read
-               for (i = 0; i < numread; i++) {
-                       objheader_t *tmp;
-                       tmp = (objheader_t *) (buf + offset);
-                       //find if object is still present in the same machine since TRANS_REQUEST
-                       if ((mobj = mhashSearch(tmp->oid)) == NULL) {
-                               objnotfound++;
-                               /*
-                               sendbuf[0] = OBJECT_NOT_FOUND;
-                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                       perror("");
+               //Check if object is still present in the machine since the beginning of TRANS_REQUEST
+               if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
+                       objinfo[i].poss_val = OBJECT_NOT_FOUND;
+                       //Save the oids not found for later use
+                       oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+                       objnotfound++;
+               } else { // If obj found in machine (i.e. has not moved)
+                       //Check if obj is locked
+                       if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {           
+                               if (version == ((objheader_t *)mobj)->version) {      // If version match
+                                       objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
+                                       v_matchlock++;
+                               } else {//If versions don't match ..HARD ABORT
+                                       objinfo[i].poss_val = VERSION_NO_MATCH;
+                                       v_nomatch++;
+                                       //send TRANS_DISAGREE to Coordinator
+                                       control = TRANS_DISAGREE;
+                                       write(acceptfd, &control, sizeof(char));
+                                       //TODO when TRANS_DISAGREE is sent
+                                       //Free space allocated in main objstore
+                                       //Unlock objects that was locked in the trans
+                                       return 0;
                                }
-                               */
-                               } else { // If obj found in machine (i.e. has not moved)
-                               //Check if obj is locked
-                               if ((((objheader_t *)mobj)->status >> 3) == 1) {
-                                       //Check version of the object
-                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
-                                               transdis++;
-                                               /*
-                                               sendbuf[0] = TRANS_DISAGREE;
-                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                       perror("");
-                                               }
-                                       */
-                                       } else {//If versions don't match ..HARD ABORT
-                                               transabort++;
-                                               /*
-                                               sendbuf[0] = TRANS_DISAGREE_ABORT;
-                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                       perror("");
-                                               }
-                                               */
-                                       }
-                               } else {// If object not locked then lock it
-                                       ((objheader_t *)mobj)->status |= LOCK;
-                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
-                                               transagree++;
-                                               /*
-                                               sendbuf[0] = TRANS_AGREE;
-                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                       perror("");
-                                               }
-                                               */
-                                       } else {//If versions don't match
-                                               transabort++;
-                                               /*
-                                               sendbuf[0] = TRANS_DISAGREE_ABORT;
-                                               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                       perror("");
-                                               }
-                                               */
-                                       }
+                       } else {//Obj is not locked , so lock object
+                               ((objheader_t *)mobj)->status |= LOCK;
+                               //Save all object oids that are locked on this machine during this transaction request call
+                               oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+                               objlocked++;
+                               if (version == ((objheader_t *)mobj)->version) { //If versions match
+                                       objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
+                                       v_matchnolock++;
+                               } else { //If versions don't match
+                                       objinfo[i].poss_val = VERSION_NO_MATCH;
+                                       v_nomatch++;
+                                       //send TRANS_DISAGREE to Coordinator
+                                       control = TRANS_DISAGREE;
+                                       write(acceptfd, &control, sizeof(char));
+                                       return 0;
                                }
-                       }       
-                       memcpy(headptr, buf+offset, sizeof(objheader_t));
-                       offset += sizeof(objheader_t);
+                       }
                }
        }
-       if (nummod) {
-               if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
-                       perror("handleTransReq: Calloc error");
-                       return 1;
-               }
-               
-               //Process each object id that is only modified
-               for(i = 0; i < nummod; i++) {
-                       objheader_t *tmp;
-                       tmp = (objheader_t *)(buf + offset);
-                       //find if object is still present in the same machine since TRANS_REQUEST
-                       if ((mobj = mhashSearch(tmp->oid)) == NULL) {
-                               objnotfound++;
-                               /*
-                                  sendbuf[0] = OBJECT_NOT_FOUND;
-                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                  perror("");
-                                  }
-                                */
-                       } else { // If obj found in machine (i.e. has not moved)
-                               //Check if obj is locked
-                               if ((((objheader_t *)mobj)->status >> 3) == 1) {
-                                       //Check version of the object
-                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
-                                               transdis++;
-                                               /*
-                                                  sendbuf[0] = TRANS_DISAGREE;
-                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                  perror("");
-                                                  }
-                                                */
-                                       } else {//If versions don't match ..HARD ABORT
-                                               transabort++;
-                                               /*
-                                                  sendbuf[0] = TRANS_DISAGREE_ABORT;
-                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                  perror("");
-                                                  }
-                                                */
-                                       }
-                               } else {// If object not locked then lock it
-                                       ((objheader_t *)mobj)->status |= LOCK;
-                                       if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
-                                               transagree++;
-                                               /*
-                                                  sendbuf[0] = TRANS_AGREE;
-                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                  perror("");
-                                                  }
-                                                */
-                                       } else {//If versions don't match
-                                               transabort++;
-                                               /*
-                                                  sendbuf[0] = TRANS_DISAGREE_ABORT;
-                                                  if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                                                  perror("");
-                                                  }
-                                                */
-                                       }
-                               }
-                       }       
 
-                       size = sizeof(objheader_t) + classsize[tmp->type];
-                       if ((top = objstrAlloc(tmpholder, size)) == NULL) {
-                                       perror("handleTransReq: Calloc error");
-                                       return 1;
-                       }
-                       memcpy(top, buf+offset, size);
-                       offset += size;
-               }
+       //Decide what control message(s) to send
+       if(v_matchnolock == fixed->numread + fixed->nummod) {
+               //send TRANS_AGREE to Coordinator
+               control = TRANS_AGREE;
+               write(acceptfd, &control, sizeof(char));
        }
-       /*
-       if(transabort > 0) {
-               sendbuf[0] = TRANS_DISAGREE_ABORT;
-               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                       perror("");
-               }
        
-       } else if(transagree == numread+nummod) {
-               sendbuf[0] = TRANS_AGREE;
-               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                       perror("");
-               }
-       } else {
-               sendbuf[0] = TRANS_DISAGREE;
-               if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
-                       perror("");
-               }
+       if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
+               //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
+               control = TRANS_AGREE_BUT_MISSING_OBJECTS;
+               write(acceptfd, &control, sizeof(char));
+               //send missing oids  and number of oids not found with it
+               write(acceptfd, &objnotfound, sizeof(int));
+               write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
        }
-       */
+       
+       if(v_matchlock > 0 && v_nomatch == 0) {
+               //send TRANS_SOFT_ABORT to Coordinator
+               control = TRANS_SOFT_ABORT;
+               write(acceptfd, &control, sizeof(char));
+               //send missing oids  and number of oids not found with it
+               write(acceptfd, &objnotfound, sizeof(int));
+               write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+       }
+       
+       //TODO when TRANS_DISAGREE is sent
+       //Free space allocated in main objstore
+       //Unlock objects that was locked in the trans
+       if(control == TRANS_DISAGREE) {
+               for(i = 0; i< objlocked ; i++) {
+                       mobj = mhashSearch(oidlocked[i]);// find the header address
+                       ((objheader_t *)mobj)->status &= ~(LOCK);               
+               }       
+       }       
        return 0;
 }
-#endif
index bcee6650bf4f0eb39ca2d376c1e5a9dc803f4f6e..8bc046dc90428338f64cb35c36de9c7af815a11f 100644 (file)
@@ -187,83 +187,3 @@ unsigned int mhashResize(unsigned int newsize) {
        return 0;
 }
 
-#if 0
-// Hash Resize
-vkey resize(obj_addr_table_t * table){
-       int newCapacity = 2*(table->size) + 1;
-       obj_listnode_t **old;
-       //if ((table->hash = (obj_listnode_t **) malloc(sizeof(obj_listnode_t *)*size)) == NULL) {
-}
-
-// Hashing for the Key
-int hashKey(unsigned int key, obj_addr_table_t *table) {
-       // hash32shiftmult
-       int c2=0x27d4eb2d; // a prime or an odd constant
-       key = (key ^ 61) ^ (key >> 16);
-       key = key + (key << 3);
-       key = key ^ (key >> 4);
-       key = key * c2;
-       key = key ^ (key >> 15);
-       printf("The bucket number is %d\n", key % (table->size));
-       return (key % (table->size));
-}
-
-//Add key and its address to the new ob_listnode_t 
-vkey addKey(unsigned int key, objheader_t *ptr, obj_addr_table_t *table) {
-       int index;
-       obj_listnode_t *node;
-       
-       table->numelements++;
-       if(table->numelements > (table->loadfactor * table->size)){
-       //TODO : check if table is nearly full and then resize
-       }
-
-       index = hashKey(key,table);
-       if ((node = (obj_listnode_t *) malloc(sizeof(obj_listnode_t))) == NULL) {
-               printf("Malloc error %s %d\n", __FILE__, __LINE__);
-               exit(-1);
-       }
-       node->key = key;
-       node->object = ptr; 
-       node->next = table->hash[index];
-       table->hash[index] = node;
-       return;
-}
-// Get the address of the object header for a given key
-objheader_t *findKey(unsigned int key, obj_addr_table_t *table) {
-       int index;
-       obj_listnode_t *ptr;
-
-       index = hashKey(key,table);
-       ptr = table->hash[index];
-       while(ptr != NULL) {
-               if (ptr->key == key) {
-                       return ptr->object;
-               }
-               ptr = ptr->next;
-       }
-       return NULL;
-}
-// Remove the pointer to the object header from a linked list of obj_listnode_t given an key
-int removeKey(unsigned int key, obj_addr_table_t *table) {
-       int index;
-       obj_listnode_t *curr, *prev;            // prev points to previous node and curr points to the node to be deleted
-
-       index = hashKey(key,table);
-       prev = curr = table->hash[index];
-       for (; curr != NULL; curr = curr->next) {
-               if (curr->key == key) {         // Find a match in the hash table
-                       table->numelements--;
-                       prev->next = curr->next;
-                       if (table->hash[index] == curr) { // Special case when there is one element pointed by  the hash table
-                               table->hash[index] = NULL;
-                       }
-                       free(curr);
-                       return 0;
-               } 
-               prev = curr;
-       }
-       return -1;
-} 
-
-#endif
index 47c3e01293664061be05b6432981932cfb9de5d7..971905425ea59e38e908b9bb1404b6c48c231788 100644 (file)
@@ -37,14 +37,14 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
        //Add oid into a machine that is a part of the pile linked list structure
        while(tmp != NULL) {
                if (tmp->mid == mid) {
-                       if ((headeraddr->status >> 1) == 1) {
+                       if ((headeraddr->status & DIRTY) == 1) {
                                tmp->oidmod[tmp->nummod] = headeraddr->oid;
                                tmp->nummod = tmp->nummod + 1;
                                tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
                        } else {
                                tmp->oidread[tmp->numread] = headeraddr->oid;
                                offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
-                               memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int));
+                               memcpy(tmp->objread + offset, &headeraddr->oid, sizeof(unsigned int));
                                offset += sizeof(unsigned int);
                                memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
                                tmp->numread = tmp->numread + 1;
@@ -60,13 +60,14 @@ plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mi
                        return NULL;
                }
                ptr->mid = mid;
-               if ((headeraddr->status >> 1) == 1) {
+               if ((headeraddr->status & DIRTY) == 1) {
                        ptr->oidmod[ptr->nummod] = headeraddr->oid;
                        ptr->nummod = ptr->nummod + 1;
                        ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
                } else {
                        ptr->oidread[ptr->numread] = headeraddr->oid;
                        memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
+                       //printf("DEBUG -> objread oid is %d\n", *(ptr->objread));
                        memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
                        ptr->numread = ptr->numread + 1;
                }
index 6d1c34e1c3e1f9a7c160e388223718180e943baf..0df85e255a575bca516f07077d63bb3baabe2c69 100644 (file)
@@ -1,5 +1,6 @@
 #include<stdio.h>
 #include "dstm.h"
+#include "llookup.h"
 
 int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};      
 
@@ -45,19 +46,22 @@ int test2(void) {
        record = transStart();
        printf("DEBUG -> Init done\n");
        h1 = transRead(record, 1);
-//     printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+       lhashInsert(h1->oid, 1);
        h2 = transRead(record, 2);
-//     printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+       lhashInsert(h2->oid, 1);
        h3 = transRead(record, 3);
-//     printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+       lhashInsert(h3->oid, 1);
        h4 = transRead(record, 4);
-//     printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+       lhashInsert(h4->oid, 1);
        h4->status |= DIRTY;
        h5 = transRead(record, 5);
-//     printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
+       lhashInsert(h5->oid, 1);
        h6 = transRead(record, 6);
-//     printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]);
+       lhashInsert(h6->oid, 1);
        h6->status |= DIRTY;
+       
+
+       
        transCommit(record);
 }
 
index 22672f46705ddbfe0bc2ee0b767deb5b850358f8..6e378e976e2468c06a72fae7c686e5f22f7434f1 100644 (file)
@@ -16,6 +16,9 @@ unsigned int createObjects(transrecord_t *record, unsigned short type) {
        memcpy(tmp, header, size);
        mhashInsert(tmp->oid, tmp);
        lhashInsert(tmp->oid, 1);
+       //Lock oid 3 object
+//     if(tmp->oid == 3)
+//             tmp->status |= LOCK;
        return 0;
 }
 
index 1933382994bc8c638c541aa2e6724c5373f72bf8..96bbdf9f5728190f947cbb6dd0497d4080741f4c 100644 (file)
@@ -64,50 +64,122 @@ objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
        tmp->type = type;
        tmp->version = 1;
        tmp->rcount = 0; //? not sure how to handle this yet
+       tmp->status = 0;
        tmp->status |= NEW;
        chashInsert(record->lookupTable, tmp->oid, tmp);
        return tmp;
 }
-
-int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) {
-       int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+//TODO Change the values inside write() to exact size of the message being sent 
+//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
+int decideResponse(thread_data_array_t *tdata, int sd) {
+       int i, j, n, N, sum, oidcount = 0, transagree = 0, transdisagree = 0, transsoftabort = 0, transmiss = 0;
+       char ctrl, control, *ptr;
+       unsigned int *oidnotfound[tdata->pilecount];
+       objheader_t *header;
 
        //Check common data structure 
        for (i = 0 ; i < tdata->pilecount ; i++) {
-               //Check in any DISAGREE has come
-               if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) {
-                       //Send abort
-                       transabort++;
-                       buffer[0] = TRANS_ABORT;
-                       if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
-                               perror("Error sending message for thread");
-                               return 1;
-                       }
-               } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) {
-                       transagree++;
-               } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) {
-                       transmiss++;
-               } else
-                       transsoftabort++;
+               //Switch case
+               control = tdata->recvmsg[i].rcv_status;
+               switch(control) {
+                       case TRANS_DISAGREE:
+                               printf("DEBUG-> Inside TRANS_DISAGREE\n");
+                               transdisagree++;
+                               //Free transaction records
+                               objstrDelete(tdata->rec->cache);
+                               chashDelete(tdata->rec->lookupTable);
+                               //send Abort
+                               ctrl = TRANS_ABORT;
+                               if (write(sd, &ctrl, sizeof(char)) < 0) {
+                                       perror("Error sending ctrl message for participant\n");
+                                       return 1;
+                               }
+                               break;
+                               
+                       case TRANS_AGREE:
+                               printf("DEBUG-> Inside TRANS_AGREE\n");
+                               transagree++;
+                               break;
+                               
+                       case TRANS_SOFT_ABORT:
+                               printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+                               transsoftabort++;
+                               //Read list of objects missing
+                               read(sd, &oidcount, sizeof(int));
+                               N = oidcount * sizeof(unsigned int);
+                               if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               }
+                               ptr = (char *) oidnotfound[i];
+                               do {
+                                       n = read(sd, ptr+sum, N-sum);
+                                       sum += n;
+                               } while(sum < N && n !=0);
+
+                               break;
+                               
+                       case TRANS_AGREE_BUT_MISSING_OBJECTS:
+                               printf("DEBUG-> Inside TRANS_AGREE_BUT_MISSING_OBJECTS\n");
+                               transmiss++;
+                               //Read list of objects missing
+                               read(sd, &oidcount, sizeof(int));
+                               N = oidcount * sizeof(unsigned int);
+                               if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+                                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+                               }
+                               ptr = (char *) oidnotfound[i];
+                               do {
+                                       n = read(sd, ptr+sum, N-sum);
+                                       sum += n;
+                               } while(sum < N && n !=0);
+
+
+                               break;
+                       default:
+                               printf("Participant sent unknown message\n");
+               }
+       }
+       
+       //For Debug purposes 
+       for(i=0 ; i< tdata->pilecount; i++) {
+               for(j=0 ; j < oidcount; j++) {
+                       printf("DEBUG-> Oid %d missing for pilecount: %d\n", oidnotfound[j], i+1);
+               }
        }
+
+       //Decide what control message to send to Participant    
        if(transagree == tdata->pilecount){
                //Send Commit
-               buffer[0] = TRANS_COMMIT;
-               if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
-                       perror("Error sending message for thread");
+               ctrl = TRANS_COMMIT;
+               if (write(sd, &ctrl, sizeof(char)) < 0) {
+                       perror("Error sending ctrl message for participant\n");
                        return 1;
                }
        }
-       if(transsoftabort > 0 && transabort == 0) {
+
+       if(transsoftabort > 0 && transdisagree == 0) {
                //Send abort but retry commit
+               ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
+               if (write(sd, &ctrl, sizeof(char)) < 0) {
+                       perror("Error sending ctrl message for participant\n");
+                       return 1;
+               }
+               //lookup objects and then retry commit 
+               //set up a new connection readClientReq()
+               //rebuilt the pile and llookup table
                //i.e. wait at the participant end and then resend either agree or disagree
-               //
-
        }
-       if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+       if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) {
                //Relookup all missing objects
                //send missing mising object/ objects
        }
+       
+       //Free pointers
+       for(i=0 ; i< tdata->pilecount; i++) {
+               free(oidnotfound[i]);
+       }
+
+       return 0;
 }
 
 void *transRequest(void *threadarg) {
@@ -138,40 +210,44 @@ void *transRequest(void *threadarg) {
 
        //Multiple writes for sending packets of data 
        //Send first few fixed bytes of the TRANS_REQUEST protocol
-       printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control);
-       printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
-       if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
+       printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
+//     printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
+//     printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+       if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
                perror("Error sending fixed bytes for thread");
                return NULL;
        }
        //Send list of machines involved in the transaction
-       printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
+//     printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
        if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
                perror("Error sending list of machines for thread");
                return NULL;
        }
        //Send oids and version number tuples for objects that are read
-       printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount);
-       if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
+//     printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
+//     printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18)); 
+       if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
                perror("Error sending tuples for thread");
                return NULL;
        }
        //Send objects that are modified
-       for( i = 0; i < tdata->buffer->f.nummod ; i++) {
+       for(i = 0; i < tdata->buffer->f.nummod ; i++) {
                headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-               printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]);
-               if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
+//             printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
+               if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type])  < 0) {
                        perror("Error sending obj modified for thread");
                        return NULL;
                }
        }
        
-       //Read message from participant side
-       while(n != 0) {
-               n = read(sd, buffer, sizeof(buffer));
-       }
-       //process the participant's request
-       recvcontrol = buffer[0];
+       //Read message  control message from participant side
+       n = read(sd, &control, sizeof(char));
+       recvcontrol = control;
+       printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
+//     while(n != 0) {
+//             n = read(sd, buffer, sizeof(buffer));
+//     }
+       
        //Update common data structure and increment count
        tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
        //Lock and update count
@@ -181,8 +257,9 @@ void *transRequest(void *threadarg) {
        
        if(*(tdata->count) == tdata->pilecount) {
                pthread_cond_broadcast(tdata->threshold);
-               if (decideResponse(tdata, buffer, sd) == 1) {
-                       printf("decideResponse returned error\n");
+               //process the participant's request
+               if (decideResponse(tdata, sd) == 1) {
+                       printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
                        return NULL;
                }
        } else {
@@ -196,7 +273,7 @@ void *transRequest(void *threadarg) {
 int transCommit(transrecord_t *record) {       
        chashlistnode_t *curr, *ptr, *next;
        unsigned int size;//Represents number of bins in the chash table
-       unsigned int machinenum, tot_bytes_mod;
+       unsigned int machinenum, tot_bytes_mod, *listmid;
        objheader_t *headeraddr;
        plistnode_t *tmp, *pile = NULL;
        int i, rc;
@@ -232,7 +309,7 @@ int transCommit(transrecord_t *record) {
                        }
                        //Make machine groups
                        if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
-                               perror("pInsert calloc error");
+                               printf("pInsert error %s, %d\n", __FILE__, __LINE__);
                                return 1;
                        }
                        curr = next;
@@ -259,7 +336,11 @@ int transCommit(transrecord_t *record) {
        pthread_cond_init(&tcond, NULL);
        
        //Keep track of list of machine ids per transaction     
-       unsigned int *listmid = calloc(pilecount, sizeof(unsigned int));
+       if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+               printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+               return 1;
+       }
+                               
        pListMid(pile, listmid);
        //Process each machine group
        while(tmp != NULL) {
@@ -268,7 +349,7 @@ int transCommit(transrecord_t *record) {
                newtid++;
                trans_req_data_t *tosend;
                if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
-                       perror("");
+                       printf("Calloc error %s, %d\n", __FILE__, __LINE__);
                        return 1;
                }
                tosend->f.control = TRANS_REQUEST;
@@ -296,7 +377,8 @@ int transCommit(transrecord_t *record) {
                        return 1;
                }               
                numthreads++;           
-               //TODO frees ?
+               //TODO frees 
+               free(tosend);
                tmp = tmp->next;
        }
 
@@ -353,7 +435,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
        control = READ_REQUEST;
        buffer[0] = control;
        memcpy(buffer+1, &oid, sizeof(int));
-       if (write(sd, buffer, sizeof(buffer)) < 0) {
+       if (write(sd, buffer, sizeof(int) + 1) < 0) {
                perror("Error sending message");
                return NULL;
        }