break;
default:
printf("No response to TRANS_AGREE OR DISAGREE protocol\n");
+ //TODO Use fixed.trans_id TID since Client may have died
break;
}
//Process each object present in the pile
ptr = modptr;
+ printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
+ fflush(stdout);
//Process each oid in the machine pile/ group
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
if (i < fixed->numread) {//Object is read
((objheader_t *)mobj)->status |= LOCK;
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
+ printf("DEBUG-> Object to be locked is %d\n", ((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
return 0;
}
+
--- /dev/null
+#include <stdio.h>
+#include <stdlib.h>
+#include "ip.h"
+
+unsigned int iptoMid(char *addr) {
+ ip_t i;
+ unsigned int mid;
+
+ sscanf(addr, "%d.%d.%d.%d", &i.a, &i.b, &i.c, &i.d);
+ mid = (i.a << 24) | (i.b << 16) | (i.c << 8) | i.d;
+ return mid;
+}
+
+void midtoIP(unsigned int mid, char *ptr) {
+ ip_t i;
+
+ i.a = (mid & 0xff000000) >> 24;
+ i.b = (mid & 0x00ff0000) >> 16;
+ i.c = (mid & 0x0000ff00) >> 8;
+ i.d = mid & 0x000000ff;
+ sprintf(ptr, "%d.%d.%d.%d", i.a, i.b, i.c, i.d);
+ return;
+}
+
+/*
+main() {
+ unsigned int mid;
+ ip_t i;
+ char ip[16];
+
+ memset(ip, 0, 16);
+ mid = iptoMid("192.10.0.1");
+ printf("mid = %x\n", mid);
+ midtoIP(mid, ip);
+ ip[15] = '\0';
+ printf("%s\n",ip);
+}
+*/
--- /dev/null
+#ifndef _ip_h_
+#define _ip_h_
+
+typedef struct ip {
+ short a;
+ short b;
+ short c;
+ short d;
+}ip_t;
+
+unsigned int iptoMid(char *);
+void midtoIP(unsigned int, char *);
+
+#endif
#include<stdio.h>
#include "dstm.h"
#include "llookup.h"
+#include "ip.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+extern objstr_t *mainobjstore;
+//extern lhashtable_t llookup; //Global Hash table
+//extern mhashtable_t mlookup; //Global Hash table
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
int test1(void);
int test2(void);
+unsigned int createObjects(transrecord_t *record) {
+ objheader_t *header, *tmp;
+ struct sockaddr_in antelope;
+ unsigned int size, mid;
+ int i = 0;
+ for(i = 20 ; i< 23; i++) {
+ size = sizeof(objheader_t) + classsize[i-20] ;
+ tmp = (objheader_t *)objstrAlloc(record->cache, size);
+ tmp->oid = i;
+ tmp->type = (i-20);
+ tmp->version = 1;
+ tmp->rcount = 0; //? not sure how to handle this yet
+ tmp->status = 0;
+ tmp->status |= NEW;
+ chashInsert(record->lookupTable, tmp->oid, tmp);
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ memcpy(header, tmp, size);
+ mhashInsert(header->oid, header);
+ //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
+ //mid = iptoMid(inet_ntoa(antelope.sin_addr));
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(header->oid, mid);
+ // lhashInsert(header->oid, 1);
+ }
+ // printf("Insert oid = %d at address %x\n",tmp->oid, tmp);
+ size = sizeof(objheader_t) + classsize[0] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 30;
+ header->type = 0;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
+ //mid = iptoMid(inet_ntoa(antelope.sin_addr));
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(header->oid, mid);
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 28;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= LOCK;
+ mhashInsert(header->oid, header);
+ //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
+ //mid = iptoMid(inet_ntoa(antelope.sin_addr));
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(header->oid, mid);
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 29;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= LOCK;
+ mhashInsert(header->oid, header);
+ //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
+ //mid = iptoMid(inet_ntoa(antelope.sin_addr));
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(header->oid, mid);
+ return 0;
+}
+
int main()
{
- test2();
+// test2();
+// test3();
+// test4();
+ test5();
}
int test1(void) {
dstmInit();
record = transStart();
+
+ lhashInsert(1,1);
+ lhashInsert(2,1);
+ lhashInsert(3,1);
+ lhashInsert(4,1);
+ lhashInsert(5,1);
+ lhashInsert(6,1);
printf("DEBUG -> Init done\n");
h1 = transRead(record, 1);
lhashInsert(h1->oid, 1);
lhashInsert(h3->oid, 1);
h4 = transRead(record, 4);
lhashInsert(h4->oid, 1);
- h4->status |= DIRTY;
+// h4->status |= DIRTY;
h5 = transRead(record, 5);
lhashInsert(h5->oid, 1);
h6 = transRead(record, 6);
lhashInsert(h6->oid, 1);
- h6->status |= DIRTY;
+// h6->status |= DIRTY;
+ transCommit(record);
-
+ return 0;
+}
+//Test Read objects when objects are not found in any participant
+int test3(void){
+ transrecord_t *record;
+ objheader_t *h1,*h2;
+
+ dstmInit();
+ record = transStart();
+ printf("DEBUG -> Init done\n");
+ //read object 11
+ if((h1 = transRead(record, 11)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 12
+ if((h2 = transRead(record, 12)) == NULL) {
+ printf("Object not found\n");
+ }
transCommit(record);
+
+ return 0;
+}
+
+//Test Read objects when some objects are found and other objects not found in any participant
+int test4(void) {
+ transrecord_t *record;
+ objheader_t *h1,*h2, *h3, *h4;
+
+ dstmInit();
+ record = transStart();
+ printf("DEBUG -> Init done\n");
+ //read object 1
+ if((h1 = transRead(record, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 2
+ if((h2 = transRead(record, 2)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 11
+ if((h3 = transRead(record, 11)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 13
+ if((h4 = transRead(record, 13)) == NULL) {
+ printf("Object not found\n");
+ }
+ if((h1 != NULL) && (h2 != NULL) && (h3 != NULL) && h4 !=NULL) {
+ transCommit(record);
+ }else {
+ printf("Cannot complete this transaction\n");
+ }
+
+ return 0;
}
+//Test for transaction objects when the objs are part of the Coordinator machine starting the
+//trans commit
+int test5(void) {
+ transrecord_t *record;
+ unsigned int mid;
+ objheader_t *h1,*h2, *h3, *h4;
+ dstmInit();
+ record = transStart();
+ printf("DEBUG -> Init done\n");
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(1,mid);
+ lhashInsert(2,mid);
+ lhashInsert(3,mid);
+ lhashInsert(4,mid);
+ lhashInsert(5,mid);
+ lhashInsert(6,mid);
+ createObjects(record);
+ //read object 1
+ if((h1 = transRead(record, 1)) == NULL){
+ printf("Object not found\n");
+ }
+ //read object 5
+ if((h2 = transRead(record, 5)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 20(present in local machine)
+ if((h3 = transRead(record, 20)) == NULL) {
+ printf("Object not found\n");
+ }
+ //read object 21(present in local machine)
+ if((h4 = transRead(record, 21)) == NULL) {
+ printf("Object not found\n");
+ }
+
+ transCommit(record);
+}
#include <pthread.h>
#include "dstm.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
extern objstr_t *mainobjstore;
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
unsigned int createObjects(transrecord_t *record, unsigned short type) {
objheader_t *header, *tmp;
- unsigned int size;
+ struct sockaddr_in antelope;
+ unsigned int size, mid;
size = sizeof(objheader_t) + classsize[type] ;
header = transCreateObj(record, type);
tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
memcpy(tmp, header, size);
-// printf("Insert oid = %d at address %x\n",tmp->oid, tmp);
mhashInsert(tmp->oid, tmp);
- lhashInsert(tmp->oid, 1);
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(tmp->oid, mid);
//Lock oid 3 object
// if(tmp->oid == 3)
// tmp->status |= LOCK;
int test2() {
- unsigned int val;
+ unsigned int val, mid;
transrecord_t *myTrans;
pthread_t thread_Listen;
dstmInit();
+ mid = iptoMid("127.0.0.1");
+ lhashInsert(20, mid);
+ lhashInsert(21, mid);
+ lhashInsert(22, mid);
+ lhashInsert(23, mid);
+ lhashInsert(30, mid);
+ lhashInsert(28, mid);
+ lhashInsert(29, mid);
pthread_create(&thread_Listen, NULL, dstmListen, NULL);
// Start Transaction
myTrans = transStart();
#include "dstm.h"
+#include "ip.h"
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
+ printf("Enter TRANS_READ\n");
unsigned int machinenumber;
objheader_t *tmp, *objheader;
void *objcopy;
void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
+ printf("DEBUG -> transRead oid %d found local\n", oid);
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
} else {
//Get the object from the remote location
//printf("oid not found in local machine lookup\n");
+ printf("machinenumber = %d\n",machinenumber);
+ printf("oid = %d\n",oid);
machinenumber = lhashSearch(oid);
+ printf("machinenumber = %d\n",machinenumber);
objcopy = getRemoteObj(record, machinenumber, oid);
- if(objcopy == NULL)
+ if(objcopy == NULL) {
//If object is not found in Remote location
printf("Object not found in Machine %d\n", machinenumber);
+ return NULL;
+ }
else
return(objcopy);
}
objheader_t *headeraddr;
//unsigned int *oidnotfound;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
+ char machineip[16];
tdata = (thread_data_array_t *) threadarg;
printf("DEBUG -> New thread id %d\n", tdata->thread_id);
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ midtoIP(tdata->mid,machineip);
+ machineip[15] = '\0';
+ serv_addr.sin_addr.s_addr = inet_addr(machineip);
//serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
}
next = curr->next;
//Get machine location for object id
- /*
+
if ((machinenum = lhashSearch(curr->key)) == 0) {
printf("Error: No such machine\n");
return 1;
- }
- */
+ }
+
//TODO only for debug
- machinenum = 1;
+ //machinenum = 1;
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
printf("Error: No such oid\n");
return 1;
struct sockaddr_in serv_addr;
struct hostent *server;
char control;
+ char machineip[16];
objheader_t *h;
void *objcopy;
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(LISTEN_PORT);
- serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+ midtoIP(mnum,machineip);
+ machineip[15] = '\0';
+ serv_addr.sin_addr.s_addr = inet_addr(machineip);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect");
return NULL;
}
-// bzero((char *)buffer,sizeof(buffer));
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
- //buffer[0] = READ_REQUEST;
- //memcpy(buffer+1, &oid, sizeof(int));
- //if (write(sd, buffer, sizeof(int) + sizeof(char)) < 0) {
if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
perror("Error sending message");
return NULL;
read(sd, &size, sizeof(int));
objcopy = objstrAlloc(record->cache, size);
read(sd, objcopy, size);
+ //Insert into cache's lookup table
+ chashInsert(record->lookupTable, oid, objcopy);
break;
default:
printf("Error in recv request from participant on a READ_REQUEST\n");
return NULL;
}
- //Insert into cache's lookup table
- chashInsert(record->lookupTable, oid, objcopy);
return objcopy;
}