11 #include<netinet/in.h>
// TCP port that transRequest() and getRemoteObj() connect to (passed through htons()).
13 #define LISTEN_PORT 2156
// Loopback address; within the lines shown here it is referenced only from commented-out code.
14 #define MACHINE_IP "127.0.0.1"
// Length of the local `buffer` arrays declared in transRequest() and transCommit().
15 #define RECEIVE_BUFFER_SIZE 2048
// Defined in another translation unit. Indexed by an object type to obtain the
// number of payload bytes that follow an objheader_t (used in the
// sizeof(objheader_t) + classsize[...] computations in this file).
17 extern int classsize[];
// transStart: allocate a new transaction record and attach to it an object
// store (record cache) and a lookup table.
// NOTE(review): the braces and the return statement are not among the lines
// shown here; presumably the function returns tmp -- confirm.
19 transrecord_t *transStart()
// NOTE(review): no NULL check on the malloc() result is visible before tmp is dereferenced.
21 transrecord_t *tmp = malloc(sizeof(transrecord_t));
// 1048576 is presumably the cache capacity in bytes (1 MiB) -- confirm against objstrCreate().
22 tmp->cache = objstrCreate(1048576);
23 tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
// transRead: locate object `oid` on behalf of the transaction `record`.
// The visible code tries, in order:
//   1. the transaction lookup table (chashSearch on record->lookupTable);
//   2. the table queried by mhashSearch(); on a hit the object is copied into
//      record->cache and the copy is registered in record->lookupTable;
//   3. the machine number reported by lhashSearch(), from which the object is
//      fetched with getRemoteObj() (presumably the final else branch).
// NOTE(review): the declarations of `size` and `objcopy`, the braces and all
// return statements are not among the lines shown here.
27 objheader_t *transRead(transrecord_t *record, unsigned int oid)
29 printf("Enter TRANS_READ\n");
30 unsigned int machinenumber;
31 objheader_t *tmp, *objheader;
// Case 1: the object is already known to this transaction.
36 if((objheader =(objheader_t *)chashSearch(record->lookupTable, oid)) != NULL){
37 printf("DEBUG -> transRead oid %d found local\n", oid);
// Case 2: not in the transaction table, but mhashSearch() knows the object.
39 } else if ((objheader = (objheader_t *) mhashSearch(oid)) != NULL) {
40 //Look up in Machine lookup table and found
41 printf("oid not found in local cache\n");
// NOTE(review): this repeats the mhashSearch(oid) call made in the condition
// above, whose result is already held in objheader.
42 tmp = mhashSearch(oid);
// Bytes to copy: the header plus the payload size registered for this object type.
43 size = sizeof(objheader_t)+classsize[tmp->type];
// NOTE(review): the objstrAlloc() result is passed to memcpy() with no visible NULL check.
45 objcopy = objstrAlloc(record->cache, size);
46 memcpy(objcopy, (void *)tmp, size);
47 //Insert into cache's lookup table
48 chashInsert(record->lookupTable, objheader->oid, objcopy);
51 //Get the object from the remote location
52 //printf("oid not found in local machine lookup\n");
// NOTE(review): machinenumber is printed here before the assignment two
// statements below; no earlier assignment is visible, so this looks like a
// read of an uninitialized variable.
53 printf("machinenumber = %d\n",machinenumber);
54 printf("oid = %d\n",oid);
55 machinenumber = lhashSearch(oid);
56 printf("machinenumber = %d\n",machinenumber);
57 objcopy = getRemoteObj(record, machinenumber, oid);
59 //If object is not found in Remote location
60 printf("Object not found in Machine %d\n", machinenumber);
// transCreateObj: create a new object of the given `type` inside the
// transaction cache and register it in the transaction lookup table.
// NOTE(review): the braces, some field assignments and the return statement
// are not among the lines shown here; presumably tmp is returned -- confirm.
68 objheader_t *transCreateObj(transrecord_t *record, unsigned short type)
// Reserve room for the header plus the payload size registered for `type`.
// NOTE(review): no NULL check on the objstrAlloc() result is visible before tmp is dereferenced.
70 objheader_t *tmp = (objheader_t *) objstrAlloc(record->cache, (sizeof(objheader_t) + classsize[type]));
71 tmp->oid = getNewOID();
74 tmp->rcount = 0; //? not sure how to handle this yet
// Key the new object by its freshly assigned oid.
77 chashInsert(record->lookupTable, tmp->oid, tmp);
80 //int decideResponse(thread_data_array_t *tdata, char *control, int sd) {
// decideResponse: inspect the status every participant reported
// (tdata->recvmsg[0 .. pilecount-1].rcv_status) and write the resulting control
// byte to the participant connected on socket `sd`.
//   tdata - shared per-thread data (responses, pile count, transaction record)
//   sd    - connected socket of the calling thread
//   val   - 0 on the first call; the socket read of missing oids is done only
//           when val == 0
// Callers compare the return value against 0 (see transRequest).
// NOTE(review): the switch statement, most case labels, the counter increments
// and the return statements are not among the lines shown here, so the tallying
// described below is partly inferred from variable names and debug messages.
81 int decideResponse(thread_data_array_t *tdata, int sd, int val) {
// NOTE(review): only oidcount receives the initializer 0 here; `sum` has none,
// and the statement that resets it before the read loop is not visible -- confirm.
82 int i, n, N, sum, oidcount = 0;
83 int transagree = 0, transdisagree = 0, transsoftabort = 0, transsoftabortmiss = 0;
84 char ctrl, control, *ptr;
85 unsigned int *oidnotfound;
88 //Check common data structure
89 for (i = 0 ; i < tdata->pilecount ; i++) {
91 control = tdata->recvmsg[i].rcv_status;
94 printf("DEBUG-> Inside TRANS_DISAGREE\n");
96 //Free transaction records
97 objstrDelete(tdata->rec->cache);
98 chashDelete(tdata->rec->lookupTable);
102 if (write(sd, &ctrl, sizeof(char)) < 0) {
103 perror("Error sending ctrl message for participant\n");
109 printf("DEBUG-> Inside TRANS_AGREE\n");
113 case TRANS_SOFT_ABORT:
114 printf("DEBUG-> Inside TRANS_SOFT_ABORT\n");
116 /* Do a socket read only if TRANS_SOFT_ABORT was meant for this thread */
117 if ((i == tdata->thread_id) && (val == 0)) {
118 //Read list of objects missing
// NOTE(review): read() returns -1 on error, which this != 0 test treats the
// same way as a successful read.
119 if(read(sd, &oidcount, sizeof(int)) != 0) {
120 //Break if only objs are locked at the Participant side
124 transsoftabortmiss++;
// N is the byte length of the oid list announced by the participant.
125 N = oidcount * sizeof(unsigned int);
// NOTE(review): oidcount comes straight from the socket and is not range-checked
// in the visible lines; no free() of oidnotfound is visible either.
126 if((oidnotfound = calloc(oidcount, sizeof(unsigned int))) == NULL) {
127 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
129 ptr = (char *) oidnotfound;
// Keep reading into the buffer until N bytes have arrived or read() reports 0.
// NOTE(review): a negative n (read error) does not satisfy n == 0, so the
// condition below does not stop the loop on errors.
131 n = read(sd, ptr+sum, N-sum);
133 } while(sum < N && n !=0);
139 printf("Participant sent unknown message\n");
143 //Decide what control message to send to Participant
// Every participant counted in transagree: send the commit decision.
144 if(transagree == tdata->pilecount){
147 printf("Sending TRANS_COMMIT\n");
148 if (write(sd, &ctrl, sizeof(char)) < 0) {
149 perror("Error sending ctrl message for participant\n");
// Soft aborts only, nobody disagreed and no object was reported missing.
154 if(transsoftabort > 0 && transdisagree == 0 && transsoftabortmiss == 0) {
155 //Send abort but retry commit
156 ctrl = TRANS_ABORT_BUT_RETRY_COMMIT;
157 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT\n");
158 if (write(sd, &ctrl, sizeof(char)) < 0) {
159 perror("Error sending ctrl message for participant\n");
164 //Read new control message from Participant
// NOTE(review): n is not examined in the visible lines, so `control` may be
// stored below even when the read failed.
165 n = read(sd, &control, sizeof(char));
167 //Update common data structure and increment count
168 tdata->recvmsg[tdata->thread_id].rcv_status = control;
// NOTE(review): the result of this recursive call is discarded, and the comment
// speaks of val = 1 while the variable val is what gets passed; any assignment
// to val is outside the lines shown here.
170 decideResponse(tdata, sd, val); //Second call to decideResponse(); indicated by parameter val = 1
// Soft aborts with at least one object reported missing, and nobody disagreed.
173 if(transsoftabort > 0 && transsoftabortmiss > 0 && transdisagree == 0) {
174 //Send abort but retry commit after re-looking up objects
175 //ctrl = TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING;
// NOTE(review): the assignment to ctrl for this branch is commented out above;
// whether another assignment precedes the write is not visible here.
177 printf("Sending TRANS_ABORT_BUT_RETRY_COMMIT_WITH_RELOCATING\n");
178 if (write(sd, &ctrl, sizeof(char)) < 0) {
179 perror("Error sending ctrl message for participant\n");
184 //update location table
// transRequest: thread entry point that carries out the TRANS_REQUEST exchange
// with one machine. `threadarg` is cast to thread_data_array_t *.
// Visible steps:
//   - open a TCP socket and connect to the address midtoIP() yields for
//     tdata->mid, on port LISTEN_PORT;
//   - send the fixed header (tdata->buffer->f), the machine id list, the tuples
//     for objects read, then every modified object (header plus payload);
//   - read a one-byte control reply and store it in tdata->recvmsg[thread_id];
//   - under tdata->lock, when *tdata->count equals pilecount, broadcast on
//     tdata->threshold and call decideResponse(); a pthread_cond_wait() on the
//     same condition is also present (presumably the else branch).
// NOTE(review): the declarations of sd, i, n and machineip, the update of
// *tdata->count, the error-path returns and the final return are not among the
// lines shown here.
192 void *transRequest(void *threadarg) {
194 struct sockaddr_in serv_addr;
195 struct hostent *server;
196 thread_data_array_t *tdata;
197 objheader_t *headeraddr;
198 //unsigned int *oidnotfound;
199 char buffer[RECEIVE_BUFFER_SIZE], control, recvcontrol;
202 tdata = (thread_data_array_t *) threadarg;
203 printf("DEBUG -> New thread id %d\n", tdata->thread_id);
205 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
206 perror("Error in socket for TRANS_REQUEST");
209 bzero((char*) &serv_addr, sizeof(serv_addr));
210 serv_addr.sin_family = AF_INET;
211 serv_addr.sin_port = htons(LISTEN_PORT);
212 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
// Translate the machine id into a dotted-quad string, force termination at
// index 15, then convert it to a network address.
// NOTE(review): this requires machineip to hold at least 16 chars; its
// declaration is not visible here.
213 midtoIP(tdata->mid,machineip);
214 machineip[15] = '\0';
215 serv_addr.sin_addr.s_addr = inet_addr(machineip);
216 //serv_addr.sin_addr.s_addr = inet_addr(tdata->mid);
218 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
219 perror("Error in connect for TRANS_REQUEST");
223 //Multiple writes for sending packets of data
224 //Send first few fixed bytes of the TRANS_REQUEST protocol
225 printf("DEBUG -> Start sending commit data... %d\n", tdata->buffer->f.control);
226 // printf("DEBUG-> Bytes sent in first write: %d\n", sizeof(fixed_data_t));
227 // printf("Machine count = %d\tnumread = %d\tnummod = %d\tsum_bytes = %d\n", tdata->buffer->f.mcount, tdata->buffer->f.numread, tdata->buffer->f.nummod, tdata->buffer->f.sum_bytes);
// NOTE(review): each write() below is only tested for < 0; a short write
// (fewer bytes than requested) is not retried.
228 if (write(sd, &(tdata->buffer->f), (sizeof(fixed_data_t))) < 0) {
229 perror("Error sending fixed bytes for thread");
232 //Send list of machines involved in the transaction
233 // printf("DEBUG-> Bytes sent in second write: %d\n", sizeof(unsigned int) * tdata->pilecount);
234 if (write(sd, tdata->buffer->listmid, (sizeof(unsigned int) * tdata->pilecount )) < 0) {
235 perror("Error sending list of machines for thread");
238 //Send oids and version number tuples for objects that are read
239 // printf("DEBUG-> Bytes sent in the third write: %d\n", (sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread);
240 // printf(" DEBUG->Read oids are %d %x %d %d\n", *(tdata->buffer->objread), (tdata->buffer->objread + 6), *(tdata->buffer->objread + 12), *(tdata->buffer->objread +18));
// Each tuple occupies sizeof(unsigned int) + sizeof(short) bytes on the wire.
241 if (write(sd, tdata->buffer->objread, ((sizeof(unsigned int) + sizeof(short)) * tdata->buffer->f.numread )) < 0) {
242 perror("Error sending tuples for thread");
245 //Send objects that are modified
246 for(i = 0; i < tdata->buffer->f.nummod ; i++) {
// NOTE(review): the chashSearch() result is dereferenced in the write below
// without a NULL check.
247 headeraddr = chashSearch(tdata->rec->lookupTable, tdata->buffer->oidmod[i]);
248 // printf("DEBUG -> Bytes sent for oid = %d modified %d\n", *((int *)headeraddr), sizeof(objheader_t) + classsize[headeraddr->type]);
249 if (write(sd, headeraddr, sizeof(objheader_t) + classsize[headeraddr->type]) < 0) {
250 perror("Error sending obj modified for thread");
255 //Read control message from participant side
// NOTE(review): n is not examined in the visible lines; on a failed or empty
// read `control` would be used without having been written.
256 n = read(sd, &control, sizeof(char));
257 recvcontrol = control;
258 printf("DEBUG -> After TRANS_REQUEST, message control recv is %d\n", recvcontrol);
260 //Update common data structure and increment count
261 tdata->recvmsg[tdata->thread_id].rcv_status = recvcontrol;
262 //Lock and update count
263 //Thread sleeps until all messages from participants are received by coordinator
264 pthread_mutex_lock(tdata->lock);
267 if(*(tdata->count) == tdata->pilecount) {
268 pthread_cond_broadcast(tdata->threshold);
269 //process the participant's request
270 if (decideResponse(tdata, sd, 0) != 0) { // Send the first call to decideResponse() ; indicated by parameter 0
271 printf("decideResponse returned error %s. %d\n", __FILE__, __LINE__);
272 pthread_mutex_unlock(tdata->lock);
// NOTE(review): no predicate loop around this wait is visible; POSIX permits
// spurious wakeups, so the condition is normally re-tested in a while loop.
276 pthread_cond_wait(tdata->threshold, tdata->lock);
278 pthread_mutex_unlock(tdata->lock);
// transCommit: start the commit of the transaction `record`.
// Visible steps:
//   - walk every bin and chain of record->lookupTable, find the machine of each
//     object with lhashSearch() and group the objects per machine via pInsert();
//   - count the groups (pCount) and collect their machine ids (pListMid);
//   - for each group build a trans_req_data_t request and start one
//     transRequest() thread on it;
//   - join all threads, then destroy the condition variable and the mutex.
// NOTE(review): the declarations of i, rc and attr, the loop headers over the
// piles, the error-path returns and the final return are not among the lines
// shown here.
284 int transCommit(transrecord_t *record) {
285 chashlistnode_t *curr, *ptr, *next;
286 unsigned int size;//Represents number of bins in the chash table
287 unsigned int machinenum, tot_bytes_mod, *listmid;
288 objheader_t *headeraddr;
289 plistnode_t *tmp, *pile = NULL;
291 int pilecount = 0, offset, numthreads = 0, trecvcount = 0, tmachcount = 0;
292 char buffer[RECEIVE_BUFFER_SIZE],control;
293 char transid[TID_LEN];
// Static counter used as the numeric suffix of the transaction id built below.
// NOTE(review): no increment of newtid is visible in the lines shown here.
294 static int newtid = 0;
296 ptr = record->lookupTable->table;
297 size = record->lookupTable->size;
298 //Look through all the objects in the cache and make piles
299 for(i = 0; i < size ;i++) {
301 //Inner loop to traverse the linked list of the cache lookupTable
302 while(curr != NULL) {
303 //if the first bin in hash table is empty
308 //Get machine location for object id
// A machine number of 0 is treated as "not found".
310 if ((machinenum = lhashSearch(curr->key)) == 0) {
311 printf("Error: No such machine\n");
315 //TODO only for debug
317 if ((headeraddr = chashSearch(record->lookupTable, curr->key)) == NULL) {
318 printf("Error: No such oid\n");
321 //Make machine groups
322 if ((pile = pInsert(pile, headeraddr, machinenum, record->lookupTable->numelements)) == NULL) {
323 printf("pInsert error %s, %d\n", __FILE__, __LINE__);
330 //Create the packet to be sent in TRANS_REQUEST
332 pilecount = pCount(pile); //Keeps track of the number of participants
334 //Thread related variables
// NOTE(review): the three arrays below are variable-length arrays sized by
// pilecount; a pilecount of 0 would make them zero-length, which C does not allow.
335 pthread_t thread[pilecount]; //Create threads for each participant
337 pthread_cond_t tcond;
338 pthread_mutex_t tlock;
// NOTE(review): tlshrd is not used anywhere in the lines shown here.
339 pthread_mutex_t tlshrd;
340 thread_data_array_t thread_data_array[pilecount];
342 thread_response_t rcvd_control_msg[pilecount]; //Shared thread array that keeps track of responses of participants
344 //Initialize and set thread detach attribute
345 pthread_attr_init(&attr);
346 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
347 pthread_mutex_init(&tlock, NULL);
348 pthread_cond_init(&tcond, NULL);
350 //Keep track of list of machine ids per transaction
// NOTE(review): no free() of listmid is visible in the lines shown here.
351 if((listmid = calloc(pilecount, sizeof(unsigned int))) == NULL) {
352 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
356 pListMid(pile, listmid);
357 //Process each machine group
359 printf("DEBUG -> Created thread %d... \n", numthreads);
360 //Create transaction id
362 trans_req_data_t *tosend;
// NOTE(review): no free() of tosend is visible in the lines shown here.
363 if ((tosend = calloc(1, sizeof(trans_req_data_t))) == NULL) {
364 printf("Calloc error %s, %d\n", __FILE__, __LINE__);
367 tosend->f.control = TRANS_REQUEST;
// Transaction id format: machine id in hex, an underscore, then newtid.
// NOTE(review): sprintf() is unbounded; the capacity of f.trans_id is not
// visible here -- confirm it fits, or bound the write with snprintf().
368 sprintf(tosend->f.trans_id, "%x_%d", tmp->mid, newtid);
369 tosend->f.mcount = pilecount;
370 tosend->f.numread = tmp->numread;
371 tosend->f.nummod = tmp->nummod;
372 tosend->f.sum_bytes = tmp->sum_bytes;
373 tosend->listmid = listmid;
374 tosend->objread = tmp->objread;
375 tosend->oidmod = tmp->oidmod;
// Per-thread argument block; recvmsg, threshold, lock and count point at the
// same objects for every thread, so they are shared between the threads.
376 thread_data_array[numthreads].thread_id = numthreads;
377 thread_data_array[numthreads].mid = tmp->mid;
378 thread_data_array[numthreads].pilecount = pilecount;
379 thread_data_array[numthreads].buffer = tosend;
380 thread_data_array[numthreads].recvmsg = rcvd_control_msg;
381 thread_data_array[numthreads].threshold = &tcond;
382 thread_data_array[numthreads].lock = &tlock;
383 thread_data_array[numthreads].count = &trecvcount;
384 thread_data_array[numthreads].rec = record;
// NOTE(review): attr is initialized above as joinable, but NULL (default
// attributes) is what is passed here.
386 rc = pthread_create(&thread[numthreads], NULL, transRequest, (void *) &thread_data_array[numthreads]);
388 perror("Error in pthread create");
397 // Free attribute and wait for the other threads
398 pthread_attr_destroy(&attr);
399 for (i = 0 ;i < pilecount ; i++) {
400 rc = pthread_join(thread[i], NULL);
403 printf("ERROR return code from pthread_join() is %d\n", rc);
409 pthread_cond_destroy(&tcond);
410 pthread_mutex_destroy(&tlock);
416 //mnum will be used to represent the machine IP address later
// getRemoteObj: request object `oid` from machine `mnum` over a new TCP
// connection and, when the object is delivered, copy it into record->cache and
// register it in record->lookupTable.
// Wire exchange visible here: send one READ_REQUEST byte followed by the oid;
// read back one control byte; in the delivering case read an int size followed
// by `size` bytes of object data.
// NOTE(review): the declarations of sd, machineip, control, size and objcopy,
// the switch statement with its remaining case labels, and the return
// statements are not among the lines shown here; the function may also
// continue past the last line shown.
417 void *getRemoteObj(transrecord_t *record, unsigned int mnum, unsigned int oid) {
419 struct sockaddr_in serv_addr;
420 struct hostent *server;
426 if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
427 perror("Error in socket");
430 bzero((char*) &serv_addr, sizeof(serv_addr));
431 serv_addr.sin_family = AF_INET;
432 serv_addr.sin_port = htons(LISTEN_PORT);
433 //serv_addr.sin_addr.s_addr = inet_addr(MACHINE_IP);
// Translate the machine number into a dotted-quad string, force termination at
// index 15, then convert it to a network address.
434 midtoIP(mnum,machineip);
435 machineip[15] = '\0';
436 serv_addr.sin_addr.s_addr = inet_addr(machineip);
438 if (connect(sd, (struct sockaddr *) &serv_addr, sizeof(struct sockaddr)) < 0) {
439 perror("Error in connect");
// Request layout: byte 0 is the READ_REQUEST opcode, the following
// sizeof(unsigned int) bytes hold the oid in host byte order.
442 char readrequest[sizeof(char)+sizeof(unsigned int)];
443 readrequest[0] = READ_REQUEST;
// NOTE(review): this stores an unsigned int through a cast pointer at offset 1
// of a char array, which is a misaligned access on strict-alignment targets;
// memcpy(&readrequest[1], &oid, sizeof oid) would avoid that.
444 *((unsigned int *)(&readrequest[1])) = oid;
445 if (write(sd, &readrequest, sizeof(readrequest)) < 0) {
446 perror("Error sending message");
451 printf("DEBUG -> ready to rcv ...\n");
453 //Read response from the Participant
// NOTE(review): the return values of the three read() calls below are ignored,
// so errors and short reads go unnoticed.
454 read(sd, &control, sizeof(char));
456 case OBJECT_NOT_FOUND:
// NOTE(review): size arrives from the peer and is used for the allocation and
// the following read without any validation visible here.
460 read(sd, &size, sizeof(int));
461 objcopy = objstrAlloc(record->cache, size);
462 read(sd, objcopy, size);
463 //Insert into cache's lookup table
464 chashInsert(record->lookupTable, oid, objcopy);
467 printf("Error in recv request from participant on a READ_REQUEST\n");