// Get a new object id
unsigned int getNewOID(void) {
static int id = 1;
- return ++id;
+ return id++;
}
// Get the size of the object for a given type
unsigned int objSize(objheader_t *object) {
- return classsize[h.type];
+ return classsize[object->type];
}
/* END object header */
int transCommit(transrecord_t *record); //return 0 if successful
/* end transactions */
+void *getRemoteObj(transrecord_t *, unsigned int, unsigned int);
+
#endif
#include "mlookup.h"
#include "llookup.h"
-#define LISTEN_PORT 2153
+#define LISTEN_PORT 2156
#define BACKLOG 10 //max pending connections
-#define RECIEVE_BUFFER_SIZE 1500
+#define RECIEVE_BUFFER_SIZE 2048
+extern int classsize[];
objstr_t *mainobjstore;
-mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
int dstmInit(void)
{
//todo:initialize main object store
//do we want this to be a global variable, or provide
//separate access funtions and hide the structure?
-
+ mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
if (mhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
if (lhashCreate(HASH_SIZE, LOADFACTOR))
return 1; //failure
- pthread_t threadListen;
- pthread_create(&threadListen, NULL, dstmListen, NULL);
+ //pthread_t threadListen;
+ //pthread_create(&threadListen, NULL, dstmListen, NULL);
return 0;
}
pthread_t thread_dstm_accept;
int i;
- listenfd = socket(PF_INET, SOCK_STREAM, 0);
+ listenfd = socket(AF_INET, SOCK_STREAM, 0);
if (listenfd == -1)
{
perror("socket");
void *dstmAccept(void *acceptfd)
{
- int numbytes;
+ int numbytes,i,choice, oid;
char buffer[RECIEVE_BUFFER_SIZE];
- int fd_flags = fcntl((int)acceptfd, F_GETFD);
+ char opcode[10];
+ void *srcObj;
+ objheader_t *h;
+ int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
+
printf("Recieved connection: fd = %d\n", (int)acceptfd);
do
{
}
else
{
- printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
- printf("%s", buffer);
+ sscanf(buffer, "%s %d\n", opcode, &oid);
+
+ if (strcmp(opcode, "TRANS_RD") == 0) {
+ printf("DEBUG -> Requesting: %s %d\n", opcode, oid);
+ srcObj = mhashSearch(oid);
+ h = (objheader_t *) srcObj;
+ printf("DEBUG -> Sending oid = %d, type %d\n", h->oid, h->type);
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+ if(send((int)acceptfd, srcObj, size, 0) < 0) {
+ perror("");
+ }
+ //printf("DEBUG -> sent ...%d\n", write(acceptfd, srcObj, size));
+ }
+ else if (strcmp(opcode, "TRANS_COMMIT") == 0)
+ printf(" This is a Commit\n");
+ else if (strcmp(opcode,"TRANS_ABORT") == 0)
+ printf(" This is a Abort\n");
+ else
+ printf(" This is a Broadcastt\n");
+
+ //printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
+ //printf("%s", buffer);
}
- } while (numbytes != 0);
+ //} while (numbytes != 0);
+ } while (0);
if (close((int)acceptfd) == -1)
{
perror("close");
- pthread_exit(NULL);
}
else
printf("Closed connection: fd = %d\n", (int)acceptfd);
pthread_exit(NULL);
}
-
#include <pthread.h>
-#include "dstmserver.h"
+#include "dstm.h"
+
+extern objstr_t *mainobjstore;
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+
+unsigned int createObjects(transrecord_t *record, unsigned short type) {
+ objheader_t *header, *tmp;
+ unsigned int size;
+ size = sizeof(objheader_t) + classsize[type] ;
+ header = transCreateObj(record, type);
+ tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
+ memcpy(tmp, header, size);
+ mhashInsert(tmp->oid, tmp);
+ lhashInsert(tmp->oid, 1);
+ return 0;
+}
int main()
{
- pthread_t thread_listen;
- pthread_create(&thread_listen, NULL, dstmListen, NULL);
- pthread_join(thread_listen, NULL);
+ unsigned int val;
+ transrecord_t *myTrans;
+ pthread_t thread_Listen;
+
+ dstmInit();
+ pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+ // Start Transaction
+ myTrans = transStart();
+
+ printf("Creating Transaction\n");
+ //Create Object1
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj1");
+ }
+ //Create Object2
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj2");
+ }
+ //Create Object3
+ if((val = createObjects(myTrans, 2)) != 0) {
+ printf("Error transCreateObj3");
+ }
+ //Create Object4
+ if((val = createObjects(myTrans, 3)) != 0) {
+ printf("Error transCreateObj4");
+ }
+ pthread_join(thread_Listen, NULL);
return 0;
}
-
--- /dev/null
+#include<stdio.h>
+#include "dstm.h"
+
+int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+
+int main() {
+
+ transrecord_t *record;
+ objheader_t *h1,*h2,*h3,*h4;
+
+ dstmInit();
+ record = transStart();
+ printf("DEBUG -> Init done");
+ h1 = transRead(record, 3);
+ printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+ h3 = transRead(record, 1);
+ printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+ /*
+ h4 = transRead(&record, 4);
+ printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+ h3 = transRead(&record, 2);
+ printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+ */
+// getRemoteObj(&record, 0,1);
+}
#include "clookup.h"
#include "mlookup.h"
#include "llookup.h"
+#include<sys/types.h>
+#include<sys/socket.h>
+#include<netdb.h>
+#include<netinet/in.h>
+
+#define LISTEN_PORT 2156
+#define MACHINE_IP "127.0.0.1"
+#define RECIEVE_BUFFER_SIZE 2048
extern int classsize[];
objheader_t *transRead(transrecord_t *record, unsigned int oid)
{
unsigned int machinenumber;
-
objheader_t *tmp, *objheader;
void *objcopy;
int size;
+ void *buf;
//check cache
if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
return(objheader);
} else {
printf(" oid not found in Machine Lookup\n");
machinenumber = lhashSearch(oid);
- //TODO:broadcast
- return(NULL);
+ //Get object from a given machine
+ /* if (getRemoteObj(record, machinenumber, oid) != 0) {
+ printf("Error getRemoteObj");
+ }
+ */
+ objcopy = getRemoteObj(record, machinenumber, oid);
+ return(objcopy);
}
}
-
objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
{
- objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, classsize[type]);
+ objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
tmp->oid = getNewOID();
tmp->type = type;
tmp->version = 1;
int transAbort(transrecord_t *record){
}
+
+//mnun will be used to represent the machine IP address later
+void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
+ int sd, size;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ char buffer[RECIEVE_BUFFER_SIZE];
+ objheader_t *h;
+ void *objcopy;
+
+ if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+ perror("Error in socket");
+ return NULL;
+ }
+ bzero((char*) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_port = htons(LISTEN_PORT);
+ serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
+
+ if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
+ perror("Error in connect");
+ return NULL;
+ }
+ bzero((char *)buffer,sizeof(buffer));
+ sprintf(buffer, "TRANS_RD %d\n", oid);
+ if (write(sd, buffer, sizeof(buffer)) < 0) {
+ perror("Error sending message");
+ return NULL;
+ }
+ printf("DEBUG -> ready to rcv ...\n");
+ /*
+ while (read(sd, buffer, sizeof(buffer)) != 0) {
+ ;
+ }
+ */
+ read(sd, buffer, sizeof(buffer));
+ h = (objheader_t *) buffer;
+ size = sizeof(objheader_t) + sizeof(classsize[h->type]);
+ printf("DEBUG -> Received: oid = %d, type = %d\n", h->oid, h->type);
+ fflush(stdout);
+ objcopy = objstrAlloc(record->cache, size);
+ memcpy(objcopy, (void *)buffer, size);
+ //Insert into cache's lookup table
+ chashInsert(record->lookupTable, oid, objcopy);
+ return objcopy;
+}