11 #define LISTEN_PORT 2156
12 #define BACKLOG 10 //max pending connections
13 #define RECEIVE_BUFFER_SIZE 2048
15 extern int classsize[];
17 objstr_t *mainobjstore;
21 //Initialize main object store
22 mainobjstore = objstrCreate(DEFAULT_OBJ_STORE_SIZE);
23 if (mhashCreate(HASH_SIZE, LOADFACTOR))
26 if (lhashCreate(HASH_SIZE, LOADFACTOR))
29 //pthread_t threadListen;
30 //pthread_create(&threadListen, NULL, dstmListen, NULL);
37 int listenfd, acceptfd;
38 struct sockaddr_in my_addr;
39 struct sockaddr_in client_addr;
40 socklen_t addrlength = sizeof(struct sockaddr);
41 pthread_t thread_dstm_accept;
44 listenfd = socket(AF_INET, SOCK_STREAM, 0);
51 my_addr.sin_family = AF_INET;
52 my_addr.sin_port = htons(LISTEN_PORT);
53 my_addr.sin_addr.s_addr = INADDR_ANY;
54 memset(&(my_addr.sin_zero), '\0', 8);
56 if (bind(listenfd, (struct sockaddr *)&my_addr, addrlength) == -1)
62 if (listen(listenfd, BACKLOG) == -1)
68 printf("Listening on port %d, fd = %d\n", LISTEN_PORT, listenfd);
71 acceptfd = accept(listenfd, (struct sockaddr *)&client_addr, &addrlength);
72 pthread_create(&thread_dstm_accept, NULL, dstmAccept, (void *)acceptfd);
77 void *dstmAccept(void *acceptfd)
79 int numbytes,i, val, retval;
81 char buffer[RECEIVE_BUFFER_SIZE], control,ctrl;
85 trans_commit_data_t transinfo;
87 int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
89 printf("Recieved connection: fd = %d\n", (int)acceptfd);
90 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0) {
91 perror("Error in receiving control from coordinator\n");
96 if((retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0)) <= 0) {
97 perror("Error receiving object from cooridnator\n");
100 printf("DEBUG -> Recv READ_REQUEST from Coordinator for oid = %d\n", oid);
101 srcObj = mhashSearch(oid);
102 h = (objheader_t *) srcObj;
103 size = sizeof(objheader_t) + sizeof(classsize[h->type]);
105 ctrl = OBJECT_NOT_FOUND;
106 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
107 perror("Error sending control msg to coordinator\n");
110 //char responsemessage[sizeof(char)+sizeof(int)];
113 if(send((int)acceptfd, &ctrl, sizeof(char), 0) < 0) {
114 perror("Error sending control msg to coordinator\n");
117 //responsemessage[0]=OBJECT_FOUND;
119 //*((int *)(&responsemessage[1])) = sizeof(objheader_t) + classsize[h->type];
120 //if(send((int)acceptfd, &responsemessage, sizeof(responsemessage), 0) < 0) {
121 // perror("Error sending control msg to coordinator\n");
125 if(send((int)acceptfd, &size, sizeof(int), 0) < 0) {
126 perror("Error sending size of object to coordinator\n");
128 if(send((int)acceptfd, h, size, 0) < 0) {
129 perror("Error in sending object\n");
134 case READ_MULT_REQUEST:
135 printf("DEBUG-> READ_MULT_REQUEST\n");
139 printf("DEBUG -> MOVE_REQUEST\n");
142 case MOVE_MULT_REQUEST:
143 printf("DEBUG -> MOVE_MULT_REQUEST\n");
147 printf("DEBUG -> Recv TRANS_REQUEST from Coordinator accept_fd = %d\n", acceptfd);
148 if((val = readClientReq((int)acceptfd, &transinfo)) != 0) {
149 printf("Error in readClientReq\n");
154 printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
156 if (close((int)acceptfd) == -1)
159 printf("Closed connection: fd = %d\n", (int)acceptfd);
162 printf("DEBUG -> Exiting dstmAccept\n");
165 int readClientReq(int acceptfd, trans_commit_data_t *transinfo) {
166 char *ptr, control, prevctrl, sendctrl, newctrl;
167 void *modptr, *header;
168 objheader_t *tmp_header;
170 int sum = 0, i, N, n, val, retval;
172 //Reads to process the TRANS_REQUEST protocol further
174 N = sizeof(fixed) - 1;
175 ptr = (char *)&fixed;;
176 fixed.control = TRANS_REQUEST;
178 n = recv((int)acceptfd, (void *) ptr+1+sum, N-sum, 0);
179 // printf("DEBUG -> 1. Reading %d bytes \n", n);
181 } while(sum < N && n != 0);
183 //printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", fixed.mcount, fixed.numread, fixed.nummod, fixed.sum_bytes);
185 int mcount = fixed.mcount;
186 N = mcount * sizeof(unsigned int);
187 unsigned int listmid[mcount];
188 ptr = (char *) listmid;
191 n = recv((int)acceptfd, (void *) ptr+sum, N-sum, 0);
192 // printf("DEBUG -> 2. Reading %d bytes cap = %d\n", n, N);
194 } while(sum < N && n != 0);
196 // Read oid and version tuples
197 int numread = fixed.numread;
198 N = numread * (sizeof(unsigned int) + sizeof(short));
200 if(numread != 0) { // If pile contains objects to be read
201 // N = numread * (sizeof(unsigned int) + sizeof(short));
205 n = recv((int)acceptfd, (void *) objread, N, 0);
206 // printf("DEBUG -> 3. Reading %d bytes cap = %d\n", n, N);
208 } while(sum < N && n != 0);
209 // printf("DEBUG -> Recv objs from Coordinator %d %d %d %d\n", *objread, *(objread + 6), *(objread + 12), *(objread + 18));
212 // Read modified objects
213 if(fixed.nummod != 0) { // If pile contains modified objects
214 if ((modptr = objstrAlloc(mainobjstore, fixed.sum_bytes)) == NULL) {
215 printf("objstrAlloc error for modified objects %s, %d", __FILE__, __LINE__);
219 do { // Recv the objs that are modified at Coordinator
220 n = recv((int)acceptfd, modptr+sum, fixed.sum_bytes-sum, 0);
221 // printf("DEBUG -> 4. Reading %d bytes cap = %d, oid = %d\n", n, fixed.sum_bytes, *((int *)modptr));
223 } while (sum < fixed.sum_bytes && n != 0);
226 //Send control message as per all votes from all oids in the machine
227 if((prevctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0) {
228 printf("Handle Trans Request Error %s, %d\n", __FILE__, __LINE__);
231 //Read for new control message from Coordiator
232 if((retval = recv((int)acceptfd, &control, sizeof(char), 0)) <= 0 ) {
233 printf("DEBUG -> Error receiving control, received %d\n", control);
237 printf("DEBUG-> Control message after first call to handleTransReq is %d\n", control);
242 printf("DEBUG -> Recv TRANS_ABORT from Coordinator accept_fd %d\n", acceptfd) ;
243 //send ack to coordinator
244 sendctrl = TRANS_SUCESSFUL;
245 if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
246 perror("Error sending ACK to coordinator\n");
249 //Mark all ref counts as 1 and do garbage collection
251 for(i = 0; i< fixed.nummod; i++) {
252 tmp_header = (objheader_t *)ptr;
253 tmp_header->rcount = 1;
254 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
256 //Unlock objects that was locked in this machine due to this transaction
257 for(i = 0; i< transinfo->numlocked; i++) {
258 header = mhashSearch(transinfo->objlocked[i]);// find the header address
259 ((objheader_t *)header)->status &= ~(LOCK);
264 printf("DEBUG -> Recv TRANS_COMMIT from Coordinator accept_fd = %d\n", acceptfd);
265 if((val = transCommitProcess(transinfo, (int)acceptfd)) != 0) {
266 printf("Error in transCommitProcess %s, %d\n", __FILE__, __LINE__);
269 case TRANS_ABORT_BUT_RETRY_COMMIT:
270 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT from Coordinator acceptfd = %d\n", acceptfd);
271 //Process again after waiting for sometime and on prev control message sent
275 sendctrl = TRANS_AGREE;
276 if(send((int)acceptfd, &sendctrl, sizeof(char), 0) < 0) {
277 perror("Error sending ACK to coordinator\n");
281 case TRANS_SOFT_ABORT:
282 if((newctrl = handleTransReq(acceptfd, &fixed, transinfo, listmid, objread, modptr)) == 0 ) {
283 printf("Handle Trans Request Error for second call%s, %d\n", __FILE__, __LINE__);
285 //If no change in previous control message that was sent then ABORT transaction
286 if(newctrl == TRANS_SOFT_ABORT){
288 newctrl = TRANS_DISAGREE;
289 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
290 perror("Error sending ACK to coordinator\n");
292 //Set the reference count of the object to 1 in mainstore for garbage collection
294 for(i = 0; i< fixed.nummod; i++) {
295 tmp_header = (objheader_t *) ptr;
296 tmp_header->rcount = 1;
297 ptr += sizeof(objheader_t) + classsize[tmp_header->type];
299 //Unlock objects that was locked in this machine due to this transaction
300 for(i = 0; i< transinfo->numlocked; i++) {
301 ptr = mhashSearch(transinfo->objlocked[i]);// find the header address
302 ((objheader_t *)ptr)->status &= ~(LOCK);
305 } else if(newctrl == TRANS_AGREE) {
306 newctrl = TRANS_AGREE;
307 //Send new control message
308 if(send((int)acceptfd, &newctrl, sizeof(char), 0) < 0) {
309 perror("Error sending ACK to coordinator\n");
317 case TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING:
318 //TODO expect another transrequest from client
319 printf("DEBUG -> Recv TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING from Coordinator accept_fd%d\n", acceptfd);
322 printf("No response to TRANS_AGREE OR DISAGREE control\n");
323 //TODO Use fixed.trans_id TID since Client may have died
328 printf("DEBUG -> Freeing...");
330 if (transinfo->objmod != NULL) {
331 free(transinfo->objmod);
332 transinfo->objmod = NULL;
334 if (transinfo->objlocked != NULL) {
335 free(transinfo->objlocked);
336 transinfo->objlocked = NULL;
338 if (transinfo->objnotfound != NULL) {
339 free(transinfo->objnotfound);
340 transinfo->objnotfound = NULL;
345 //This function runs a decision after all objects are weighed under one of the 4 possibilities
346 //and returns the appropriate control message to the Ccordinator
347 char handleTransReq(int acceptfd, fixed_data_t *fixed, trans_commit_data_t *transinfo, unsigned int *listmid, char *objread, void *modptr) {
350 char control = 0, ctrlmissoid, *ptr;
353 unsigned int *oidnotfound, *oidlocked, *oidmod;
355 oidnotfound = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
356 oidlocked = (unsigned int *) calloc(fixed->numread + fixed->nummod, sizeof(unsigned int));
357 oidmod = (unsigned int *) calloc(fixed->nummod, sizeof(unsigned int));
358 // Counters and arrays to formulate decision on control message to be sent
359 int objnotfound = 0, objlocked = 0, objmod =0, v_nomatch = 0, v_matchlock = 0, v_matchnolock = 0;
360 int objmodnotfound = 0, nummodfound = 0;
362 objheader_t *headptr;
364 //Process each object present in the pile
367 //Process each oid in the machine pile/ group
368 for (i = 0; i < fixed->numread + fixed->nummod; i++) {
369 if (i < fixed->numread) {//Object is read
370 int incr = sizeof(unsigned int) + sizeof(short);// Offset that points to next position in the objread array
372 oid = *((unsigned int *)(objread + incr));
373 incr += sizeof(unsigned int);
374 version = *((short *)(objread + incr));
375 } else {//Obj is modified
376 headptr = (objheader_t *) ptr;
378 oidmod[objmod] = oid;//Array containing modified oids
380 version = headptr->version;
381 ptr += sizeof(objheader_t) + classsize[headptr->type];
383 //Check if object is still present in the machine since the beginning of TRANS_REQUEST
384 if ((mobj = mhashSearch(oid)) == NULL) {// Obj not found
385 //Save the oids not found for later use
386 oidnotfound[objnotfound] = ((objheader_t *)mobj)->oid;
388 } else { // If obj found in machine (i.e. has not moved)
389 //Check if obj is locked
390 if ((((objheader_t *)mobj)->status & LOCK) == LOCK) {
391 if (version == ((objheader_t *)mobj)->version) { // If version match
393 } else {//If versions don't match ..HARD ABORT
395 //send TRANS_DISAGREE to Coordinator
396 control = TRANS_DISAGREE;
397 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
398 perror("Error in sending control to the Coordinator\n");
401 printf("DEBUG -> Sending TRANS_DISAGREE acceptfd = %d\n", acceptfd);
404 } else {//Obj is not locked , so lock object
405 ((objheader_t *)mobj)->status |= LOCK;
406 //Save all object oids that are locked on this machine during this transaction request call
407 oidlocked[objlocked] = ((objheader_t *)mobj)->oid;
408 printf("DEBUG -> Obj locked are %d\n",((objheader_t *)mobj)->oid);
410 if (version == ((objheader_t *)mobj)->version) { //If versions match
412 } else { //If versions don't match
414 //send TRANS_DISAGREE to Coordinator
415 control = TRANS_DISAGREE;
416 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
417 perror("Error in sending control to the Coordinator\n");
420 printf("DEBUG -> Sending TRANS_DISAGREE accept_fd = %d\n", acceptfd);
427 printf("No of objs locked = %d\n", objlocked);
428 printf("No of v_nomatch = %d\n", v_nomatch);
429 printf("No of objs v_match but are did not have locks before = %d\n", v_matchnolock);
430 printf("No of objs v_match but had locks before = %d\n", v_matchlock);
431 printf("No of objs not found = %d\n", objnotfound);
432 printf("No of objs modified but not found = %d\n", objmodnotfound);
434 //Decide what control message(s) to send
435 //Cond to send TRANS_AGREE
436 if(v_matchnolock == fixed->numread + fixed->nummod) {
437 //send TRANS_AGREE to Coordinator
438 control = TRANS_AGREE;
439 if((val = write(acceptfd, &control, sizeof(char))) <= 0) {
440 perror("Error in sending control to Coordinator\n");
443 printf("DEBUG -> Sending TRANS_AGREE accept_fd = %d\n", acceptfd);
445 //Condition to send TRANS_SOFT_ABORT
446 if((v_matchlock > 0 && v_nomatch == 0) || (objnotfound > 0 && v_nomatch == 0)) {
447 //send TRANS_SOFT_ABORT to Coordinator
448 control = TRANS_SOFT_ABORT;
449 if((val = write(acceptfd, &control, sizeof(char))) <=0 ) {
450 perror("Error in sending control back to coordinator\n");
453 printf("DEBUG -> Sending TRANS_SOFT_ABORT accept_fd = %d\n", acceptfd);
454 //send number of oids not found and the missing oids
455 if((val = write(acceptfd, &objnotfound, sizeof(int))) <= 0) {
456 perror("Error in sending no of objects that are not found\n");
459 if(objnotfound != 0) {
460 if((val = write(acceptfd, oidnotfound, (sizeof(unsigned int) * objnotfound))) <= 0) {
461 perror("Error in sending objects that are not found\n");
467 //Do the following when TRANS_DISAGREE is sent
468 if(control == TRANS_DISAGREE) {
469 //Set the reference count of the object to 1 in mainstore for garbage collection
471 for(i = 0; i< fixed->nummod; i++) {
472 headptr = (objheader_t *) ptr;
474 ptr += sizeof(objheader_t) + classsize[headptr->type];
476 //Unlock objects that was locked in the trans
477 for(i = 0; i< objlocked ; i++) {
478 mobj = mhashSearch(oidlocked[i]);// find the header address
479 ((objheader_t *)mobj)->status &= ~(LOCK);
483 //Fill out the structure required for a trans commit process if pile receives a TRANS_COMMIT
484 transinfo->objmod = oidmod;
485 transinfo->objlocked = oidlocked;
486 transinfo->objnotfound = oidnotfound;
487 transinfo->modptr = modptr;
488 transinfo->nummod = fixed->nummod;
489 transinfo->numlocked = objlocked;
490 transinfo->numnotfound = objnotfound;
495 //Processes oids in the TRANS_COMMIT request at the participant end and sends an ack back
496 int transCommitProcess(trans_commit_data_t *transinfo, int acceptfd) {
498 int i = 0, offset = 0;
500 //Process each modified object saved in the mainobject store
501 for(i=0; i<transinfo->nummod; i++) {
502 if((header = (objheader_t *) mhashSearch(transinfo->objmod[i])) == NULL) {
503 printf("mhashsearch returns NULL %s, %d\n", __FILE__, __LINE__);
505 //change reference count of older address and free space in objstr ??
506 header->rcount = 1; //Not sure what would be th val
507 //change ptr address in mhash table
508 printf("DEBUG -> Removing object oid = %d\n", transinfo->objmod[i]);
509 mhashRemove(transinfo->objmod[i]);
510 mhashInsert(transinfo->objmod[i], (transinfo->modptr + offset));
511 offset += sizeof(objheader_t) + classsize[header->type];
512 //update object version
513 header = (objheader_t *) mhashSearch(transinfo->objmod[i]);
514 header->version += 1;
516 for(i=0; i<transinfo->numlocked; i++) {
518 header = (objheader_t *) mhashSearch(transinfo->objlocked[i]);
519 header->status &= ~(LOCK);
522 //TODO Update location lookup table
524 //send ack to coordinator
525 control = TRANS_SUCESSFUL;
527 printf("DEBUG-> Sending TRANS_SUCCESSFUL from accept_fd = %d\n", acceptfd);
528 if(send((int)acceptfd, &control, sizeof(char), 0) < 0) {
529 perror("Error sending ACK to coordinator\n");