#ifndef _DSTM_H_
#define _DSTM_H_
+//Client Messages
+#define READ_REQUEST 1
+#define READ_MULT_REQUEST 2
+#define MOVE_REQUEST 3
+#define MOVE_MULT_REQUEST 4
+#define TRANS_REQUEST 5
+#define TRANS_ABORT 6
+#define TRANS_COMMIT 7
+
+//Server Messages
+#define OBJECT_FOUND 8
+#define OBJECT_NOT_FOUND 9
+#define OBJECTS_FOUND 10
+#define OBJECTS_NOT_FOUND 11
+#define TRANS_AGREE 12
+#define TRANS_DISAGREE 13
+#define TRANS_SUCESSFUL 14
+
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#define LISTEN_PORT 2156
#define BACKLOG 10 //max pending connections
-#define RECIEVE_BUFFER_SIZE 2048
+#define RECEIVE_BUFFER_SIZE 2048
extern int classsize[];
void *dstmAccept(void *acceptfd)
{
int numbytes,i,choice, oid;
- char buffer[RECIEVE_BUFFER_SIZE];
- char opcode[10];
+ char buffer[RECEIVE_BUFFER_SIZE], control;
void *srcObj;
objheader_t *h;
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
printf("Recieved connection: fd = %d\n", (int)acceptfd);
- do
+ numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
+ if (numbytes == -1)
{
- numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
- buffer[numbytes] = '\0';
- if (numbytes == -1)
- {
- perror("recv");
- pthread_exit(NULL);
- }
- else
- {
- sscanf(buffer, "%s %d\n", opcode, &oid);
-
- if (strcmp(opcode, "TRANS_RD") == 0) {
- printf("DEBUG -> Requesting: %s %d\n", opcode, oid);
+ perror("recv");
+ pthread_exit(NULL);
+ }
+ else
+ {
+ control = buffer[0];
+ switch(control) {
+ case READ_REQUEST:
+ oid = *((int *)(buffer+1));
+#ifdef DEBUG1
+ printf("DEBUG -> Received oid is %d\n", oid);
+#endif
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
+ if (h == NULL) {
+ buffer[0] = OBJECT_NOT_FOUND;
+ } else {
+ buffer[0] = OBJECT_FOUND;
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+ memcpy(buffer+1, srcObj, size);
+ }
+#ifdef DEBUG1
printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
- size = sizeof(objheader_t) + sizeof(classsize[h->type]);
- if(send((int)acceptfd, srcObj, size, 0) < 0) {
+#endif
+
+ if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) {
perror("");
}
- //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size));
- }
- else if (strcmp(opcode, "TRANS_COMMIT") == 0)
- printf(" This is a Commit\n");
- else if (strcmp(opcode,"TRANS_ABORT") == 0)
- printf(" This is a Abort\n");
- else
- printf(" This is a Broadcastt\n");
-
- //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
- //printf("%s", buffer);
+ break;
+ case READ_MULT_REQUEST:
+ break;
+ case MOVE_REQUEST:
+ break;
+ case MOVE_MULT_REQUEST:
+ break;
+ case TRANS_REQUEST:
+ break;
+ case TRANS_ABORT:
+ break;
+ case TRANS_COMMIT:
+ break;
+ default:
+ printf("Error receiving");
}
-
- //} while (numbytes != 0);
- } while (0);
+ //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
+ //printf("%s", buffer);
+ }
if (close((int)acceptfd) == -1)
{
perror("close");
} else { // Insert in the linked list
if ((node = calloc(1, sizeof(lhashlistnode_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&llookup.locktable);
return 1;
}
node->oid = oid;
pthread_mutex_lock(&llookup.locktable);
while(node != NULL) {
if(node->oid == oid) {
+ pthread_mutex_unlock(&llookup.locktable);
return node->mid;
}
node = node->next;
prev->next = curr->next;
free(curr);
}
+ pthread_mutex_unlock(&llookup.locktable);
return 0;
}
prev = curr;
} else { // Insert in the beginning of linked list
if ((node = calloc(1, sizeof(mhashlistnode_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(&mlookup.locktable);
return 1;
}
node->key = key;
pthread_mutex_lock(&mlookup.locktable);
while(node != NULL) {
if(node->key == key) {
+ pthread_mutex_unlock(&mlookup.locktable);
return node->val;
}
node = node->next;
prev->next = curr->next;
free(curr);
}
+ pthread_mutex_unlock(&mlookup.locktable);
return 0;
}
prev = curr;
dstmInit();
record = transStart();
- printf("DEBUG -> Init done");
- h1 = transRead(record, 3);
+ printf("DEBUG -> Init done\n");
+ h1 = transRead(record, 1);
printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
- h3 = transRead(record, 1);
+ h3 = transRead(record, 3);
printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
- /*
- h4 = transRead(&record, 4);
+ h4 = transRead(record, 4);
printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
- h3 = transRead(&record, 2);
+ h2 = transRead(record, 2);
printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
- */
+ h4 = transRead(record, 4);
+ printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+ h3 = transRead(record, 3);
+ printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+ h1 = transRead(record, 5);
// getRemoteObj(&record, 0,1);
}
#define LISTEN_PORT 2156
#define MACHINE_IP "127.0.0.1"
-#define RECIEVE_BUFFER_SIZE 2048
+#define RECEIVE_BUFFER_SIZE 2048
extern int classsize[];
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
- printf(" oid not found in cache\n");
+ printf("oid not found in local cache\n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
//Copy into cache
chashInsert(record->lookupTable, objheader->oid, objcopy);
return(objcopy);
} else {
- printf(" oid not found in Machine Lookup\n");
+ printf("oid not found in local machine lookup\n");
machinenumber = lhashSearch(oid);
- //Get object from a given machine
- /* if (getRemoteObj(record, machinenumber, oid) != 0) {
- printf("Error getRemoteObj");
- }
- */
objcopy = getRemoteObj(record, machinenumber, oid);
- return(objcopy);
+ if(objcopy == NULL)
+ printf("Object not found in Machine %d\n", machinenumber);
+ else
+ return(objcopy);
}
}
int sd, size;
struct sockaddr_in serv_addr;
struct hostent *server;
- char buffer[RECIEVE_BUFFER_SIZE];
+ char buffer[RECEIVE_BUFFER_SIZE],control;
objheader_t *h;
void *objcopy;
return NULL;
}
bzero((char *)buffer,sizeof(buffer));
- sprintf(buffer, "TRANS_RD %d\n", oid);
+ control = READ_REQUEST;
+ buffer[0] = control;
+ memcpy(buffer+1, &oid, sizeof(int));
if (write(sd, buffer, sizeof(buffer)) < 0) {
perror("Error sending message");
return NULL;
}
+
+#ifdef DEBUG1
printf("DEBUG -> ready to rcv ...\n");
- /*
- while (read(sd, buffer, sizeof(buffer)) != 0) {
- ;
- }
- */
+#endif
read(sd, buffer, sizeof(buffer));
- h = (objheader_t *) buffer;
- size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+ close(sd);
+ if (buffer[0] == OBJECT_NOT_FOUND) {
+ return NULL;
+ } else {
+
+ h = (objheader_t *) buffer+1;
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+#ifdef DEBUG1
printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
- fflush(stdout);
+#endif
+ }
objcopy = objstrAlloc(record->cache, size);
- memcpy(objcopy, (void *)buffer, size);
+ memcpy(objcopy, (void *)buffer+1, size);
//Insert into cache's lookup table
chashInsert(record->lookupTable, oid, objcopy);
return objcopy;