int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
printf("Recieved connection: fd = %d\n", (int)acceptfd);
- numbytes = recv((int)acceptfd, (void *)buffer, sizeof(buffer), 0);
- if (numbytes == -1)
- {
- perror("recv");
- pthread_exit(NULL);
- }
- else
+ while((numbytes = recv((int)acceptfd, (void *) buffer, sizeof(buffer), 0)) != 0)
{
+ printf("DEBUG -> dstmserver: numbytes = %d\n", numbytes);
control = buffer[0];
switch(control) {
case READ_REQUEST:
+ printf("DEBUG -> READ_REQUEST\n");
oid = *((int *)(buffer+1));
#ifdef DEBUG1
printf("DEBUG -> Received oid is %d\n", oid);
}
break;
case READ_MULT_REQUEST:
+ printf("DEBUG-> READ_MULT_REQUEST\n");
break;
case MOVE_REQUEST:
+ printf("DEBUG -> MOVE_REQUEST\n");
break;
case MOVE_MULT_REQUEST:
+ printf("DEBUG -> MOVE_MULT_REQUEST\n");
break;
case TRANS_REQUEST:
+ printf("DEBUG -> TRANS_REQUEST\n");
printf("Client sent %d\n",buffer[0]);
- int offset = 1;
- printf("Num Read %d\n",*((short*)(buffer+offset)));
- offset += sizeof(short);
- printf("Num modified %d\n",*((short*)(buffer+offset)));
- handleTransReq(acceptfd, buffer);
+ // handleTransReq(acceptfd, buffer);
break;
case TRANS_ABORT:
+ printf("DEBUG -> TRANS_ABORT\n");
break;
case TRANS_COMMIT:
+ printf("DEBUG -> TRANS_COMMIT\n");
+ printf("Client sent %d\n",buffer[0]);
+ //TODO copy the objects into the machine
+ /*copy the object into the object store from its old
+ location in the objstore(pointer to its header is already stored before)*/
break;
default:
printf("Error receiving");
}
//printf("Read %d bytes from %d\n", numbytes, (int)acceptfd);
//printf("%s", buffer);
- }
+ }
if (close((int)acceptfd) == -1)
{
perror("close");
}
//TOOD put __FILE__ __LINE__ for all error conditions
+#if 0
int handleTransReq(int acceptfd, char *buf) {
short numread = 0, nummod = 0;
char control;
objheader_t *headptr = NULL;
objstr_t *tmpholder;
void *top, *mobj;
+
char sendbuf[RECEIVE_BUFFER_SIZE];
control = buf[0];
- offset = 1;
- numread = *((short *)(buf+offset));
+ offset = sizeof(fixed_data_t);
+ list = *((short *)(buf+offset));
offset += sizeof(short);
nummod = *((short *)(buf+offset));
offset += sizeof(short);
offset += size;
}
}
+ /*
if(transabort > 0) {
sendbuf[0] = TRANS_DISAGREE_ABORT;
if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
perror("");
}
- } else {
+ } else if(transagree == numread+nummod) {
sendbuf[0] = TRANS_AGREE;
if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
perror("");
}
+ } else {
+ sendbuf[0] = TRANS_DISAGREE;
+ if(send((int)acceptfd, (void *)sendbuf, sizeof(sendbuf), 0) < 0) {
+ perror("");
+ }
}
+ */
return 0;
}
+#endif
while (tmp != NULL) {
list[i] = tmp->mid;
i++;
+ tmp = tmp->next;
}
return 0;
}
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
-int main() {
+int test1(void);
+int test2(void);
+
+int main()
+{
+ test2();
+}
+
+int test1(void) {
transrecord_t *record;
- objheader_t *h1,*h2,*h3,*h4;
+ objheader_t *h1,*h2,*h3,*h4,*h5, *h6;
dstmInit();
record = transStart();
printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
h3 = transRead(record, 3);
printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
- h1 = transRead(record, 5);
+ h5 = transRead(record, 5);
+ printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
// getRemoteObj(&record, 0,1);
}
+
+int test2(void) {
+
+ transrecord_t *record;
+ objheader_t *h1,*h2,*h3,*h4,*h5, *h6;
+
+ dstmInit();
+ record = transStart();
+ printf("DEBUG -> Init done\n");
+ h1 = transRead(record, 1);
+// printf("oid = %d\tsize = %d\n", h1->oid,classsize[h1->type]);
+ h2 = transRead(record, 2);
+// printf("oid = %d\tsize = %d\n", h2->oid,classsize[h2->type]);
+ h3 = transRead(record, 3);
+// printf("oid = %d\tsize = %d\n", h3->oid,classsize[h3->type]);
+ h4 = transRead(record, 4);
+// printf("oid = %d\tsize = %d\n", h4->oid,classsize[h4->type]);
+ h4->status |= DIRTY;
+ h5 = transRead(record, 5);
+// printf("oid = %d\tsize = %d\n", h5->oid,classsize[h5->type]);
+ h6 = transRead(record, 6);
+// printf("oid = %d\tsize = %d\n", h6->oid,classsize[h6->type]);
+ h6->status |= DIRTY;
+ transCommit(record);
+}
+
+
extern objstr_t *mainobjstore;
int classsize[]={sizeof(int),sizeof(char),sizeof(short), sizeof(void *)};
+int test1(void);
+int test2(void);
+
unsigned int createObjects(transrecord_t *record, unsigned short type) {
objheader_t *header, *tmp;
unsigned int size;
}
int main()
+{
+ test2();
+}
+
+int test1()
{
unsigned int val;
transrecord_t *myTrans;
if((val = createObjects(myTrans, 3)) != 0) {
printf("Error transCreateObj4");
}
+ //Create Object5
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj5");
+ }
+ //Create Object6
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj6");
+ }
pthread_join(thread_Listen, NULL);
return 0;
}
+
+int test2() {
+
+ unsigned int val;
+ transrecord_t *myTrans;
+ pthread_t thread_Listen;
+
+ dstmInit();
+ pthread_create(&thread_Listen, NULL, dstmListen, NULL);
+ // Start Transaction
+ myTrans = transStart();
+
+ printf("Creating Transaction\n");
+ //Create Object1
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj1");
+ }
+ //Create Object2
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj2");
+ }
+ //Create Object3
+ if((val = createObjects(myTrans, 2)) != 0) {
+ printf("Error transCreateObj3");
+ }
+ //Create Object4
+ if((val = createObjects(myTrans, 3)) != 0) {
+ printf("Error transCreateObj4");
+ }
+ //Create Object5
+ if((val = createObjects(myTrans, 0)) != 0) {
+ printf("Error transCreateObj5");
+ }
+ //Create Object6
+ if((val = createObjects(myTrans, 1)) != 0) {
+ printf("Error transCreateObj6");
+ }
+ pthread_join(thread_Listen, NULL);
+
+
+}
return(objcopy);
} else {
//Get the object from the remote location
- printf("oid not found in local machine lookup\n");
+ //printf("oid not found in local machine lookup\n");
machinenumber = lhashSearch(oid);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL)
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
tdata = (thread_data_array_t *) threadarg;
+ printf("DEBUG -> New thread id %d\n", tdata->thread_id);
//Send Trans Request
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST");
//Multiple writes for sending packets of data
//Send first few fixed bytes of the TRANS_REQUEST protocol
+ printf("DEBUG -> Start sending commit data...\n", tdata->buffer->f.control);
+ printf("Bytes sent in first write: %d\n", sizeof(fixed_data_t));
if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
perror("Error sending fixed bytes for thread");
return NULL;
}
//Send list of machines involved in the transaction
+ printf("Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
perror("Error sending list of machines for thread");
return NULL;
}
//Send oids and version number tuples for objects that are read
+ printf("Bytes sent in the third write: %d\n", sizeof(unsigned int) + sizeof(short) * tdata->pilecount);
if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
perror("Error sending tuples for thread");
return NULL;
//Send objects that are modified
for( i = 0; i < tdata->buffer->f.nummod ; i++) {
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ printf("Bytes sent for %d obj modified %d\n", i+1, sizeof(objheader_t) + classsize[headeraddr->type]);
if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
perror("Error sending obj modified for thread");
return NULL;
pthread_exit(NULL);
}
-int transCommit(transrecord_t *record){
+int transCommit(transrecord_t *record) {
chashlistnode_t *curr, *ptr, *next;
unsigned int size;//Represents number of bins in the chash table
unsigned int machinenum, tot_bytes_mod;
}
next = curr->next;
//Get machine location for object id
+ /*
if ((machinenum = lhashSearch(curr->key)) == 0) {
printf("Error: No such machine\n");
return 1;
}
+ */
+ //TODO only for debug
+ machinenum = 1;
if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
printf("Error: No such oid\n");
return 1;
pListMid(pile, listmid);
//Process each machine group
while(tmp != NULL) {
+ printf("DEBUG -> Created thread %d... \n", numthreads);
//Create transaction id
newtid++;
trans_req_data_t *tosend;
thread_data_array[numthreads].count = &trecvcount;
thread_data_array[numthreads].rec = record;
- rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]);
+ rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
if (rc) {
perror("Error in pthread create");
return 1;