11 #include<netinet/in.h>
12 #include <sys/types.h>
// TCP port stored in sin_port (via htons) by transRequest() and getRemoteObj() before connecting.
16 #define LISTEN_PORT 2156
// Loopback address; in the code shown it is only referenced from a commented-out line in getRemoteObj().
17 #define MACHINE_IP "127.0.0.1"
// Size in bytes of the local `buffer` arrays declared in transRequest() and transCommit().
18 #define RECEIVE_BUFFER_SIZE 2048
// Defined in another translation unit. Indexed by an object's type; the value is added to
// sizeof(objheader_t) wherever this file computes the total size of an object.
20 extern int classsize[];
// Pauses the caller with nanosleep() for a short interval whose nanosecond part is
// 1,000,000 + (t % 10,000,000).
// NOTE(review): the declaration/assignment of `t` and of req.tv_sec are not visible in this
// listing; presumably t is time-derived and tv_sec is zeroed - confirm both are set before the call.
22 void randomdelay(void)
24 struct timespec req, rem;
// 1 ms floor plus a remainder strictly below 10 ms.
29 req.tv_nsec = (long)(1000000 + (t%10000000)); //1-11 msec
// The return value is not captured on this line, so an interrupted sleep is not retried here.
30 nanosleep(&req, &rem);
34 transrecord_t *transStart()
36 transrecord_t *tmp = malloc(sizeof(transrecord_t));
37 tmp->cache = objstrCreate(1048576);
38 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
// Locates object `oid` on behalf of the transaction `record`, trying three places in order:
//   1. the transaction's own lookupTable (chashSearch),
//   2. the table searched by mhashSearch - on a hit the object is copied into record->cache
//      and the copy is registered in record->lookupTable,
//   3. the machine reported by lhashSearch, fetched through getRemoteObj().
// The return statements are not visible in this listing, so the exact value returned on each
// path should be confirmed against the full file.
42 objheader_t *transRead(transrecord_t *record, unsigned int oid)
44 unsigned int machinenumber;
45 objheader_t *tmp, *objheader;
// Case 1: already present in this transaction's lookup table.
50 if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
51 //printf("DEBUG -> transRead oid %d found local\n", oid);
// Case 2: found through mhashSearch.
53 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
54 //Look up in Machine lookup table and found
56 //printf("oid is found in Local machinelookup\n");
// NOTE(review): this repeats the mhashSearch(oid) call whose result is already in objheader.
57 tmp = mhashSearch(oid);
// Total bytes to copy: header plus the per-type size from classsize[].
58 size = sizeof(objheader_t)+classsize[tmp->type];
// NOTE(review): no NULL check on the objstrAlloc result is visible before the memcpy.
60 objcopy = objstrAlloc(record->cache, size);
61 memcpy(objcopy, (void *)tmp, size);
62 //Insert into cache's lookup table
63 chashInsert(record->lookupTable, objheader->oid, objcopy);
// Case 3: not found by either search above; ask another machine.
66 //Get the object from the remote location
67 //printf("oid is found in remote machine\n");
68 machinenumber = lhashSearch(oid);
69 objcopy = getRemoteObj(record, machinenumber, oid);
71 //If object is not found in Remote location
72 //printf("Object oid = %d not found in Machine %d\n", oid, machinenumber);
76 //printf("Object oid = %d found in Machine %d\n", oid, machinenumber);
// Creates a new object of class `type` inside the transaction's cache: allocates
// sizeof(objheader_t) + classsize[type] bytes from record->cache, assigns an id obtained
// from getNewOID(), zeroes rcount, and registers the object in record->lookupTable under that id.
// Other header fields and the return statement sit on lines not shown in this listing.
82 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
// NOTE(review): the allocation result is dereferenced on the next visible line without a NULL check.
84 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
85 tmp->oid = getNewOID();
88 tmp->rcount = 0; //? not sure how to handle this yet
91 chashInsert(record->lookupTable, tmp->oid, tmp);
95 //This function decides the response that needs to be sent to all other machines involved in a
96 //transaction by the machine that initiated the transaction request
//
// Reads tdata->recvmsg[0 .. pilecount-1].rcv_status and writes the decision to *tdata->replyctrl:
//   - transdisagree > 0                         -> TRANS_ABORT; the record's cache and lookup table are deleted
//   - transagree == pilecount                   -> TRANS_COMMIT; the record's cache and lookup table are deleted
//   - transsoftabort > 0 and transdisagree == 0 -> TRANS_ABORT with *tdata->replyretry = 1; the cache is kept
//   - otherwise an "undecided response" error is printed.
// The switch labels, counter increments and return statements are on lines not shown here.
// transRequest() treats a non-zero return value as an error.
98 int decideResponse(thread_data_array_t *tdata) {
100 int i, transagree = 0, transdisagree = 0, transsoftabort = 0;
102 //Check common data structure
103 for (i = 0 ; i < tdata->pilecount ; i++) {
// Status byte stored for participant i by its transRequest() thread.
105 control = tdata->recvmsg[i].rcv_status;
108 printf("DEBUG-> trans.c Recv TRANS_DISAGREE\n");
113 printf("DEBUG-> trans.c Recv TRANS_AGREE\n");
117 case TRANS_SOFT_ABORT:
118 printf("DEBUG-> trans.c Recv TRANS_SOFT_ABORT\n");
122 printf("Participant sent unknown message in %s, %d\n", __FILE__, __LINE__);
127 //Decide what control message to send to Participant
128 if(transdisagree > 0) {
130 *(tdata->replyctrl) = TRANS_ABORT;
131 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
// The transaction's cached objects are discarded on abort.
132 objstrDelete(tdata->rec->cache);
133 chashDelete(tdata->rec->lookupTable);
135 } else if(transagree == tdata->pilecount){
137 *(tdata->replyctrl) = TRANS_COMMIT;
138 printf("DEBUG-> trans.c Sending TRANS_COMMIT\n");
// NOTE(review): transRequest() threads look up modified objects in rec->lookupTable; the visible
// code calls this function only after every thread has read its participant's reply, but confirm
// nothing touches the cache after this point.
139 objstrDelete(tdata->rec->cache);
140 chashDelete(tdata->rec->lookupTable);
142 } else if(transsoftabort > 0 && transdisagree == 0) {
144 *(tdata->replyctrl) = TRANS_ABORT;
// Read by transCommit() after the threads are joined (it tests treplyretry == 1).
145 *(tdata->replyretry) = 1;
// Deletion is intentionally disabled on this path, so the cache survives for the retry.
146 //objstrDelete(tdata->rec->cache);
147 //chashDelete(tdata->rec->lookupTable);
149 printf("DEBUG-> trans.c Sending TRANS_ABORT\n");
151 printf("DEBUG -> %s, %d: Error: undecided response\n", __FILE__, __LINE__);
157 //This function sends the final response to all threads in their respective socket id
//
// tdata: per-thread data; reads recvmsg[thread_id].rcv_status and *replyctrl.
// sd:    connected socket for this thread's participant.
// retval starts at 0 and is set to TRANS_SOFT_ABORT, TRANS_ABORT or TRANS_COMMIT below;
// transRequest() treats a return of 0 as failure. The return statements are not shown here.
158 char sendResponse(thread_data_array_t *tdata, int sd) {
159 int n, N, sum, oidcount = 0;
160 char *ptr, retval = 0;
161 unsigned int *oidnotfound;
163 //If the decided response is due to a soft abort and missing objects at the Participant's side
164 if(tdata->recvmsg[tdata->thread_id].rcv_status == TRANS_SOFT_ABORT) {
165 //Read list of objects missing
// NOTE(review): read() returns -1 on error, which also satisfies "!= 0"; a short read of the
// count is not handled either.
166 if((read(sd, &oidcount, sizeof(int)) != 0) && (oidcount != 0)) {
167 //Break if only objs are locked at the Participant side
// N is the number of bytes expected for the oid list.
168 N = oidcount * sizeof(unsigned int);
// NOTE(review): oidcount comes from the peer and is used as the element count unvalidated;
// no free() of oidnotfound is visible in this listing.
169 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
170 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
172 ptr = (char *) oidnotfound;
// Reads into the list at offset `sum`; the initialisation and update of `sum` are on lines not shown.
174 n = read(sd, ptr+sum, N-sum);
// NOTE(review): the loop stops on n == 0 but not on n == -1.
176 } while(sum < N && n !=0);
178 retval = TRANS_SOFT_ABORT;
180 //If the decided response is TRANS_ABORT
181 if(*(tdata->replyctrl) == TRANS_ABORT) {
182 retval = TRANS_ABORT;
184 if(*(tdata->replyctrl) == TRANS_COMMIT) {
185 retval = TRANS_COMMIT;
187 // Send response to the Participant
// NOTE(review): send() returns a signed ssize_t while sizeof yields size_t; on common ABIs a -1
// result is converted to a huge unsigned value, so this test only catches a 0-byte send.
188 if (send(sd, tdata->replyctrl, sizeof(char),MSG_NOSIGNAL) < sizeof(char)) {
189 perror("Error sending ctrl message for participant\n");
// Thread entry point (started by pthread_create in transCommit) handling one participant.
// threadarg is a thread_data_array_t*. The visible steps are:
//   1. connect a TCP socket to the participant identified by tdata->mid on LISTEN_PORT;
//   2. send the fixed header, the machine-id list, the read tuples and every modified object;
//   3. read the participant's one-byte reply and store it in tdata->recvmsg[thread_id];
//   4. under tdata->lock, either call decideResponse() and broadcast, or wait on tdata->threshold;
//   5. call sendResponse() to forward the decision over the same socket.
// Error-path returns, socket close calls and the update of *tdata->count are on lines not shown.
195 void *transRequest(void *threadarg) {
197 struct sockaddr_in serv_addr;
198 struct hostent *server;
199 thread_data_array_t *tdata;
200 objheader_t *headeraddr;
201 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
202 char machineip[16], retval;
204 tdata = (thread_data_array_t *) threadarg;
206 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
207 perror("Error in socket for TRANS_REQUEST\n");
210 bzero((char*) &serv_addr, sizeof(serv_addr));
211 serv_addr.sin_family = AF_INET;
212 serv_addr.sin_port = htons(LISTEN_PORT);
// midtoIP fills machineip from the machine id; the last byte of the 16-byte buffer is then
// forced to NUL before the string is handed to inet_addr.
213 midtoIP(tdata->mid,machineip);
214 machineip[15] = '\0';
215 serv_addr.sin_addr.s_addr = inet_addr(machineip);
217 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
218 perror("Error in connect for TRANS_REQUEST\n");
222 printf("DEBUG-> trans.c Sending TRANS_REQUEST to mid %s\n", machineip);
223 //Send bytes of data with TRANS_REQUEST control message
// NOTE(review): comparing send()'s signed result with a size_t sizeof converts -1 to a large
// unsigned value on common ABIs, so a failed send is not detected by this test.
224 if (send(sd, &(tdata->buffer->f), sizeof(fixed_data_t),MSG_NOSIGNAL) < sizeof(fixed_data_t)) {
225 perror("Error sending fixed bytes for thread\n");
228 //Send list of machines involved in the transaction
// One unsigned int per participant.
230 int size=sizeof(unsigned int)*tdata->pilecount;
231 if (send(sd, tdata->buffer->listmid, size, MSG_NOSIGNAL) < size) {
232 perror("Error sending list of machines for thread\n");
236 //Send oids and version number tuples for objects that are read
// Each tuple occupies sizeof(unsigned int) + sizeof(short) bytes.
238 int size=(sizeof(unsigned int)+sizeof(short))*tdata->buffer->f.numread;
239 if (send(sd, tdata->buffer->objread, size, MSG_NOSIGNAL) < size) {
240 perror("Error sending tuples for thread\n");
244 //Send objects that are modified
245 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
// NOTE(review): the lookup result is dereferenced on the very next line with no NULL check.
247 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
248 size=sizeof(objheader_t)+classsize[headeraddr->type];
249 if (send(sd, headeraddr, size, MSG_NOSIGNAL) < size) {
250 perror("Error sending obj modified for thread\n");
255 //Read message control message from participant side
256 if((n = read(sd, &control, sizeof(char))) <= 0) {
257 perror("Error in reading control message from Participant\n");
260 recvcontrol = control;
262 //Update common data structure and increment count
263 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
264 //Lock and update count
265 //Thread sleeps until all messages from participants are received by coordinator
266 pthread_mutex_lock(tdata->lock);
// When the shared count equals pilecount, this thread makes the decision and wakes the others.
270 if(*(tdata->count) == tdata->pilecount) {
271 if (decideResponse(tdata) != 0) {
272 printf("decideResponse returned error %s,%d\n", __FILE__, __LINE__);
// The lock is still held here, so this unlock pairs with the lock above.
273 pthread_mutex_unlock(tdata->lock);
277 pthread_cond_broadcast(tdata->threshold);
// NOTE(review): no enclosing loop re-checking the condition is visible; POSIX permits spurious
// wakeups from pthread_cond_wait, so confirm the predicate is re-tested.
279 pthread_cond_wait(tdata->threshold, tdata->lock);
282 pthread_mutex_unlock(tdata->lock);
284 if (sendResponse(tdata, sd) == 0) {
285 printf("sendResponse returned error %s,%d\n", __FILE__, __LINE__);
// NOTE(review): the mutex appears to have been released already just above; unlocking it a
// second time here looks unintended - confirm against the full file.
286 pthread_mutex_unlock(tdata->lock);
// Coordinator side of committing the transaction `record`. The visible steps are:
//   1. walk every bin of record->lookupTable and group the cached objects by the machine that
//      lhashSearch reports for them (pInsert builds the `pile` list);
//   2. for each group, build a trans_req_data_t request and start a transRequest() thread;
//   3. join all threads, destroy the condition variable and mutex, then act on treplyretry.
// Loop headers over `pile`, error-path returns, cleanup and the final return value are on lines
// not shown in this listing.
294 int transCommit(transrecord_t *record) {
295 chashlistnode_t *curr, *ptr, *next;
296 unsigned int size;//Represents number of bins in the chash table
297 unsigned int machinenum, tot_bytes_mod, *listmid;
298 objheader_t *headeraddr;
299 plistnode_t *tmp, *pile = NULL;
301 int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
302 char buffer[RECEIVE_BUFFER_SIZE],control;
303 char transid[TID_LEN];
// static: the value persists across calls; it is formatted into the transaction id below.
304 static int newtid = 0;
305 trans_req_data_t *tosend;
306 char treplyctrl = 0, treplyretry = 0; //keep track of the common response that needs to be sent
308 ptr = record->lookupTable->table;
309 size = record->lookupTable->size;
310 //Look through all the objects in the cache and make piles
311 for(i = 0; i < size ;i++) {
313 //Inner loop to traverse the linked list of the cache lookupTable
314 while(curr != NULL) {
315 //if the first bin in hash table is empty
320 //Get machine location for object id
// A result of 0 from lhashSearch is treated as "no such machine".
322 if ((machinenum = lhashSearch(curr->key)) == 0) {
323 printf("Error: No such machine %s, %d\n", __FILE__, __LINE__);
327 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
328 printf("Error: No such oid %s, %d\n", __FILE__, __LINE__);
331 //Make machine groups
332 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
333 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
340 //Create the packet to be sent in TRANS_REQUEST
342 pilecount = pCount(pile); //Keeps track of the number of participants
344 //Thread related variables
// Variable-length array sized by the participant count.
345 pthread_t thread[pilecount]; //Create threads for each participant
347 pthread_cond_t tcond;
348 pthread_mutex_t tlock;
// NOTE(review): tlshrd is not used on any line visible in this listing.
349 pthread_mutex_t tlshrd;
350 //thread_data_array_t thread_data_array[pilecount];
351 thread_data_array_t *thread_data_array;
// NOTE(review): no NULL check or matching free() for this allocation is visible here.
353 thread_data_array = (thread_data_array_t *) malloc(sizeof(thread_data_array_t)*pilecount);
355 thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
357 //Initialize and set thread detach attribute
358 pthread_attr_init(&attr);
359 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
360 pthread_mutex_init(&tlock, NULL);
361 pthread_cond_init(&tcond, NULL);
363 //Keep track of list of machine ids per transaction
364 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
365 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
369 pListMid(pile, listmid);
370 //Process each machine group
371 //Should be a new function for while loop
373 //Create transaction id
375 //trans_req_data_t *tosend;
// A fresh zeroed request is allocated for each thread.
376 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
377 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
380 tosend->f.control = TRANS_REQUEST;
// `tmp` is assigned on a line not shown - presumably the current node of `pile`; TODO confirm.
// NOTE(review): sprintf has no bound; confirm trans_id can hold "<hex mid>_<decimal newtid>".
381 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
382 tosend->f.mcount = pilecount;
383 tosend->f.numread = tmp->numread;
384 tosend->f.nummod = tmp->nummod;
385 tosend->f.sum_bytes = tmp->sum_bytes;
// listmid, objread and oidmod are shared by pointer, not copied.
386 tosend->listmid = listmid;
387 tosend->objread = tmp->objread;
388 tosend->oidmod = tmp->oidmod;
// Per-thread slot. All threads share the same response array, condition variable, mutex,
// counter and reply flags, which live in this function's locals.
389 thread_data_array[numthreads].thread_id = numthreads;
390 thread_data_array[numthreads].mid = tmp->mid;
391 thread_data_array[numthreads].pilecount = pilecount;
392 thread_data_array[numthreads].buffer = tosend;
393 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
394 thread_data_array[numthreads].threshold = &tcond;
395 thread_data_array[numthreads].lock = &tlock;
396 thread_data_array[numthreads].count = &trecvcount;
397 thread_data_array[numthreads].replyctrl = &treplyctrl;
398 thread_data_array[numthreads].replyretry = &treplyretry;
399 thread_data_array[numthreads].rec = record;
// NOTE(review): the attribute object initialised above is not passed here (NULL is used).
401 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
// NOTE(review): pthread_create reports failure through its return value rather than errno,
// so perror may print an unrelated message.
403 perror("Error in pthread create");
411 // Free attribute and wait for the other threads
412 pthread_attr_destroy(&attr);
413 for (i = 0 ;i < pilecount ; i++) {
414 rc = pthread_join(thread[i], NULL);
417 printf("ERROR return code from pthread_join() is %d\n", rc);
423 pthread_cond_destroy(&tcond);
424 pthread_mutex_destroy(&tlock);
// treplyretry is set to 1 by decideResponse() on the soft-abort path.
429 if(treplyretry == 1) {
430 //wait a random amount of time
433 //Retry the committing transaction again
440 //mnum will be used to represent the machine IP address later
//
// Fetches object `oid` from machine `mnum` over a new TCP connection to LISTEN_PORT:
// sends a READ_REQUEST byte followed by the oid, reads a one-byte control reply, then (on the
// success path) reads an int size and that many bytes into space taken from record->cache and
// registers the copy in record->lookupTable.
// Case labels other than OBJECT_NOT_FOUND, the return statements and any close(sd) calls are on
// lines not shown in this listing.
441 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
443 struct sockaddr_in serv_addr;
444 struct hostent *server;
450 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
451 perror("Error in socket\n");
454 bzero((char*) &serv_addr, sizeof(serv_addr));
455 serv_addr.sin_family = AF_INET;
456 serv_addr.sin_port = htons(LISTEN_PORT);
457 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
// midtoIP fills machineip from the machine number; index 15 is then forced to NUL.
458 midtoIP(mnum,machineip);
459 machineip[15] = '\0';
460 serv_addr.sin_addr.s_addr = inet_addr(machineip);
462 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
463 perror("Error in connect\n");
// Request layout: one control byte followed by the oid in host byte order.
466 char readrequest[sizeof(char)+sizeof(unsigned int)];
467 readrequest[0] = READ_REQUEST;
// NOTE(review): this stores an unsigned int through a cast char pointer at offset 1, which is
// not suitably aligned for unsigned int; memcpy would avoid the misaligned access.
468 *((unsigned int *)(&readrequest[1])) = oid;
// NOTE(review): send()'s signed result is compared with a size_t, so -1 is not caught on common ABIs.
469 if (send(sd, &readrequest, sizeof(readrequest), MSG_NOSIGNAL) < sizeof(readrequest)) {
470 perror("Error sending message\n");
475 printf("DEBUG -> ready to rcv ...\n");
477 //Read response from the Participant
478 if((val = read(sd, &control, sizeof(char))) <= 0) {
479 perror("No control response for getRemoteObj sent\n");
483 case OBJECT_NOT_FOUND:
484 printf("DEBUG -> Control OBJECT_NOT_FOUND received\n");
// Object size as announced by the peer.
487 if((val = read(sd, &size, sizeof(int))) <= 0) {
488 perror("No size is read from the participant\n");
// NOTE(review): `size` comes from the peer and is used unvalidated; no NULL check on the
// allocation is visible before the read below.
491 objcopy = objstrAlloc(record->cache, size);
// NOTE(review): a single read() may deliver fewer than `size` bytes on a stream socket; only
// val <= 0 is treated as failure here.
492 if((val = read(sd, objcopy, size)) <= 0) {
493 perror("No objects are read from the remote participant\n");
496 //Insert into cache's lookup table
497 chashInsert(record->lookupTable, oid, objcopy);
500 printf("Error in recv request from participant on a READ_REQUEST %s, %d\n",__FILE__, __LINE__);