at the same time.
TODO: Fix another bug that causes trans to commit when it should be a
soft abort.
-client:
- gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+d-2:
+ gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+
+demksy:
+ gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+
+d-1:
+ gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
-server:
- gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
all:
- gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
- gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -lpthread -g -o d-2 trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -lpthread -g -o demksy dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -lpthread -g -o d-1 trans.c testd-1.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
clean:
- rm client server
+ rm d-2 d-1 demsky
else
printf("Closed connection: fd = %d\n", (int)acceptfd);
- //Free memory
- free(transinfo.objmod);
- free(transinfo.objlocked);
- free(transinfo.objnotfound);
+
pthread_exit(NULL);
+ printf("DEBUG -> Exiting dstmAccept\n");
}
int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
//TODO Use fixed.trans_id TID since Client may have died
break;
}
-
+ //Free memory
+ printf("DEBUG -> Freeing...");
+ fflush(stdout);
+ if (transinfo->objmod != NULL) {
+ free(transinfo->objmod);
+ transinfo->objmod = NULL;
+ }
+ if (transinfo->objlocked != NULL) {
+ free(transinfo->objlocked);
+ transinfo->objlocked = NULL;
+ }
+ if (transinfo->objnotfound != NULL) {
+ free(transinfo->objnotfound);
+ transinfo->objnotfound = NULL;
+ }
return 0;
}
oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
-
// Counters and arrays to formulate decision on control message to be sent
int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
int objmodnotfound = 0, nummodfound = 0;
((objheader_t *)mobj)->status |= LOCK;
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+ printf("DEBUG-> Obj locked are %d\n",((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
}
}
- //printf("No of objs locked = %d\n", objlocked);
- //printf("No of v_nomatch = %d\n", v_nomatch);
- //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
- //printf("No of objs v_match but had locks before = %d\n", v_matchlock);
- //printf("No of objs not found = %d\n", objnotfound);
- //printf("No of objs modified but not found = %d\n", objmodnotfound);
+ printf("No of objs locked = %d\n", objlocked);
+ printf("No of v_nomatch = %d\n", v_nomatch);
+ printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+ printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+ printf("No of objs not found = %d\n", objnotfound);
+ printf("No of objs modified but not found = %d\n", objmodnotfound);
//Decide what control message(s) to send
if(v_matchnolock == fixed->numread + fixed->nummod) {
if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
}
-
+
+ printf("DEBUG-> Completed the pending transaction\n");
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include "ip.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <string.h>
+
+#define LISTEN_PORT 2156
unsigned int iptoMid(char *addr) {
ip_t i;
sscanf(addr, "%d.%d.%d.%d", &i.a, &i.b, &i.c, &i.d);
mid = (i.a << 24) | (i.b << 16) | (i.c << 8) | i.d;
+ fflush(stdout);
return mid;
}
return;
}
+int checkServer(int mid, char *machineip) {
+ int tmpsd;
+ struct sockaddr_in serv_addr;
+ char m[20];
+
+ strncpy(m, machineip, strlen(machineip));
+ // Foreach machine you want to transact with
+ // check if its up and running
+ if ((tmpsd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("");
+ return(-1);
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ midtoIP(mid, m);
+ m[15] = '\0';
+ serv_addr.sin_addr.s_addr = inet_addr(m);
+ while (connect(tmpsd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ sleep(1);
+ }
+ printf("DEBUG -> Connection established with %s\n", machineip);
+ close(tmpsd);
+ return 0;
+}
/*
main() {
unsigned int mid;
unsigned int iptoMid(char *);
void midtoIP(unsigned int, char *);
+int checkServer(int, char *);
#endif
//#include <sys/socket.h>
//#include <netinet/in.h>
//#include <arpa/inet.h>
+#define LISTEN_PORT 2156
extern objstr_t *mainobjstore;
//extern lhashtable_t llookup; //Global Hash table
// test2();
// test3();
// test4();
- test5();
+ //test5();
+ test6();
+
}
int test1(void) {
transCommit(record);
pthread_join(thread_Listen, NULL);
}
+
+int test6(void) {
+ transrecord_t *record;
+ objheader_t *header;
+ unsigned int size, mid;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ objheader_t *h1,*h2, *h3, *h4, *h5, *h6;
+ int tmpsd;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
+ record = transStart();
+ //printf("DEBUG -> Init done\n");
+ mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu
+ lhashInsert(1,mid);
+ lhashInsert(2,mid);
+ lhashInsert(3,mid);
+ lhashInsert(4,mid);
+ lhashInsert(5,mid);
+ lhashInsert(6,mid);
+
+ mid = iptoMid("128.200.9.26");// Machine demsky.eecs.uci.edu
+ lhashInsert(31,mid);
+ lhashInsert(32,mid);
+ lhashInsert(33,mid);
+
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ checkServer(mid, "128.200.9.26");
+ mid = iptoMid("128.200.9.10");
+ checkServer(mid, "128.200.9.10");
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+ sleep(3);
+ //read object 1 //from demsky
+ if((h1 = transRead(record, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 2
+ if((h2 = transRead(record, 2)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 31 //Found in d-1
+ if((h2 = transRead(record, 31)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 32 //Found in d-1
+ if((h2 = transRead(record, 32)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 20(present in local machine)
+ if((h3 = transRead(record, 20)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 21(present in local machine)
+ if((h4 = transRead(record, 21)) == NULL) {
+ printf("Object not found\n");
+ }
+ transCommit(record);
+ pthread_join(thread_Listen, NULL);
+ return 0;
+}
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
+#include "ip.h"
extern objstr_t *mainobjstore;
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
int test1(void);
int test2(void);
+int test3(void);
unsigned int createObjects(transrecord_t *record, unsigned short type) {
objheader_t *header, *tmp;
struct sockaddr_in antelope;
unsigned int size, mid;
size = sizeof(objheader_t) + classsize[type] ;
+ //Inserts in chashtable
header = transCreateObj(record, type);
tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
memcpy(tmp, header, size);
return 0;
}
+void init_obj(objheader_t *h, unsigned int oid, unsigned short type, \
+ unsigned short version,\
+ unsigned short rcount, char status) {
+ h->oid = oid;
+ h->type = type;
+ h->version = version;
+ h->rcount = rcount;
+ h->status |= status;
+ return;
+}
+
+
int main()
{
- test2();
+ //sleep(3);
+ test3();
}
int test1()
pthread_t thread_Listen;
dstmInit();
- mid = iptoMid("128.200.9.27");
+ mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
//Inserting into lhashtable
lhashInsert(20, mid);
lhashInsert(21, mid);
}
pthread_join(thread_Listen, NULL);
}
+
+int test3() {
+
+ unsigned int val, mid;
+ transrecord_t *myTrans;
+ unsigned int size;
+ objheader_t *header;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
+ objheader_t *h1, *h2, *h3, *h4, *h5;
+
+ dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ mid = iptoMid("128.200.9.27"); //d-2.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(20, mid);
+ lhashInsert(21, mid);
+ lhashInsert(22, mid);
+
+ mid = iptoMid("128.200.9.26"); //d-1.eecs.uci.edu
+ //Inserting into lhashtable
+ lhashInsert(31, mid);
+ lhashInsert(32, mid);
+ lhashInsert(33, mid);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+// pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+
+ printf("DEBUG -> mid = %d\n", mid);
+ checkServer(mid, "128.200.9.26");
+ mid = iptoMid("128.200.9.27");
+ printf("DEBUG -> mid = %d\n", mid);
+ checkServer(mid, "128.200.9.27");
+
+ // Start Transaction
+ myTrans = transStart();
+/*
+ //Create Object1
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj1");
+ }
+ //Create Object2
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj2");
+ }
+ //Create Object3
+ if((val = createObjects(myTrans, 2)) != 0) {
+ printf("Error transCreateObj3");
+ }
+ //Create Object4
+ if((val = createObjects(myTrans, 3)) != 0) {
+ printf("Error transCreateObj4");
+ }
+ //Create Object5
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj5");
+ }
+ //Create Object6
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj6");
+ }
+
+ */
+ // Create and Insert Oid 1
+ size = sizeof(objheader_t) + classsize[0] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 1, 0, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 2
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 2, 1, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+
+ // Create and Insert Oid 3
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 3, 2, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 4
+ size = sizeof(objheader_t) + classsize[3] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 4, 3, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 5
+ size = sizeof(objheader_t) + classsize[0] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 5, 0, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ // Create and Insert Oid 6
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ init_obj(header, 6, 1, 1, 0, NEW);
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.10");
+ lhashInsert(header->oid, mid);
+
+ //read object 1(present in local machine)
+ if((h1 = transRead(myTrans, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 2present in local machine)
+ if((h2 = transRead(myTrans, 2)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 3(present in local machine)
+ if((h3 = transRead(myTrans, 3)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 31 (present in d-1. eecs)
+ if((h4 = transRead(myTrans, 31)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 20 (present in d-2. eecs)
+ if((h5 = transRead(myTrans, 20)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ transCommit(myTrans);
+
+ pthread_join(thread_Listen, NULL);
+
+ return 0;
+}
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
- printf("oid is found in Local mlookup\n");
+
+ //printf("oid is found in Local machinelookup\n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
//Copy into cache
return(objcopy);
} else {
//Get the object from the remote location
- printf("oid is found in remote machine\n");
+ //printf("oid is found in remote machine\n");
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
switch(control) {
case OBJECT_NOT_FOUND:
return NULL;
- break;
case OBJECT_FOUND:
if((val = read(sd, &size, sizeof(int))) <= 0) {
perror("No size is read from the participant\n");