client:
- gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c
+ gcc -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c
server:
- gcc -g -o server dstmserver.c testserver.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c
+ gcc -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c
clean:
rm client server
struct chashlistnode *next;
} chashlistnode_t;
-typedef struct cashehashtable {
+typedef struct chashtable {
chashlistnode_t *table; // points to beginning of hash table
unsigned int size;
unsigned int numelements;
#define OBJECTS_FOUND 10
#define OBJECTS_NOT_FOUND 11
#define TRANS_AGREE 12
-#define TRANS_DISAGREE 13
-#define TRANS_SUCESSFUL 14
+#define TRANS_DISAGREE 13//for soft abort
+#define TRANS_DISAGREE_ABORT 14//for hard abort
+#define TRANS_SUCESSFUL 15//Not necessary for now
#include <stdlib.h>
#include <stdio.h>
//bit designations for status field of objheader
#define DIRTY 0x01
#define NEW 0x02
+#define LOCK 0x04
typedef struct objheader {
unsigned int oid;
chashtable_t *lookupTable;
} transrecord_t;
+typedef struct pile {
+ unsigned int mid;
+ unsigned int oid;
+ struct pile *next;
+}pile_t;
+
/* Initialize main object store and lookup tables, start server thread. */
int dstmInit(void);
case MOVE_MULT_REQUEST:
break;
case TRANS_REQUEST:
+ printf("Client sent %d\n",buffer[0]);
+ int offset = 1;
+ printf("Num Read %d\n",*((short*)(buffer+offset)));
+ offset += sizeof(short);
+ printf("Num modified %d\n",*((short*)(buffer+offset)));
+ handleTransReq(acceptfd, buffer);
break;
case TRANS_ABORT:
break;
pthread_exit(NULL);
}
+//TOOD put __FILE__ __LINE__ for all error conditions
+int handleTransReq(int acceptfd, char *buf) {
+ short numread = 0, nummod = 0;
+ char control;
+ int offset = 0, size,i;
+ int objnotfound = 0, transdis = 0, transabort = 0, transagree = 0;
+ objheader_t *headptr = NULL;
+ objstr_t *tmpholder;
+ void *top, *mobj;
+ char sendbuf[RECEIVE_BUFFER_SIZE];
+
+ control = buf[0];
+ offset = 1;
+ numread = *((short *)(buf+offset));
+ offset += sizeof(short);
+ nummod = *((short *)(buf+offset));
+ offset += sizeof(short);
+ if (numread) {
+ //Make an array to store the object headers for all objects that are only read
+ if ((headptr = calloc(numread, sizeof(objheader_t))) == NULL) {
+ perror("handleTransReq: Calloc error");
+ return 1;
+ }
+ //Process each object id that is only read
+ for (i = 0; i < numread; i++) {
+ objheader_t *tmp;
+ tmp = (objheader_t *) (buf + offset);
+ //find if object is still present in the same machine since TRANS_REQUEST
+ if ((mobj = mhashSearch(tmp->oid)) == NULL) {
+ objnotfound++;
+ /*
+ sendbuf[0] = OBJECT_NOT_FOUND;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else { // If obj found in machine (i.e. has not moved)
+ //Check if obj is locked
+ if ((((objheader_t *)mobj)->status >> 3) == 1) {
+ //Check version of the object
+ if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
+ transdis++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else {//If versions don't match ..HARD ABORT
+ transabort++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE_ABORT;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ }
+ } else {// If object not locked then lock it
+ ((objheader_t *)mobj)->status |= LOCK;
+ if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
+ transagree++;
+ /*
+ sendbuf[0] = TRANS_AGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else {//If versions don't match
+ transabort++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE_ABORT;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ }
+ }
+ }
+ memcpy(headptr, buf+offset, sizeof(objheader_t));
+ offset += sizeof(objheader_t);
+ }
+ }
+ if (nummod) {
+ if((tmpholder = objstrCreate(RECEIVE_BUFFER_SIZE)) == NULL) {
+ perror("handleTransReq: Calloc error");
+ return 1;
+ }
+
+ //Process each object id that is only modified
+ for(i = 0; i < nummod; i++) {
+ objheader_t *tmp;
+ tmp = (objheader_t *)(buf + offset);
+ //find if object is still present in the same machine since TRANS_REQUEST
+ if ((mobj = mhashSearch(tmp->oid)) == NULL) {
+ objnotfound++;
+ /*
+ sendbuf[0] = OBJECT_NOT_FOUND;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else { // If obj found in machine (i.e. has not moved)
+ //Check if obj is locked
+ if ((((objheader_t *)mobj)->status >> 3) == 1) {
+ //Check version of the object
+ if (tmp->version == ((objheader_t *)mobj)->version) {//If version match
+ transdis++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else {//If versions don't match ..HARD ABORT
+ transabort++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE_ABORT;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ }
+ } else {// If object not locked then lock it
+ ((objheader_t *)mobj)->status |= LOCK;
+ if (tmp->version == ((objheader_t *)mobj)->version) {//If versions match
+ transagree++;
+ /*
+ sendbuf[0] = TRANS_AGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ } else {//If versions don't match
+ transabort++;
+ /*
+ sendbuf[0] = TRANS_DISAGREE_ABORT;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ */
+ }
+ }
+ }
+
+ size = sizeof(objheader_t) + classsize[tmp->type];
+ if ((top = objstrAlloc(tmpholder, size)) == NULL) {
+ perror("handleTransReq: Calloc error");
+ return 1;
+ }
+ memcpy(top, buf+offset, size);
+ offset += size;
+ }
+ }
+ if(transabort > 0) {
+ sendbuf[0] = TRANS_DISAGREE_ABORT;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+
+ } else {
+ sendbuf[0] = TRANS_AGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
+ }
+ return 0;
+}
--- /dev/null
+ #include "plookup.h"
+
+plistnode_t *pCreate(int objects) {
+ plistnode_t *pile;
+
+ //Create main structure
+ if((pile = calloc(1, sizeof(plistnode_t))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pile->next = NULL;
+ //Create array of objects
+ if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pile->index = 0;
+ pile->vote = 0;
+ return pile;
+}
+
+unsigned int pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
+ plistnode_t *ptr, *tmp;
+ int found = 0;
+
+ tmp = pile;
+ //Add oid into a machine that is a part of the pile linked list structure
+ while(tmp != NULL) {
+ if (tmp->mid == mid) {
+ tmp->obj[tmp->index] = oid;
+ tmp->index++;
+ found = 1;
+ break;
+ }
+ tmp = tmp->next;
+ }
+ //Add oid for any new machine
+ if (!found) {
+ if((ptr = pCreate(num_objs)) == NULL) {
+ return 1;
+ }
+ ptr->mid = mid;
+ ptr->obj[ptr->index] = oid;
+ ptr->index++;
+ ptr->next = pile;
+ pile = ptr;
+ }
+ return 0;
+}
+
+// Return objects for a given mid
+unsigned int *pSearch(plistnode_t *pile, unsigned int mid) {
+ plistnode_t *tmp;
+ tmp = pile;
+ while(tmp != NULL) {
+ if(tmp->mid == mid) {
+ return(tmp->obj);
+ }
+ tmp = tmp->next;
+ }
+ return NULL;
+}
+
+//Delete the entire pile
+void pDelete(plistnode_t *pile) {
+ plistnode_t *next, *tmp;
+ tmp = pile;
+ while(tmp != NULL) {
+ next = tmp->next;
+ free(tmp->obj);
+ free(tmp);
+ tmp = next;
+ }
+ return;
+}
--- /dev/null
+#ifndef _PLOOKUP_H_
+#define _PLOOKUP_H_
+
+#include <stdlib.h>
+#include <stdio.h>
+
+typedef struct plistnode {
+ unsigned int mid;
+ unsigned int *obj; //this can be cast to another type or used to point to a larger structure
+ int index;
+ int vote;
+ struct plistnode *next;
+} plistnode_t;
+
+plistnode_t *pCreate(int);
+unsigned int pInsert(plistnode_t*, unsigned int mid, unsigned int oid, int);
+unsigned int *pSearch(plistnode_t *, unsigned int mid);
+void pDelete(plistnode_t *);
+
+#endif
+
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
+#include "plookup.h"
#include<sys/types.h>
#include<sys/socket.h>
#include<netdb.h>
chashInsert(record->lookupTable, objheader->oid, objcopy);
return(objcopy);
} else {
+ //Get the object from the remote location
printf("oid not found in local machine lookup\n");
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL)
+ //If object is not found in Remote location
printf("Object not found in Machine %d\n", machinenumber);
else
return(objcopy);
}
int transCommit(transrecord_t *record){
- //Move objects to machine that hosts it
+ chashlistnode_t *curr, *ptr, *next;
+ unsigned int size;//Represents number of bins in the chash table
+ unsigned int machinenum;
+ objheader_t *headeraddr, *localheaderaddr;
+ plistnode_t *tmp, *pile = NULL;
+ int sd,n,i;
+ short numread = 0,nummod = 0;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ char buffer[RECEIVE_BUFFER_SIZE],control;
+
+ ptr = record->lookupTable->table;
+ size = record->lookupTable->size;
+ //Look through all the objects in the cache and make pils
+ //Outer loop for chashtable
+ for(i = 0; i < size ;i++) {
+ curr = &ptr[i];
+ //Inner loop to traverse the linked list of the cache lookupTable
+ while(curr != NULL) {
+ //if the first bin in hash table is empty
+ if(curr->key == 0) {
+ break;
+ }
+ next = curr->next;
+ //Get machine location for object id
+ machinenum = lhashSearch(curr->key);
+ // Make piles
+ pInsert(pile, machinenum, curr->key, record->lookupTable->numelements);
+ curr = next;
+ }
+ }
+
+ tmp = pile;
+ unsigned int oidmod[record->lookupTable->numelements];
+ unsigned int oidread[record->lookupTable->numelements];
+ //Process each machine in pile
+ while(tmp != NULL) {
+ //Identify which oids have been updated and which ones have been just read
+ for(i = 0; i < pile->index; i++) {
+ headeraddr = (objheader_t *) chashSearch(record->lookupTable, pile->obj[i]);
+ //check if object modified in cache ??
+ if(headeraddr->status >>= DIRTY){
+ //Keep track of oids that have been modified
+ oidmod[nummod] = headeraddr->oid;
+ nummod++;
+ } else {
+ oidread[numread] = headeraddr->oid;
+ numread++;
+ }
+ }
+ //Send Trans Request in the form
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket for TRANS_REQUEST");
+ return 1;
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ //serv_addr.sin_addr.s_addr = inet_addr(pile->mid);
+
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("Error in connect for TRANS_REQUEST");
+ return 1;
+ }
+
+ bzero((char *)buffer,sizeof(buffer));
+ control = TRANS_REQUEST;
+ buffer[0] = control;
+ //Send numread, nummod, sizeof header for objects read, size of header+objects that are modified
+ int offset = 1;
+ memcpy(buffer+offset, &numread, sizeof(short));
+ offset += sizeof(short);
+ memcpy(buffer+offset, &nummod, sizeof(short));
+ offset += sizeof(short);
+ for( i= 0; i< numread; i++) {
+ headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidread[i]);
+ memcpy(buffer+offset, headeraddr, sizeof(objheader_t));
+ offset += sizeof(objheader_t);
+ }
+ for( i= 0; i< nummod; i++) {
+ headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]);
+ memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]);
+ offset += sizeof(objheader_t) + classsize[headeraddr->type];
+ }
+ if (offset > RECEIVE_BUFFER_SIZE) {
+ printf("Error: Buffersize too small");
+ }
+ if (write(sd, buffer, sizeof(buffer)) < 0) {
+ perror("Error sending message");
+ return 1;
+ }
+#ifdef DEBUG1
+ printf("DEBUG -> ready to rcv ...\n");
+#endif
+ read(sd, buffer, sizeof(buffer));
+ close(sd);
+ printf("Server sent %d\n",buffer[0]);
+ /*
+ if (buffer[0] == TRANS_AGREE) {
+ //change machine pile
+
+ }
+ */
+ //Reset numread and nummod for the next pile
+ numread = nummod = 0;
+ tmp = tmp->next;
+
+ }
}
+
+#if 0
+int transCommit(transrecord_t *record){
+ //Look through all the objects in the cache
+ int i,numelements,isFirst;
+ unsigned int size,machinenum;//Represents number of buckets
+ void *address;
+ objheader_t *headeraddr,localheaderaddr;
+ chashlistnode_t *curr, *ptr, *next;
+ int sd, size;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ char buffer[RECEIVE_BUFFER_SIZE],control;
+
+ ptr = record->lookupTable->table;
+ size = record->lookupTable->size;
+ //Outer loop for chashtable
+ for(i = 0; i< size ;i++) {
+ curr = &ptr[i];
+ //Inner look to traverse the linked list of the cache lookupTable
+ while(curr != NULL) {
+ if(curr->key == 0) {
+ break;
+ }
+ //Find if local or remote
+ address = mhashSearch(curr->key);
+ d
+ localheaderaddr = (objheader_t *) curr->value;
+ if(address != NULL) {
+ //Is local so check if the local copy has been updated
+ headeraddr = (objheader_t *) address;
+ if(localheaderaddr->version == headeraddr->version){
+ //Lock Object
+
+ }
+ else {
+ //vote as DISAGREE
+ //Start TransAbort();
+ //Unlock object
+ }
+ }
+ else {
+ //Is remote
+ //Find which machine it belongs to
+ machinenum = lhashSearch(curr->key);
+ //Start TRANS_REQUEST to machine
+
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket");
+ return NULL;
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("Error in connect");
+ return NULL;
+ }
+ bzero((char *)buffer,sizeof(buffer));
+ control = READ_REQUEST;
+ buffer[0] = control;
+ memcpy(buffer+1, &oid, sizeof(int));
+ if (write(sd, buffer, sizeof(buffer)) < 0) {
+ perror("Error sending message");
+ return NULL;
+ }
+
+#ifdef DEBUG1
+ printf("DEBUG -> ready to rcv ...\n");
+#endif
+ read(sd, buffer, sizeof(buffer));
+ close(sd);
+
+ }
+ next = curr->next;
+ }
+ curr = next;
+ }
+
+}
+#endif
+
+
int transAbort(transrecord_t *record){
}