client:
- gcc -lpthread -g -O0 -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
server:
- gcc -lpthread -g -O0 -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+ gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
+all:
+ gcc -lpthread -g -o client trans.c testclient.c mlookup.c clookup.c llookup.c dstm.c objstr.c dstmserver.c plookup.c ip.c
+ gcc -lpthread -g -o server dstmserver.c testserver.c plookup.c mlookup.c clookup.c llookup.c dstm.c objstr.c trans.c ip.c
clean:
rm client server
void *dstmAccept(void *acceptfd)
{
- int numbytes,i, val;
+ int numbytes,i, val, retval;
unsigned int oid;
char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
char *ptr;
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
printf("Recieved connection: fd = %d\n", (int)acceptfd);
- recv((int)acceptfd, &control, sizeof(char), 0);
+ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
+ perror("Error in receiving control from coordinator\n");
+ return;
+ }
switch(control) {
case READ_REQUEST:
printf("DEBUG -> Recv READ_REQUEST from Coordinator\n");
- recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+ if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
+ perror("Error receiving object from cooridnator\n");
+ return;
+ }
srcObj = mhashSearch(oid);
h = (objheader_t *) srcObj;
size = sizeof(objheader_t) + sizeof(classsize[h->type]);
break;
default:
- printf("Error receiving\n");
+ printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
}
if (close((int)acceptfd) == -1)
{
void *modptr, *header;
objheader_t *tmp_header;
fixed_data_t fixed;
- int sum = 0, i, N, n, val;
+ int sum = 0, i, N, n, val, retval;
//Reads to process the TRANS_REQUEST protocol further
// Read fixed_data
int numread = fixed.numread;
N = numread * (sizeof(unsigned int) + sizeof(short));
char objread[N];
- sum = 0;
- do {
- n = recv((int)acceptfd, (void *) objread, N, 0);
- // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
- sum += n;
- } while(sum < N && n != 0);
- //printf("DEBUG -> %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
-
- // Read modified objects
- if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
- printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
- return 1;
+ if(numread != 0) { // If pile contains objects to be read
+ // N = numread * (sizeof(unsigned int) + sizeof(short));
+ // char objread[N];
+ sum = 0;
+ do {
+ n = recv((int)acceptfd, (void *) objread, N, 0);
+ // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
+ sum += n;
+ } while(sum < N && n != 0);
+// printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
}
- sum = 0;
- do {
- n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
- //printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
- sum += n;
- } while (sum < fixed.sum_bytes && n != 0);
+ // Read modified objects
+ if(fixed.nummod != 0) { // If pile contains modified objects
+ if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
+ printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
+ return 1;
+ }
+ sum = 0;
+ do { // Recv the objs that are modified at Coordinator
+ n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
+ // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
+ sum += n;
+ } while (sum < fixed.sum_bytes && n != 0);
+ }
+
//Send control message as per all votes from all oids in the machine
if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
printf("Handle req error\n");
}
-
+
//Read for new control message from Coordiator
- recv((int)acceptfd, &control, sizeof(char), 0);
+ if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
+ perror("Error in receiving control message");
+ return 1;
+ }
+
switch(control) {
case TRANS_ABORT:
printf("DEBUG -> Recv TRANS_ABORT from Coordinator\n");
sendctrl = TRANS_SUCESSFUL;
if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
perror("Error sending ACK to coordinator\n");
+ return 1;
}
//Mark all ref counts as 1 and do garbage collection
ptr = modptr;
perror("Error sending ACK to coordinator\n");
}
}
-
+
break;
}
-
+
break;
case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
//TODO expect another transrequest from client
//This function runs a decision after all objects are weighed under one of the 4 possibilities
//and returns the appropriate control message to the Ccordinator
char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
+ int val;
short version;
char control = 0, ctrlmissoid, *ptr;
int i, j = 0;
//Process each object present in the pile
ptr = modptr;
- printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
+ //printf("DEBUG -> Total objs involved in trans is %d\n",fixed->nummod + fixed->numread);
fflush(stdout);
//Process each oid in the machine pile/ group
for (i = 0; i < fixed->numread + fixed->nummod; i++) {
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
- write(acceptfd, &control, sizeof(char));
+ if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ perror("Error in sending control to the Coordinator\n");
+ return 0;
+ }
printf("DEBUG -> Sending TRANS_DISAGREE\n");
return control;
}
((objheader_t *)mobj)->status |= LOCK;
//Save all object oids that are locked on this machine during this transaction request call
oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
- printf("DEBUG-> Object to be locked is %d\n", ((objheader_t *)mobj)->oid);
objlocked++;
if (version == ((objheader_t *)mobj)->version) { //If versions match
v_matchnolock++;
v_nomatch++;
//send TRANS_DISAGREE to Coordinator
control = TRANS_DISAGREE;
- write(acceptfd, &control, sizeof(char));
+ if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ perror("Error in sending control to the Coordinator\n");
+ return 0;
+ }
printf("DEBUG -> Sending TRANS_DISAGREE\n");
return control;
}
}
}
- printf("No of objs locked = %d\n", objlocked);
- printf("No of v_nomatch = %d\n", v_nomatch);
- printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
- printf("No of objs v_match but had locks before = %d\n", v_matchlock);
- printf("No of objs not found = %d\n", objnotfound);
- printf("No of objs modified but not found = %d\n", objmodnotfound);
+ //printf("No of objs locked = %d\n", objlocked);
+ //printf("No of v_nomatch = %d\n", v_nomatch);
+ //printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
+ //printf("No of objs v_match but had locks before = %d\n", v_matchlock);
+ //printf("No of objs not found = %d\n", objnotfound);
+ //printf("No of objs modified but not found = %d\n", objmodnotfound);
//Decide what control message(s) to send
if(v_matchnolock == fixed->numread + fixed->nummod) {
//send TRANS_AGREE to Coordinator
control = TRANS_AGREE;
- write(acceptfd, &control, sizeof(char));
+ if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
+ perror("Error in sending control to Coordinator\n");
+ return 0;
+ }
printf("DEBUG -> Sending TRANS_AGREE\n");
}
if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
//send TRANS_SOFT_ABORT to Coordinator
control = TRANS_SOFT_ABORT;
- write(acceptfd, &control, sizeof(char));
+ if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
+ perror("Error in sending control back to coordinator\n");
+ return 0;
+ }
printf("DEBUG -> Sending TRANS_SOFT_ABORT\n");
//send number of oids not found and the missing oids
- write(acceptfd, &objnotfound, sizeof(int));
- if(objnotfound != 0)
- write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound));
+ if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
+ perror("Error in sending no of objects that are not found\n");
+ return 0;
+ }
+ if(objnotfound != 0) {
+ if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
+ perror("Error in sending objects that are not found\n");
+ return 0;
+ }
+ }
}
//Do the following when TRANS_DISAGREE is sent
offset += sizeof(unsigned int);
memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
tmp->numread = tmp->numread + 1;
+ // printf("DEBUG->pInsert() No of obj read = %d\n", tmp->numread);
}
found = 1;
break;
} else {
ptr->oidread[ptr->numread] = headeraddr->oid;
memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
- //printf("DEBUG -> objread oid is %d\n", *(ptr->objread));
memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
ptr->numread = ptr->numread + 1;
}
ptr->next = pile;
pile = ptr;
}
+
return pile;
}
#include<stdio.h>
+#include<pthread.h>
#include "dstm.h"
#include "llookup.h"
#include "ip.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
+//#include <sys/socket.h>
+//#include <netinet/in.h>
+//#include <arpa/inet.h>
extern objstr_t *mainobjstore;
//extern lhashtable_t llookup; //Global Hash table
unsigned int createObjects(transrecord_t *record) {
objheader_t *header, *tmp;
- struct sockaddr_in antelope;
unsigned int size, mid;
int i = 0;
for(i = 20 ; i< 23; i++) {
header = (objheader_t *) objstrAlloc(mainobjstore, size);
memcpy(header, tmp, size);
mhashInsert(header->oid, header);
- //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
- //mid = iptoMid(inet_ntoa(antelope.sin_addr));
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.27");//machine d-2
+ printf("DEBUG -> createObjects mid is %x\n", mid);
lhashInsert(header->oid, mid);
- // lhashInsert(header->oid, 1);
}
// printf("Insert oid = %d at address %x\n",tmp->oid, tmp);
size = sizeof(objheader_t) + classsize[0] ;
header->status = 0;
header->status |= NEW;
mhashInsert(header->oid, header);
- //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
- //mid = iptoMid(inet_ntoa(antelope.sin_addr));
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.27");
lhashInsert(header->oid, mid);
size = sizeof(objheader_t) + classsize[1] ;
header = (objheader_t *) objstrAlloc(mainobjstore, size);
header->status = 0;
header->status |= LOCK;
mhashInsert(header->oid, header);
- //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
- //mid = iptoMid(inet_ntoa(antelope.sin_addr));
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.27");
lhashInsert(header->oid, mid);
size = sizeof(objheader_t) + classsize[2] ;
header = (objheader_t *) objstrAlloc(mainobjstore, size);
header->status = 0;
header->status |= LOCK;
mhashInsert(header->oid, header);
- //inet_aton("127.0.0.1", &antelope.sin_addr); // store IP in antelope
- //mid = iptoMid(inet_ntoa(antelope.sin_addr));
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.27");
lhashInsert(header->oid, mid);
return 0;
}
//trans commit
int test5(void) {
transrecord_t *record;
- unsigned int mid;
+ objheader_t *header;
+ unsigned int size, mid;
+ pthread_t thread_Listen;
+ pthread_attr_t attr;
objheader_t *h1,*h2, *h3, *h4;
dstmInit();
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+ //pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+
record = transStart();
printf("DEBUG -> Init done\n");
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.10");// Machine demsky.eecs.uci.edu
lhashInsert(1,mid);
lhashInsert(2,mid);
lhashInsert(3,mid);
lhashInsert(4,mid);
lhashInsert(5,mid);
lhashInsert(6,mid);
- createObjects(record);
+ pthread_create(&thread_Listen, &attr, dstmListen, NULL);
+
+ //Create and Insert Oid 20
+ size = sizeof(objheader_t) + classsize[2] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 20;
+ header->type = 2;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
+
+ //Create and Insert Oid 21
+ size = sizeof(objheader_t) + classsize[1] ;
+ header = (objheader_t *) objstrAlloc(mainobjstore, size);
+ header->oid = 21;
+ header->type = 1;
+ header->version = 1;
+ header->rcount = 0; //? not sure how to handle this yet
+ header->status = 0;
+ header->status |= NEW;
+ mhashInsert(header->oid, header);
+ mid = iptoMid("128.200.9.27");
+ lhashInsert(header->oid, mid);
//read object 1
if((h1 = transRead(record, 1)) == NULL){
printf("Object not found\n");
}
transCommit(record);
+ pthread_join(thread_Listen, NULL);
}
tmp = (objheader_t *) objstrAlloc(mainobjstore, size);
memcpy(tmp, header, size);
mhashInsert(tmp->oid, tmp);
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.10");
lhashInsert(tmp->oid, mid);
//Lock oid 3 object
// if(tmp->oid == 3)
pthread_t thread_Listen;
dstmInit();
- mid = iptoMid("127.0.0.1");
+ mid = iptoMid("128.200.9.27");
+ //Inserting into lhashtable
lhashInsert(20, mid);
lhashInsert(21, mid);
lhashInsert(22, mid);
printf("Error transCreateObj6");
}
pthread_join(thread_Listen, NULL);
-
-
}
return(objheader);
} else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
//Look up in Machine lookup table and found
- printf("oid not found in local cache\n");
+ printf("oid is found in Local mlookup\n");
tmp = mhashSearch(oid);
size = sizeof(objheader_t)+classsize[tmp->type];
//Copy into cache
return(objcopy);
} else {
//Get the object from the remote location
- //printf("oid not found in local machine lookup\n");
- printf("machinenumber = %d\n",machinenumber);
- printf("oid = %d\n",oid);
+ printf("oid is found in remote machine\n");
machinenumber = lhashSearch(oid);
- printf("machinenumber = %d\n",machinenumber);
objcopy = getRemoteObj(record, machinenumber, oid);
if(objcopy == NULL) {
//If object is not found in Remote location
}
//int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
int decideResponse(thread_data_array_t *tdata, int sd, int val) {
- int i, n, N, sum, oidcount = 0;
+ int i, n, N, sum, retval, oidcount = 0;
int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
char ctrl, control, *ptr;
unsigned int *oidnotfound;
objheader_t *header;
+
//Check common data structure
for (i = 0 ; i < tdata->pilecount ; i++) {
free(tdata->rec);
//send Abort
ctrl = TRANS_ABORT;
- if (write(sd, &ctrl, sizeof(char)) < 0) {
- perror("Error sending ctrl message for participant\n");
- return 1;
+ for (i = 0 ; i < tdata->pilecount ; i++) { //send writes to all machine groups involved
+ if (write(sd, &ctrl, sizeof(char)) < 0) {
+ perror("Error sending ctrl message for participant\n");
+ return 1;
+ }
}
return 0;
//Send Commit
ctrl = TRANS_COMMIT;
printf("Sending TRANS_COMMIT\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
+ //printf("Sending control %d ,sd = %d\n", ctrl, i, sd);
}
if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
//Send abort but retry commit
ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) <= 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
- //Sleep
+ //Sleep and the resend the request
sleep(5);
//Read new control message from Participant
- n = read(sd, &control, sizeof(char));
+
+ if((n = read(sd, &control, sizeof(char))) <= 0) {
+ perror("No bytes are read for participant\n");
+ return 1;
+ }
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = control;
}
if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
- //Send abort but retry commit after relloking up objects
- //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
+ //Send abort but retry commit after relooking up objects
ctrl = TRANS_ABORT;
printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
- if (write(sd, &ctrl, sizeof(char)) < 0) {
+ if((retval = write(sd, &ctrl, sizeof(char))) < 0) {
perror("Error sending ctrl message for participant\n");
return 1;
}
//TODO
//Relook up objects
//update location table
+
//Free pointers
free(oidnotfound);
}
char machineip[16];
tdata = (thread_data_array_t *) threadarg;
- printf("DEBUG -> New thread id %d\n", tdata->thread_id);
//Send Trans Request
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("Error in socket for TRANS_REQUEST");
midtoIP(tdata->mid,machineip);
machineip[15] = '\0';
serv_addr.sin_addr.s_addr = inet_addr(machineip);
- //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
perror("Error in connect for TRANS_REQUEST");
}
//Send oids and version number tuples for objects that are read
// printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
-// printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
+// printf(" DEBUG->Read oids are %d %d %d %d\n", *(tdata->buffer->objread), *(tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
perror("Error sending tuples for thread");
return NULL;
//Send objects that are modified
for(i = 0; i < tdata->buffer->f.nummod ; i++) {
headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
-// printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
perror("Error sending obj modified for thread");
return NULL;
}
//Read message control message from participant side
- n = read(sd, &control, sizeof(char));
+ if((n = read(sd, &control, sizeof(char))) <= 0) {
+ perror("Error in reading control message from Participant\n");
+ return NULL;
+ }
recvcontrol = control;
- printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
//Update common data structure and increment count
tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
- //process the participant's request
- if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
- printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
- pthread_mutex_unlock(tdata->lock);
- return NULL;
- }
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
}
+
+ //process the participant's request
+ if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
+ printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(tdata->lock);
+ return NULL;
+ }
pthread_mutex_unlock(tdata->lock);
close(sd);
char buffer[RECEIVE_BUFFER_SIZE],control;
char transid[TID_LEN];
static int newtid = 0;
+ trans_req_data_t *tosend;
ptr = record->lookupTable->table;
size = record->lookupTable->size;
pthread_cond_t tcond;
pthread_mutex_t tlock;
pthread_mutex_t tlshrd;
- thread_data_array_t thread_data_array[pilecount];
+ //thread_data_array_t thread_data_array[pilecount];
+ thread_data_array_t *thread_data_array;
+
+ thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
pListMid(pile, listmid);
//Process each machine group
while(tmp != NULL) {
- printf("DEBUG -> Created thread %d... \n", numthreads);
//Create transaction id
newtid++;
- trans_req_data_t *tosend;
+ //trans_req_data_t *tosend;
if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
printf("Calloc error %s, %d\n", __FILE__, __LINE__);
return 1;
perror("Error in pthread create");
return 1;
}
+
numthreads++;
//TODO frees
- free(tosend);
+ //free(tosend);
tmp = tmp->next;
}
//Free resources
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
+// for(i = 0 ;i< pilecount ;i++) {
+ free(tosend);
+// }
free(listmid);
pDelete(pile);
return 0;
//mnun will be used to represent the machine IP address later
void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
- int sd, size;
+ int sd, size, val;
struct sockaddr_in serv_addr;
struct hostent *server;
char control;
void *objcopy;
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
- perror("Error in socket");
+ perror("Error in socket\n");
return NULL;
}
bzero((char*) &serv_addr, sizeof(serv_addr));
serv_addr.sin_addr.s_addr = inet_addr(machineip);
if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
- perror("Error in connect");
+ perror("Error in connect\n");
return NULL;
}
char readrequest[sizeof(char)+sizeof(unsigned int)];
readrequest[0] = READ_REQUEST;
*((unsigned int *)(&readrequest[1])) = oid;
if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
- perror("Error sending message");
+ perror("Error sending message\n");
return NULL;
}
printf("DEBUG -> ready to rcv ...\n");
#endif
//Read response from the Participant
- read(sd, &control, sizeof(char));
+ if((val = read(sd, &control, sizeof(char))) <= 0) {
+ perror("No control response for getRemoteObj sent\n");
+ return NULL;
+ }
switch(control) {
case OBJECT_NOT_FOUND:
return NULL;
break;
case OBJECT_FOUND:
- read(sd, &size, sizeof(int));
+ if((val = read(sd, &size, sizeof(int))) <= 0) {
+ perror("No size is read from the participant\n");
+ return NULL;
+ }
objcopy = objstrAlloc(record->cache, size);
- read(sd, objcopy, size);
+ if((val = read(sd, objcopy, size)) <= 0) {
+ perror("No objects are read from the remote participant\n");
+ return NULL;
+ }
//Insert into cache's lookup table
chashInsert(record->lookupTable, oid, objcopy);
break;
printf("Error in recv request from participant on a READ_REQUEST\n");
return NULL;
}
+ close(sd);
return objcopy;
}