From: adash Date: Mon, 19 Mar 2007 02:12:39 +0000 (+0000) Subject: *** empty log message *** X-Git-Tag: preEdgeChange~653 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=c9799c66e7b971f3d66ce814598516d0b54d7ebb;p=IRC.git *** empty log message *** --- diff --git a/Robust/src/Runtime/DSTM/interface/dstm.h b/Robust/src/Runtime/DSTM/interface/dstm.h index 64600ff4..eb82144a 100644 --- a/Robust/src/Runtime/DSTM/interface/dstm.h +++ b/Robust/src/Runtime/DSTM/interface/dstm.h @@ -1,6 +1,24 @@ #ifndef _DSTM_H_ #define _DSTM_H_ +//Client Messages +#define READ_REQUEST 1 +#define READ_MULT_REQUEST 2 +#define MOVE_REQUEST 3 +#define MOVE_MULT_REQUEST 4 +#define TRANS_REQUEST 5 +#define TRANS_ABORT 6 +#define TRANS_COMMIT 7 + +//Server Messages +#define OBJECT_FOUND 8 +#define OBJECT_NOT_FOUND 9 +#define OBJECTS_FOUND 10 +#define OBJECTS_NOT_FOUND 11 +#define TRANS_AGREE 12 +#define TRANS_DISAGREE 13 +#define TRANS_SUCESSFUL 14 + #include #include #include diff --git a/Robust/src/Runtime/DSTM/interface/dstmserver.c b/Robust/src/Runtime/DSTM/interface/dstmserver.c index e154c417..fca26644 100644 --- a/Robust/src/Runtime/DSTM/interface/dstmserver.c +++ b/Robust/src/Runtime/DSTM/interface/dstmserver.c @@ -10,7 +10,7 @@ #define LISTEN_PORT 2156 #define BACKLOG 10 //max pending connections -#define RECIEVE_BUFFER_SIZE 2048 +#define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; @@ -79,50 +79,62 @@ void *dstmListen() void *dstmAccept(void *acceptfd) { int numbytes,i,choice, oid; - char buffer[RECIEVE_BUFFER_SIZE]; - char opcode[10]; + char buffer[RECEIVE_BUFFER_SIZE], control; void *srcObj; objheader_t *h; int fd_flags = fcntl((int)acceptfd, F_GETFD), size; printf("Recieved connection: fd = %d\n", (int)acceptfd); - do + numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0); + if (numbytes == -1) { - numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0); - buffer[numbytes] = '\0'; - if (numbytes == -1) - { - perror("recv"); - pthread_exit(NULL); - } - else - { - sscanf(buffer, "%s %d\n", opcode, &oid); - - if (strcmp(opcode, "TRANS_RD") == 0) { - printf("DEBUG -> Requesting: %s %d\n", opcode, oid); + perror("recv"); + pthread_exit(NULL); + } + else + { + control = buffer[0]; + switch(control) { + case READ_REQUEST: + oid = *((int *)(buffer+1)); +#ifdef DEBUG1 + printf("DEBUG -> Received oid is %d\n", oid); +#endif srcObj = mhashSearch(oid); h = (objheader_t *) srcObj; + if (h == NULL) { + buffer[0] = OBJECT_NOT_FOUND; + } else { + buffer[0] = OBJECT_FOUND; + size = sizeof(objheader_t) + sizeof(classsize[h->type]); + memcpy(buffer+1, srcObj, size); + } +#ifdef DEBUG1 printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type); - size = sizeof(objheader_t) + sizeof(classsize[h->type]); - if(send((int)acceptfd, srcObj, size, 0) < 0) { +#endif + + if(send((int)acceptfd, (void *)buffer, sizeof(buffer), 0) < 0) { perror(""); } - //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size)); - } - else if (strcmp(opcode, "TRANS_COMMIT") == 0) - printf(" This is a Commit\n"); - else if (strcmp(opcode,"TRANS_ABORT") == 0) - printf(" This is a Abort\n"); - else - printf(" This is a Broadcastt\n"); - - //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); - //printf("%s", buffer); + break; + case READ_MULT_REQUEST: + break; + case MOVE_REQUEST: + break; + case MOVE_MULT_REQUEST: + break; + case TRANS_REQUEST: + break; + case TRANS_ABORT: + break; + case TRANS_COMMIT: + break; + default: + printf("Error receiving"); } - - //} while (numbytes != 0); - } while (0); + //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd); + //printf("%s", buffer); + } if (close((int)acceptfd) == -1) { perror("close"); diff --git a/Robust/src/Runtime/DSTM/interface/llookup.c b/Robust/src/Runtime/DSTM/interface/llookup.c index 02469a14..cc989a28 100644 --- a/Robust/src/Runtime/DSTM/interface/llookup.c +++ b/Robust/src/Runtime/DSTM/interface/llookup.c @@ -64,6 +64,7 @@ unsigned int lhashInsert(unsigned int oid, unsigned int mid) { } else { // Insert in the linked list if ((node = calloc(1, sizeof(lhashlistnode_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&llookup.locktable); return 1; } node->oid = oid; @@ -87,6 +88,7 @@ unsigned int lhashSearch(unsigned int oid) { pthread_mutex_lock(&llookup.locktable); while(node != NULL) { if(node->oid == oid) { + pthread_mutex_unlock(&llookup.locktable); return node->mid; } node = node->next; @@ -122,6 +124,7 @@ unsigned int lhashRemove(unsigned int oid) { prev->next = curr->next; free(curr); } + pthread_mutex_unlock(&llookup.locktable); return 0; } prev = curr; diff --git a/Robust/src/Runtime/DSTM/interface/mlookup.c b/Robust/src/Runtime/DSTM/interface/mlookup.c index 44317e2a..bcee6650 100644 --- a/Robust/src/Runtime/DSTM/interface/mlookup.c +++ b/Robust/src/Runtime/DSTM/interface/mlookup.c @@ -54,6 +54,7 @@ unsigned int mhashInsert(unsigned int key, void *val) { } else { // Insert in the beginning of linked list if ((node = calloc(1, sizeof(mhashlistnode_t))) == NULL) { printf("Calloc error %s, %d\n", __FILE__, __LINE__); + pthread_mutex_unlock(&mlookup.locktable); return 1; } node->key = key; @@ -76,6 +77,7 @@ void *mhashSearch(unsigned int key) { pthread_mutex_lock(&mlookup.locktable); while(node != NULL) { if(node->key == key) { + pthread_mutex_unlock(&mlookup.locktable); return node->val; } node = node->next; @@ -111,6 +113,7 @@ unsigned int mhashRemove(unsigned int key) { prev->next = curr->next; free(curr); } + pthread_mutex_unlock(&mlookup.locktable); return 0; } prev = curr; diff --git a/Robust/src/Runtime/DSTM/interface/testclient.c b/Robust/src/Runtime/DSTM/interface/testclient.c index ee384473..0a1715b9 100644 --- a/Robust/src/Runtime/DSTM/interface/testclient.c +++ b/Robust/src/Runtime/DSTM/interface/testclient.c @@ -10,16 +10,19 @@ int main() { dstmInit(); record = transStart(); - printf("DEBUG -> Init done"); - h1 = transRead(record, 3); + printf("DEBUG -> Init done\n"); + h1 = transRead(record, 1); printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]); - h3 = transRead(record, 1); + h3 = transRead(record, 3); printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); - /* - h4 = transRead(&record, 4); + h4 = transRead(record, 4); printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); - h3 = transRead(&record, 2); + h2 = transRead(record, 2); printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]); - */ + h4 = transRead(record, 4); + printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]); + h3 = transRead(record, 3); + printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]); + h1 = transRead(record, 5); // getRemoteObj(&record, 0,1); } diff --git a/Robust/src/Runtime/DSTM/interface/trans.c b/Robust/src/Runtime/DSTM/interface/trans.c index 9aea07ed..7934b6a1 100644 --- a/Robust/src/Runtime/DSTM/interface/trans.c +++ b/Robust/src/Runtime/DSTM/interface/trans.c @@ -9,7 +9,7 @@ #define LISTEN_PORT 2156 #define MACHINE_IP "127.0.0.1" -#define RECIEVE_BUFFER_SIZE 2048 +#define RECEIVE_BUFFER_SIZE 2048 extern int classsize[]; @@ -33,7 +33,7 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) return(objheader); } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) { //Look up in Machine lookup table and found - printf(" oid not found in cache\n"); + printf("oid not found in local cache\n"); tmp = mhashSearch(oid); size = sizeof(objheader_t)+classsize[tmp->type]; //Copy into cache @@ -43,15 +43,13 @@ objheader_t *transRead(transrecord_t *record, unsigned int oid) chashInsert(record->lookupTable, objheader->oid, objcopy); return(objcopy); } else { - printf(" oid not found in Machine Lookup\n"); + printf("oid not found in local machine lookup\n"); machinenumber = lhashSearch(oid); - //Get object from a given machine - /* if (getRemoteObj(record, machinenumber, oid) != 0) { - printf("Error getRemoteObj"); - } - */ objcopy = getRemoteObj(record, machinenumber, oid); - return(objcopy); + if(objcopy == NULL) + printf("Object not found in Machine %d\n", machinenumber); + else + return(objcopy); } } @@ -81,7 +79,7 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { int sd, size; struct sockaddr_in serv_addr; struct hostent *server; - char buffer[RECIEVE_BUFFER_SIZE]; + char buffer[RECEIVE_BUFFER_SIZE],control; objheader_t *h; void *objcopy; @@ -99,24 +97,31 @@ void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) { return NULL; } bzero((char *)buffer,sizeof(buffer)); - sprintf(buffer, "TRANS_RD %d\n", oid); + control = READ_REQUEST; + buffer[0] = control; + memcpy(buffer+1, &oid, sizeof(int)); if (write(sd, buffer, sizeof(buffer)) < 0) { perror("Error sending message"); return NULL; } + +#ifdef DEBUG1 printf("DEBUG -> ready to rcv ...\n"); - /* - while (read(sd, buffer, sizeof(buffer)) != 0) { - ; - } - */ +#endif read(sd, buffer, sizeof(buffer)); - h = (objheader_t *) buffer; - size = sizeof(objheader_t) + sizeof(classsize[h->type]); + close(sd); + if (buffer[0] == OBJECT_NOT_FOUND) { + return NULL; + } else { + + h = (objheader_t *) buffer+1; + size = sizeof(objheader_t) + sizeof(classsize[h->type]); +#ifdef DEBUG1 printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type); - fflush(stdout); +#endif + } objcopy = objstrAlloc(record->cache, size); - memcpy(objcopy, (void *)buffer, size); + memcpy(objcopy, (void *)buffer+1, size); //Insert into cache's lookup table chashInsert(record->lookupTable, oid, objcopy); return objcopy;