#define TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING 9
//Participant Messages
-#define OBJECT_FOUND 10
-#define OBJECT_NOT_FOUND 11
-#define OBJECTS_FOUND 12
-#define OBJECTS_NOT_FOUND 13
-#define TRANS_AGREE 14
-#define TRANS_DISAGREE 15
-#define TRANS_AGREE_BUT_MISSING_OBJECTS 16
-#define TRANS_SOFT_ABORT 17
-#define TRANS_SUCESSFUL 18
+#define OBJECT_FOUND 10
+#define OBJECT_NOT_FOUND 11
+#define OBJECTS_FOUND 12
+#define OBJECTS_NOT_FOUND 13
+#define OBJ_LOCKED_BUT_VERSION_MATCH 14
+#define OBJ_UNLOCK_BUT_VERSION_MATCH 15
+#define VERSION_NO_MATCH 16
+#define TRANS_AGREE 17
+#define TRANS_DISAGREE 18
+#define TRANS_AGREE_BUT_MISSING_OBJECTS 19
+#define TRANS_SOFT_ABORT 20
+#define TRANS_SUCESSFUL 21
#include <stdlib.h>
#include <stdio.h>
transrecord_t *rec; // To send modified objects
}thread_data_array_t;
+// Structure to save information about an oid necesaary for the decideControl()
+typedef struct objinfo {
+ unsigned int oid;
+ int poss_val; //Status of object(locked but version matches, version mismatch, oid not present in machine etc)
+}objinfo_t;
+
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
/* Prototypes for server portion */
void *dstmListen();
void *dstmAccept(void *);
+int readClientReq(int);
+int handleTransReq(int, fixed_data_t *, unsigned int *, char *, void *);
/* end server portion */
/* Prototypes for transactions */
transrecord_t *transStart();
objheader_t *transRead(transrecord_t *record, unsigned int oid);
objheader_t *transCreateObj(transrecord_t *record, unsigned short type); //returns oid
+int decideResponse(thread_data_array_t *tdata, int sd);// Coordinator decides what response to send to the participant
void *transRequest(void *); //the C routine that the thread will execute when TRANS_REQUEST begins
int transCommit(transrecord_t *record); //return 0 if successful
-int decideResponse(thread_data_array_t *tdata, char *buffer, int sd);// Coordinator decides what response to send to the participant
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
/* end transactions */
void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
void *dstmAccept(void *acceptfd)
{
- int numbytes,i,choice, oid;
+ int numbytes,i, val;
+ unsigned int oid;
char buffer[RECEIVE_BUFFER_SIZE], control;
+ char *ptr;
void *srcObj;
objheader_t *h;
+
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
printf("Recieved connection: fd = %d\n", (int)acceptfd);
- while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0)
- {
- printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
- control = buffer[0];
- switch(control) {
- case READ_REQUEST:
- printf("DEBUG -> READ_REQUEST\n");
- oid = *((int *)(buffer+1));
-#ifdef DEBUG1
- printf("DEBUG -> Received oid is %d\n", oid);
-#endif
- srcObj = mhashSearch(oid);
- h = (objheader_t *) srcObj;
- if (h == NULL) {
- buffer[0] = OBJECT_NOT_FOUND;
- } else {
- buffer[0] = OBJECT_FOUND;
- size = sizeof(objheader_t) + sizeof(classsize[h->type]);
- memcpy(buffer+1, srcObj, size);
- }
-#ifdef DEBUG1
- printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
-#endif
+ recv((int)acceptfd, &control, sizeof(char), 0);
+ switch(control) {
+ case READ_REQUEST:
+ recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+ srcObj = mhashSearch(oid);
+ h = (objheader_t *) srcObj;
+ if (h == NULL) {
+ buffer[0] = OBJECT_NOT_FOUND;
+ } else {
+ buffer[0] = OBJECT_FOUND;
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+ memcpy(buffer+1, srcObj, size);
+ }
+ if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
+ perror("");
+ }
+ break;
+
+ case READ_MULT_REQUEST:
+ printf("DEBUG-> READ_MULT_REQUEST\n");
+ break;
+
+ case MOVE_REQUEST:
+ printf("DEBUG -> MOVE_REQUEST\n");
+ break;
+
+ case MOVE_MULT_REQUEST:
+ printf("DEBUG -> MOVE_MULT_REQUEST\n");
+ break;
+
+ case TRANS_REQUEST:
+ //printf("DEBUG -> TRANS_REQUEST\n");
+ if((val = readClientReq((int)acceptfd)) == 1) {
+ printf("Error in readClientReq\n");
+ }
+ break;
+
+ default:
+ printf("Error receiving\n");
+ }
+
+ //Read for new control message from Coordiator
+ recv((int)acceptfd, &control, sizeof(char), 0);
+ switch(control) {
+ case TRANS_ABORT:
+ printf("DEBUG -> TRANS_ABORT\n");
+ write((int)acceptfd, &control, sizeof(char));
+ break;
+
+ case TRANS_COMMIT:
+ printf("DEBUG -> TRANS_COMMIT\n");
+ write((int)acceptfd, &control, sizeof(char));
+ //TODO
+ //change ptr address in mhash table
+ //unlock objects
+ //update object version
+ //change reference count of older address??
+ //free space in objstr ??
+ //Update location lookup table
+ break;
+ }
- if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
- perror("");
- }
- break;
- case READ_MULT_REQUEST:
- printf("DEBUG-> READ_MULT_REQUEST\n");
- break;
- case MOVE_REQUEST:
- printf("DEBUG -> MOVE_REQUEST\n");
- break;
- case MOVE_MULT_REQUEST:
- printf("DEBUG -> MOVE_MULT_REQUEST\n");
- break;
- case TRANS_REQUEST:
- printf("DEBUG -> TRANS_REQUEST\n");
- printf("Client sent %d\n",buffer[0]);
- // handleTransReq(acceptfd, buffer);
- break;
- case TRANS_ABORT:
- printf("DEBUG -> TRANS_ABORT\n");
- break;
- case TRANS_COMMIT:
- printf("DEBUG -> TRANS_COMMIT\n");
- printf("Client sent %d\n",buffer[0]);
- //TODO copy the objects into the machine
- /*copy the object into the object store from its old
- location in the objstore(pointer to its header is already stored before)*/
- break;
- default:
- printf("Error receiving");
- }
- //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
- //printf("%s", buffer);
- }
if (close((int)acceptfd) == -1)
{
perror("close");
}
else
printf("Closed connection: fd = %d\n", (int)acceptfd);
+
pthread_exit(NULL);
}
-//TOOD put __FILE__ __LINE__ for all error conditions
-#if 0
-int handleTransReq(int acceptfd, char *buf) {
- short numread = 0, nummod = 0;
- char control;
- int offset = 0, size,i;
- int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
- objheader_t *headptr = NULL;
- objstr_t *tmpholder;
- void *top, *mobj;
+int readClientReq(int acceptfd) {
+ char *ptr, control;
+ void *modptr;
+ objheader_t *h, tmp_header;
+ fixed_data_t fixed;
+ int sum = 0, N, n;
+
+ // Read fixed_data
+ N = sizeof(fixed) - 1;
+ ptr = (char *)&fixed;;
+ fixed.control = TRANS_REQUEST;
+ do {
+ n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
+ // printf("DEBUG -> 1. Reading %d bytes \n", n);
+ sum += n;
+ } while(sum < N && n != 0);
+
+ //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
+ // Read list of mids
+ int mcount = fixed.mcount;
+ N = mcount * sizeof(unsigned int);
+ unsigned int listmid[mcount];
+ ptr = (char *) listmid;
+ sum = 0;
+ do {
+ n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
+ // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
+ sum += n;
+ } while(sum < N && n != 0);
+
+ // Read oid and version tuples
+ int numread = fixed.numread;
+ N = numread * (sizeof(unsigned int) + sizeof(short));
+ char objread[N];
+ sum = 0;
+ do {
+ n = recv((int)acceptfd, (void *) objread, N, 0);
+ // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
+ sum += n;
+ } while(sum < N && n != 0);
+ //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
+
+ // Read modified objects
+ if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
+ // printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+ return 1;
+ }
+ sum = 0;
+ do {
+ n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+ //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
+ sum += n;
+ } while (sum < fixed.sum_bytes && n != 0);
+ //Send control message as per all votes from the particpants
+ handleTransReq(acceptfd, &fixed, listmid, objread, modptr);
+
- char sendbuf[RECEIVE_BUFFER_SIZE];
+
+ return 0;
+}
- control = buf[0];
- offset = sizeof(fixed_data_t);
- list = *((short *)(buf+offset));
- offset += sizeof(short);
- nummod = *((short *)(buf+offset));
- offset += sizeof(short);
- if (numread) {
- //Make an array to store the object headers for all objects that are only read
- if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
- perror("handleTransReq: Calloc error");
- return 1;
+//This function runs a decision after all objects are weighed under one of the 4 possibilities
+//and returns the appropriate control message to the Ccordinator
+int handleTransReq(int acceptfd, fixed_data_t *fixed, unsigned int *listmid, char *objread, void *modptr) {
+ short version;
+ char control, *ptr;
+ int i;
+ unsigned int oid, oidnotfound[fixed->numread + fixed->nummod], oidlocked[fixed->nummod + fixed->numread];
+ int objnotfound = 0, objlocked = 0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;// Counters to formulate decision on control message to be sent
+ void *mobj;
+ objheader_t *headptr;
+ objinfo_t objinfo[fixed->nummod + fixed->numread];// Structure that saves the possibility per object(if version match, object not found on machine etc)
+
+ //Process each object present in the pile
+ ptr = modptr;
+ //Process each oid in the machine pile/ group
+ for (i = 0; i < fixed->numread + fixed->nummod; i++) {
+ if (i < fixed->numread) {//Object is read
+ int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
+ incr *= i;
+ oid = *((unsigned int *)(objread + incr));
+ incr += sizeof(unsigned int);
+ version = *((short *)(objread + incr));
+ } else {//Obj is modified
+ headptr = (objheader_t *) ptr;
+ oid = headptr->oid;
+ version = headptr->version;
+ ptr += sizeof(objheader_t) + classsize[headptr->type];
}
- //Process each object id that is only read
- for (i = 0; i < numread; i++) {
- objheader_t *tmp;
- tmp = (objheader_t *) (buf + offset);
- //find if object is still present in the same machine since TRANS_REQUEST
- if ((mobj = mhashSearch(tmp->oid)) == NULL) {
- objnotfound++;
- /*
- sendbuf[0] = OBJECT_NOT_FOUND;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
+ //Check if object is still present in the machine since the beginning of TRANS_REQUEST
+ if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
+ objinfo[i].poss_val = OBJECT_NOT_FOUND;
+ //Save the oids not found for later use
+ oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
+ objnotfound++;
+ } else { // If obj found in machine (i.e. has not moved)
+ //Check if obj is locked
+ if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
+ if (version == ((objheader_t *)mobj)->version) { // If version match
+ objinfo[i].poss_val = OBJ_LOCKED_BUT_VERSION_MATCH;
+ v_matchlock++;
+ } else {//If versions don't match ..HARD ABORT
+ objinfo[i].poss_val = VERSION_NO_MATCH;
+ v_nomatch++;
+ //send TRANS_DISAGREE to Coordinator
+ control = TRANS_DISAGREE;
+ write(acceptfd, &control, sizeof(char));
+ //TODO when TRANS_DISAGREE is sent
+ //Free space allocated in main objstore
+ //Unlock objects that was locked in the trans
+ return 0;
}
- */
- } else { // If obj found in machine (i.e. has not moved)
- //Check if obj is locked
- if ((((objheader_t *)mobj)->status >> 3) == 1) {
- //Check version of the object
- if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
- transdis++;
- /*
- sendbuf[0] = TRANS_DISAGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- } else {//If versions don't match ..HARD ABORT
- transabort++;
- /*
- sendbuf[0] = TRANS_DISAGREE_ABORT;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- }
- } else {// If object not locked then lock it
- ((objheader_t *)mobj)->status |= LOCK;
- if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
- transagree++;
- /*
- sendbuf[0] = TRANS_AGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- } else {//If versions don't match
- transabort++;
- /*
- sendbuf[0] = TRANS_DISAGREE_ABORT;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- }
+ } else {//Obj is not locked , so lock object
+ ((objheader_t *)mobj)->status |= LOCK;
+ //Save all object oids that are locked on this machine during this transaction request call
+ oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+ objlocked++;
+ if (version == ((objheader_t *)mobj)->version) { //If versions match
+ objinfo[i].poss_val = OBJ_UNLOCK_BUT_VERSION_MATCH;
+ v_matchnolock++;
+ } else { //If versions don't match
+ objinfo[i].poss_val = VERSION_NO_MATCH;
+ v_nomatch++;
+ //send TRANS_DISAGREE to Coordinator
+ control = TRANS_DISAGREE;
+ write(acceptfd, &control, sizeof(char));
+ return 0;
}
- }
- memcpy(headptr, buf+offset, sizeof(objheader_t));
- offset += sizeof(objheader_t);
+ }
}
}
- if (nummod) {
- if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
- perror("handleTransReq: Calloc error");
- return 1;
- }
-
- //Process each object id that is only modified
- for(i = 0; i < nummod; i++) {
- objheader_t *tmp;
- tmp = (objheader_t *)(buf + offset);
- //find if object is still present in the same machine since TRANS_REQUEST
- if ((mobj = mhashSearch(tmp->oid)) == NULL) {
- objnotfound++;
- /*
- sendbuf[0] = OBJECT_NOT_FOUND;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- } else { // If obj found in machine (i.e. has not moved)
- //Check if obj is locked
- if ((((objheader_t *)mobj)->status >> 3) == 1) {
- //Check version of the object
- if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
- transdis++;
- /*
- sendbuf[0] = TRANS_DISAGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- } else {//If versions don't match ..HARD ABORT
- transabort++;
- /*
- sendbuf[0] = TRANS_DISAGREE_ABORT;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- }
- } else {// If object not locked then lock it
- ((objheader_t *)mobj)->status |= LOCK;
- if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
- transagree++;
- /*
- sendbuf[0] = TRANS_AGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- } else {//If versions don't match
- transabort++;
- /*
- sendbuf[0] = TRANS_DISAGREE_ABORT;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- */
- }
- }
- }
- size = sizeof(objheader_t) + classsize[tmp->type];
- if ((top = objstrAlloc(tmpholder, size)) == NULL) {
- perror("handleTransReq: Calloc error");
- return 1;
- }
- memcpy(top, buf+offset, size);
- offset += size;
- }
+ //Decide what control message(s) to send
+ if(v_matchnolock == fixed->numread + fixed->nummod) {
+ //send TRANS_AGREE to Coordinator
+ control = TRANS_AGREE;
+ write(acceptfd, &control, sizeof(char));
}
- /*
- if(transabort > 0) {
- sendbuf[0] = TRANS_DISAGREE_ABORT;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- } else if(transagree == numread+nummod) {
- sendbuf[0] = TRANS_AGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
- } else {
- sendbuf[0] = TRANS_DISAGREE;
- if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
- perror("");
- }
+ if(objnotfound > 0 && v_matchlock == 0 && v_nomatch == 0) {
+ //send TRANS_AGREE_BUT_MISSING_OBJECTS to Coordinator
+ control = TRANS_AGREE_BUT_MISSING_OBJECTS;
+ write(acceptfd, &control, sizeof(char));
+ //send missing oids and number of oids not found with it
+ write(acceptfd, &objnotfound, sizeof(int));
+ write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
}
- */
+
+ if(v_matchlock > 0 && v_nomatch == 0) {
+ //send TRANS_SOFT_ABORT to Coordinator
+ control = TRANS_SOFT_ABORT;
+ write(acceptfd, &control, sizeof(char));
+ //send missing oids and number of oids not found with it
+ write(acceptfd, &objnotfound, sizeof(int));
+ write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+ }
+
+ //TODO when TRANS_DISAGREE is sent
+ //Free space allocated in main objstore
+ //Unlock objects that was locked in the trans
+ if(control == TRANS_DISAGREE) {
+ for(i = 0; i< objlocked ; i++) {
+ mobj = mhashSearch(oidlocked[i]);// find the header address
+ ((objheader_t *)mobj)->status &= ~(LOCK);
+ }
+ }
return 0;
}
-#endif
return 0;
}
-#if 0
-// Hash Resize
-vkey resize(obj_addr_table_t * table){
- int newCapacity = 2*(table->size) + 1;
- obj_listnode_t **old;
- //if ((table->hash = (obj_listnode_t **) malloc(sizeof(obj_listnode_t *)*size)) == NULL) {
-}
-
-// Hashing for the Key
-int hashKey(unsigned int key, obj_addr_table_t *table) {
- // hash32shiftmult
- int c2=0x27d4eb2d; // a prime or an odd constant
- key = (key ^ 61) ^ (key >> 16);
- key = key + (key << 3);
- key = key ^ (key >> 4);
- key = key * c2;
- key = key ^ (key >> 15);
- printf("The bucket number is %d\n", key % (table->size));
- return (key % (table->size));
-}
-
-//Add key and its address to the new ob_listnode_t
-vkey addKey(unsigned int key, objheader_t *ptr, obj_addr_table_t *table) {
- int index;
- obj_listnode_t *node;
-
- table->numelements++;
- if(table->numelements > (table->loadfactor * table->size)){
- //TODO : check if table is nearly full and then resize
- }
-
- index = hashKey(key,table);
- if ((node = (obj_listnode_t *) malloc(sizeof(obj_listnode_t))) == NULL) {
- printf("Malloc error %s %d\n", __FILE__, __LINE__);
- exit(-1);
- }
- node->key = key;
- node->object = ptr;
- node->next = table->hash[index];
- table->hash[index] = node;
- return;
-}
-// Get the address of the object header for a given key
-objheader_t *findKey(unsigned int key, obj_addr_table_t *table) {
- int index;
- obj_listnode_t *ptr;
-
- index = hashKey(key,table);
- ptr = table->hash[index];
- while(ptr != NULL) {
- if (ptr->key == key) {
- return ptr->object;
- }
- ptr = ptr->next;
- }
- return NULL;
-}
-// Remove the pointer to the object header from a linked list of obj_listnode_t given an key
-int removeKey(unsigned int key, obj_addr_table_t *table) {
- int index;
- obj_listnode_t *curr, *prev; // prev points to previous node and curr points to the node to be deleted
-
- index = hashKey(key,table);
- prev = curr = table->hash[index];
- for (; curr != NULL; curr = curr->next) {
- if (curr->key == key) { // Find a match in the hash table
- table->numelements--;
- prev->next = curr->next;
- if (table->hash[index] == curr) { // Special case when there is one element pointed by the hash table
- table->hash[index] = NULL;
- }
- free(curr);
- return 0;
- }
- prev = curr;
- }
- return -1;
-}
-
-#endif
//Add oid into a machine that is a part of the pile linked list structure
while(tmp != NULL) {
if (tmp->mid == mid) {
- if ((headeraddr->status >> 1) == 1) {
+ if ((headeraddr->status & DIRTY) == 1) {
tmp->oidmod[tmp->nummod] = headeraddr->oid;
tmp->nummod = tmp->nummod + 1;
tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
} else {
tmp->oidread[tmp->numread] = headeraddr->oid;
offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
- memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int));
+ memcpy(tmp->objread + offset, &headeraddr->oid, sizeof(unsigned int));
offset += sizeof(unsigned int);
memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
tmp->numread = tmp->numread + 1;
return NULL;
}
ptr->mid = mid;
- if ((headeraddr->status >> 1) == 1) {
+ if ((headeraddr->status & DIRTY) == 1) {
ptr->oidmod[ptr->nummod] = headeraddr->oid;
ptr->nummod = ptr->nummod + 1;
ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
} else {
ptr->oidread[ptr->numread] = headeraddr->oid;
memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
+ //printf("DEBUG -> objread oid is %d\n", *(ptr->objread));
memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
ptr->numread = ptr->numread + 1;
}
#include<stdio.h>
#include "dstm.h"
+#include "llookup.h"
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
record = transStart();
printf("DEBUG -> Init done\n");
h1 = transRead(record, 1);
-// printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+ lhashInsert(h1->oid, 1);
h2 = transRead(record, 2);
-// printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+ lhashInsert(h2->oid, 1);
h3 = transRead(record, 3);
-// printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+ lhashInsert(h3->oid, 1);
h4 = transRead(record, 4);
-// printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+ lhashInsert(h4->oid, 1);
h4->status |= DIRTY;
h5 = transRead(record, 5);
-// printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
+ lhashInsert(h5->oid, 1);
h6 = transRead(record, 6);
-// printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]);
+ lhashInsert(h6->oid, 1);
h6->status |= DIRTY;
+
+
+
transCommit(record);
}
memcpy(tmp, header, size);
mhashInsert(tmp->oid, tmp);
lhashInsert(tmp->oid, 1);
+ //Lock oid 3 object
+// if(tmp->oid == 3)
+// tmp->status |= LOCK;
return 0;
}
tmp->type = type;
tmp->version = 1;
tmp->rcount = 0; //? not sure how to handle this yet
+ tmp->status = 0;
tmp->status |= NEW;
chashInsert(record->lookupTable, tmp->oid, tmp);
return tmp;
}
-
-int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) {
- int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+//TODO Change the values inside write() to exact size of the message being sent
+//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
+int decideResponse(thread_data_array_t *tdata, int sd) {
+ int i, j, n, N, sum, oidcount = 0, transagree = 0, transdisagree = 0, transsoftabort = 0, transmiss = 0;
+ char ctrl, control, *ptr;
+ unsigned int *oidnotfound[tdata->pilecount];
+ objheader_t *header;
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
- //Check in any DISAGREE has come
- if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) {
- //Send abort
- transabort++;
- buffer[0] = TRANS_ABORT;
- if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
- perror("Error sending message for thread");
- return 1;
- }
- } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) {
- transagree++;
- } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) {
- transmiss++;
- } else
- transsoftabort++;
+ //Switch case
+ control = tdata->recvmsg[i].rcv_status;
+ switch(control) {
+ case TRANS_DISAGREE:
+ printf("DEBUG-> Inside TRANS_DISAGREE\n");
+ transdisagree++;
+ //Free transaction records
+ objstrDelete(tdata->rec->cache);
+ chashDelete(tdata->rec->lookupTable);
+ //send Abort
+ ctrl = TRANS_ABORT;
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
+ return 1;
+ }
+ break;
+
+ case TRANS_AGREE:
+ printf("DEBUG-> Inside TRANS_AGREE\n");
+ transagree++;
+ break;
+
+ case TRANS_SOFT_ABORT:
+ printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
+ transsoftabort++;
+ //Read list of objects missing
+ read(sd, &oidcount, sizeof(int));
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ }
+ ptr = (char *) oidnotfound[i];
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ sum += n;
+ } while(sum < N && n !=0);
+
+ break;
+
+ case TRANS_AGREE_BUT_MISSING_OBJECTS:
+ printf("DEBUG-> Inside TRANS_AGREE_BUT_MISSING_OBJECTS\n");
+ transmiss++;
+ //Read list of objects missing
+ read(sd, &oidcount, sizeof(int));
+ N = oidcount * sizeof(unsigned int);
+ if((oidnotfound[i] = calloc(oidcount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ }
+ ptr = (char *) oidnotfound[i];
+ do {
+ n = read(sd, ptr+sum, N-sum);
+ sum += n;
+ } while(sum < N && n !=0);
+
+
+ break;
+ default:
+ printf("Participant sent unknown message\n");
+ }
+ }
+
+ //For Debug purposes
+ for(i=0 ; i< tdata->pilecount; i++) {
+ for(j=0 ; j < oidcount; j++) {
+ printf("DEBUG-> Oid %d missing for pilecount: %d\n", oidnotfound[j], i+1);
+ }
}
+
+ //Decide what control message to send to Participant
if(transagree == tdata->pilecount){
//Send Commit
- buffer[0] = TRANS_COMMIT;
- if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
- perror("Error sending message for thread");
+ ctrl = TRANS_COMMIT;
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
return 1;
}
}
- if(transsoftabort > 0 && transabort == 0) {
+
+ if(transsoftabort > 0 && transdisagree == 0) {
//Send abort but retry commit
+ ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
+ return 1;
+ }
+ //lookup objects and then retry commit
+ //set up a new connection readClientReq()
+ //rebuilt the pile and llookup table
//i.e. wait at the participant end and then resend either agree or disagree
- //
-
}
- if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+ if(transmiss > 0 && transsoftabort == 0 && transdisagree == 0) {
//Relookup all missing objects
//send missing mising object/ objects
}
+
+ //Free pointers
+ for(i=0 ; i< tdata->pilecount; i++) {
+ free(oidnotfound[i]);
+ }
+
+ return 0;
}
void *transRequest(void *threadarg) {
//Multiple writes for sending packets of data
//Send first few fixed bytes of the TRANS_REQUEST protocol
- printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control);
- printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
- if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
+ printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
+// printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
+// printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
+ if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
perror("Error sending fixed bytes for thread");
return NULL;
}
//Send list of machines involved in the transaction
- printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
+// printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
perror("Error sending list of machines for thread");
return NULL;
}
//Send oids and version number tuples for objects that are read
- printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount);
- if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
+// printf("Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
+// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
+ if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
perror("Error sending tuples for thread");
return NULL;
}
//Send objects that are modified
- for( i = 0; i < tdata->buffer->f.nummod ; i++) {
+ for(i = 0; i < tdata->buffer->f.nummod ; i++) {
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
- printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]);
- if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
+// printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
+ if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
perror("Error sending obj modified for thread");
return NULL;
}
}
- //Read message from participant side
- while(n != 0) {
- n = read(sd, buffer, sizeof(buffer));
- }
- //process the participant's request
- recvcontrol = buffer[0];
+ //Read message control message from participant side
+ n = read(sd, &control, sizeof(char));
+ recvcontrol = control;
+ printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
+// while(n != 0) {
+// n = read(sd, buffer, sizeof(buffer));
+// }
+
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
//Lock and update count
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
- if (decideResponse(tdata, buffer, sd) == 1) {
- printf("decideResponse returned error\n");
+ //process the participant's request
+ if (decideResponse(tdata, sd) == 1) {
+ printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
return NULL;
}
} else {
int transCommit(transrecord_t *record) {
chashlistnode_t *curr, *ptr, *next;
unsigned int size;//Represents number of bins in the chash table
- unsigned int machinenum, tot_bytes_mod;
+ unsigned int machinenum, tot_bytes_mod, *listmid;
objheader_t *headeraddr;
plistnode_t *tmp, *pile = NULL;
int i, rc;
}
//Make machine groups
if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
- perror("pInsert calloc error");
+ printf("pInsert error %s, %d\n", __FILE__, __LINE__);
return 1;
}
curr = next;
pthread_cond_init(&tcond, NULL);
//Keep track of list of machine ids per transaction
- unsigned int *listmid = calloc(pilecount, sizeof(unsigned int));
+ if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ return 1;
+ }
+
pListMid(pile, listmid);
//Process each machine group
while(tmp != NULL) {
newtid++;
trans_req_data_t *tosend;
if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
- perror("");
+ printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
}
tosend->f.control = TRANS_REQUEST;
return 1;
}
numthreads++;
- //TODO frees ?
+ //TODO frees
+ free(tosend);
tmp = tmp->next;
}
control = READ_REQUEST;
buffer[0] = control;
memcpy(buffer+1, &oid, sizeof(int));
- if (write(sd, buffer, sizeof(buffer)) < 0) {
+ if (write(sd, buffer, sizeof(int) + 1) < 0) {
perror("Error sending message");
return NULL;
}