- #include "plookup.h"
+#include "plookup.h"
+extern int classsize[];
plistnode_t *pCreate(int objects) {
plistnode_t *pile;
return NULL;
}
pile->next = NULL;
- //Create array of objects
- if((pile->obj = calloc(objects, sizeof(unsigned int))) == NULL) {
+ if ((pile->oidmod = calloc(objects, sizeof(unsigned int))) == NULL) {
printf("Calloc error %s %d\n", __FILE__, __LINE__);
return NULL;
}
- pile->index = 0;
- //pile->vote = 0;
+ if ((pile->oidread = calloc(objects, sizeof(unsigned int))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pile->nummod = pile->numread = pile->sum_bytes = 0;
+ if ((pile->objread = calloc(objects, sizeof(int) + sizeof(short))) == NULL) {
+ printf("Calloc error %s %d\n", __FILE__, __LINE__);
+ return NULL;
+ }
+ pile->objmodified = NULL;
+ pile->nummod = pile->numread = pile->sum_bytes = 0;
+
return pile;
}
-plistnode_t *pInsert(plistnode_t *pile, unsigned int mid, unsigned int oid, int num_objs) {
+plistnode_t *pInsert(plistnode_t *pile, objheader_t *headeraddr, unsigned int mid, int num_objs) {
plistnode_t *ptr, *tmp;
- int found = 0;
+ int found = 0, offset;
tmp = pile;
//Add oid into a machine that is a part of the pile linked list structure
while(tmp != NULL) {
if (tmp->mid == mid) {
- tmp->obj[tmp->index] = oid;
- tmp->index++;
+ if ((headeraddr->status >> 1) == 1) {
+ tmp->oidmod[tmp->nummod] = headeraddr->oid;
+ tmp->nummod = tmp->nummod + 1;
+ tmp->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
+ } else {
+ tmp->oidread[tmp->numread] = headeraddr->oid;
+ offset = (sizeof(unsigned int) + sizeof(short)) * tmp->numread;
+ memcpy(tmp->objread, &headeraddr->oid, sizeof(unsigned int));
+ offset += sizeof(unsigned int);
+ memcpy(tmp->objread + offset, &headeraddr->version, sizeof(short));
+ tmp->numread = tmp->numread + 1;
+ }
found = 1;
break;
}
return NULL;
}
ptr->mid = mid;
- ptr->obj[ptr->index] = oid;
- ptr->index++;
+ if ((headeraddr->status >> 1) == 1) {
+ ptr->oidmod[ptr->nummod] = headeraddr->oid;
+ ptr->nummod = ptr->nummod + 1;
+ ptr->sum_bytes += sizeof(objheader_t) + classsize[headeraddr->type];
+ } else {
+ ptr->oidread[ptr->numread] = headeraddr->oid;
+ memcpy(ptr->objread, &headeraddr->oid, sizeof(unsigned int));
+ memcpy(ptr->objread + sizeof(unsigned int), &headeraddr->version, sizeof(short));
+ ptr->numread = ptr->numread + 1;
+ }
ptr->next = pile;
pile = ptr;
}
return 0;
}
-// Return objects for a given mid
-unsigned int *pSearch(plistnode_t *pile, unsigned int mid) {
- plistnode_t *tmp;
- tmp = pile;
- while(tmp != NULL) {
- if(tmp->mid == mid) {
- return(tmp->obj);
- }
- tmp = tmp->next;
- }
- return NULL;
-}
-
//Delete the entire pile
void pDelete(plistnode_t *pile) {
plistnode_t *next, *tmp;
tmp = pile;
while(tmp != NULL) {
next = tmp->next;
- free(tmp->obj);
+ free(tmp->oidmod);
+ free(tmp->oidread);
+ free(tmp->objread);
free(tmp);
tmp = next;
}
return tmp;
}
+int decideResponse(thread_data_array_t *tdata, char *buffer, int sd) {
+ int i, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+
+ //Check common data structure
+ for (i = 0 ; i < tdata->pilecount ; i++) {
+ //Check in any DISAGREE has come
+ if(tdata->recvmsg[i].rcv_status == TRANS_DISAGREE) {
+ //Send abort
+ transabort++;
+ buffer[0] = TRANS_ABORT;
+ if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+ perror("Error sending message for thread");
+ return 1;
+ }
+ } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE) {
+ transagree++;
+ } else if(tdata->recvmsg[i].rcv_status == TRANS_AGREE_BUT_MISSING_OBJECTS) {
+ transmiss++;
+ } else
+ transsoftabort++;
+ }
+ if(transagree == tdata->pilecount){
+ //Send Commit
+ buffer[0] = TRANS_COMMIT;
+ if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
+ perror("Error sending message for thread");
+ return 1;
+ }
+ }
+ if(transsoftabort > 0 && transabort == 0) {
+ //Send abort but retry commit
+ //i.e. wait at the participant end and then resend either agree or disagree
+ //
+
+ }
+ if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
+ //Relookup all missing objects
+ //send missing mising object/ objects
+ }
+}
+
void *transRequest(void *threadarg) {
- int sd, transagree = 0, transabort = 0, transcommit = 0, transmiss = 0, transsoftabort = 0;
+ int sd, i, n;
struct sockaddr_in serv_addr;
struct hostent *server;
thread_data_array_t *tdata;
+ objheader_t *headeraddr;
char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
tdata = (thread_data_array_t *) threadarg;
return NULL;
}
- if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
- perror("Error sending message for thread");
+ //Multiple writes for sending packets of data
+ //Send first few fixed bytes of the TRANS_REQUEST protocol
+ if (write(sd, tdata->buffer->f, (sizeof(fixed_data_t))) < 0) {
+ perror("Error sending fixed bytes for thread");
+ return NULL;
+ }
+ //Send list of machines involved in the transaction
+ if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
+ perror("Error sending list of machines for thread");
+ return NULL;
+ }
+ //Send oids and version number tuples for objects that are read
+ if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->pilecount )) < 0) {
+ perror("Error sending tuples for thread");
return NULL;
}
+ //Send objects that are modified
+ for( i = 0; i < tdata->buffer->f.nummod ; i++) {
+ headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
+ if (write(sd, &headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
+ perror("Error sending obj modified for thread");
+ return NULL;
+ }
+ }
+
//Read message from participant side
- read(sd, buffer, sizeof(buffer));
+ while(n != 0) {
+ n = read(sd, buffer, sizeof(buffer));
+ }
//process the participant's request
recvcontrol = buffer[0];
//Update common data structure and increment count
- tdata->recvmsg[tdata->thread_id] = recvcontrol;
+ tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
//Lock and update count
//Thread sleeps until all messages from pariticipants are received by coordinator
pthread_mutex_lock(tdata->lock);
if(*(tdata->count) == tdata->pilecount) {
pthread_cond_broadcast(tdata->threshold);
- //Check common data structure
- for (i = 0 ; i < tdata->pilecount ; i++) {
- //Check in any DISAGREE has come
- if(tdata->recvmsg[i] == TRANS_DISAGREE) {
- //Send abort
- transabort++;
- buffer[0] = TRANS_ABORT;
- if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
- perror("Error sending message for thread");
- return NULL;
- }
- } else if(tdata->recvmsg[i] == AGREE) {
- transagree++;
- } else if(tdata->recvmsg[i] == AGREE_BUT_MISSING_OBJECTS) {
- transmiss++;
- } else
- transsoftabort++;
- }
- if(transagree == tdata->pilecount){
- //Send Commit
- buffer[0] = TRANS_COMMIT;
- if (write(sd, tdata->buffer, (sizeof(char) * RECEIVE_BUFFER_SIZE)) < 0) {
- perror("Error sending message for thread");
- return NULL;
- }
- }
- if(transsoftabort > 0 && transabort == 0) {
- //Send abort but retry commit
- //i.e. wait at the participant end and then resend either agree or disagree
- //
-
- }
- if(transmiss > 0 && transsoftabort == 0 && transabort == 0) {
- //Relookup all missing objects
- //send missing mising object/ objects
+ if (decideResponse(tdata, buffer, sd) == 1) {
+ printf("decideResponse returned error\n");
+ return NULL;
}
-
} else {
pthread_cond_wait(tdata->threshold, tdata->lock);
}
pthread_mutex_unlock(tdata->lock);
close(sd);
- //Reset numread and nummod for the next machine
pthread_exit(NULL);
}
plistnode_t *tmp, *pile = NULL;
int i, rc;
int pilecount = 0, offset, numthreads = 0, trecvcount = 0;
- short numread = 0,nummod = 0;
char buffer[RECEIVE_BUFFER_SIZE],control;
- char tmpbuffer[RECEIVE_BUFFER_SIZE];
char transid[TID_LEN];
static int newtid = 0;
- pthread_cond_t threshold;
- pthread_mutex_t count;
ptr = record->lookupTable->table;
size = record->lookupTable->size;
}
next = curr->next;
//Get machine location for object id
- machinenum = lhashSearch(curr->key);
+ if ((machinenum = lhashSearch(curr->key)) == 0) {
+ printf("Error: No such machine\n");
+ return 1;
+ }
+ if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
+ printf("Error: No such oid\n");
+ return 1;
+ }
//Make machine groups
- if ((pile = pInsert(pile, machinenum, curr->key, record->lookupTable->numelements)) == NULL) {
+ if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
perror("pInsert calloc error");
return 1;
}
pthread_mutex_t tlock;
thread_data_array_t thread_data_array[pilecount];
- char rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
+ thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
//Initialize and set thread detach attribute
pthread_attr_init(&attr);
pListMid(pile, listmid);
//Process each machine group
while(tmp != NULL) {
- unsigned int *oidmod = calloc(record->lookupTable->numelements, sizeof(unsigned int));
- unsigned int *oidread = calloc(record->lookupTable->numelements, sizeof(unsigned int));
- nummod = numread = tot_bytes_mod = 0;
- offset = 0;
-
-
//Create transaction id
newtid++;
- sprintf(transid, "%x_%d", tmp->mid, newtid);
- //Browse through each oid in machine group
- for(i = 0; i < tmp->index; i++) {
- headeraddr = (objheader_t *) chashSearch(record->lookupTable, tmp->obj[i]);
- //check if object modified in cache
- if((headeraddr->status >> 1) == 1){
- //Keep track of oids that have been modified
- oidmod[nummod] = headeraddr->oid;
- nummod++;
- tot_bytes_mod += (sizeof(objheader_t) + classsize[headeraddr->type]); //Keeps track of total bytes of modified object
- } else {
- //Keep track of oids that are read
- oidread[numread] = headeraddr->oid;
- //create <oid,version> tuples in temporary buffer
- memcpy(tmpbuffer+offset, &headeraddr->oid, sizeof(unsigned int));
- offset += sizeof(unsigned int);
- memcpy(tmpbuffer+offset, &headeraddr->version, sizeof(short));
- offset += sizeof(short);
- numread++;
- }
- }
- //Copy each field of the packet into buffer
- bzero((char *)buffer,sizeof(buffer));
- offset = 0;
- buffer[offset] = TRANS_REQUEST;
- offset = offset + 1;
- memcpy(buffer+offset, transid, sizeof(char) * TID_LEN);
- offset += (sizeof(char) * TID_LEN);
- memcpy(buffer+offset, &pilecount, sizeof(int));
- offset += sizeof(int);
- memcpy(buffer+offset, &numread, sizeof(short));
- offset += sizeof(short);
- memcpy(buffer+offset, &nummod, sizeof(short));
- offset += sizeof(short);
- memcpy(buffer+offset, &tot_bytes_mod, sizeof(unsigned int));
- offset += sizeof(unsigned int);
- memcpy(buffer+offset, listmid, sizeof(unsigned int) * pilecount);
- offset += (sizeof(unsigned int) * pilecount);
- memcpy(buffer+offset, tmpbuffer, sizeof(char) * RECEIVE_BUFFER_SIZE);
- offset += (sizeof(char) * RECEIVE_BUFFER_SIZE);
- //send objects for all objects modified
- for( i= 0; i< nummod; i++) {
- headeraddr = (objheader_t *) chashSearch(record->lookupTable, oidmod[i]);
- memcpy(buffer+offset, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]);
- offset += sizeof(objheader_t) + classsize[headeraddr->type];
+ trans_req_data_t *tosend;
+ if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
+ perror("");
+ return 1;
}
- if (offset > RECEIVE_BUFFER_SIZE) {
- printf("Error: Buffersize too small");
- }
- //Create thread input to pass multiple arguments via structure
+ tosend->f.control = TRANS_REQUEST;
+ sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
+ tosend->f.mcount = pilecount;
+ tosend->f.numread = tmp->numread;
+ tosend->f.nummod = tmp->nummod;
+ tosend->f.sum_bytes = tmp->sum_bytes;
+ tosend->listmid = listmid;
+ tosend->objread = tmp->objread;
+ tosend->oidmod = tmp->oidmod;
thread_data_array[numthreads].thread_id = numthreads;
thread_data_array[numthreads].mid = tmp->mid;
thread_data_array[numthreads].pilecount = pilecount;
- thread_data_array[numthreads].buffer = buffer;
+ thread_data_array[numthreads].buffer = tosend;
thread_data_array[numthreads].recvmsg = rcvd_control_msg;
thread_data_array[numthreads].threshold = &tcond;
thread_data_array[numthreads].lock = &tlock;
thread_data_array[numthreads].count = &trecvcount;
- //Spawn thread for each TRANS_REQUEST
+ thread_data_array[numthreads].rec = record;
+
rc = pthread_create(&thread[numthreads], &attr, transRequest, (void *) &thread_data_array[numthreads]);
if (rc) {
perror("Error in pthread create");
- exit(-1);
+ return 1;
}
numthreads++;
- sleep(2);
- free(oidmod);
- free(oidread);
+ //TODO frees ?
tmp = tmp->next;
}
+
// Free attribute and wait for the other threads
pthread_attr_destroy(&attr);
for (i = 0 ;i < pilecount ; i++) {
if (rc)
{
printf("ERROR return code from pthread_join() is %d\n", rc);
- exit(-1);
+ return 1;
}
}
-
//Free resources
pthread_cond_destroy(&tcond);
pthread_mutex_destroy(&tlock);
- pthread_exit(NULL);
free(listmid);
+ return 0;
}
int transSoftAbort(transrecord_t *record){