1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
21 int numTransCommit = 0;
22 int numTransAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
29 /* Thread variable for locking/unlocking */
30 __thread threadrec_t trec;
31 __thread struct objlist * lockedobjs;
32 int typesCausingAbort[TOTALNUMCLASSANDARRAY];
33 /******Keep track of objects and types causing aborts******/
35 #define DEBUGSTMSTAT(args...) { \
40 #define DEBUGSTMSTAT(args...)
42 #define DEBUGSTMSTAT(args...)
46 #define DEBUGSTM(x...) printf(x);
48 #define DEBUGSTM(x...)
52 /* ==================================================
54 * This function starts up the transaction runtime.
55 * ==================================================
61 /* ======================================
63 * - create an object store of given size
64 * ======================================
66 objstr_t *objstrCreate(unsigned int size) {
68 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
69 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
74 tmp->top = tmp + 1; //points to end of objstr_t structure!
79 while(t_cache->next!=NULL) {
80 objstr_t *next=t_cache->next;
81 t_cache->next=t_reserve;
85 t_cache->top=t_cache+1;
88 //free entire list, starting at store
89 void objstrDelete(objstr_t *store) {
91 while (store != NULL) {
99 /* =================================================
101 * This function initializes things required in the
103 * =================================================
106 //Transaction start is currently free...committing and aborting are not
109 /* =======================================================
111 * This function creates objects in the transaction record
112 * =======================================================
114 objheader_t *transCreateObj(void * ptr, unsigned int size) {
115 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
116 objheader_t *retval=&tmp[1];
117 tmp->lock=RW_LOCK_BIAS;
120 tmp->accessCount = 0;
123 //initialize obj lock
124 pthread_mutex_init(&tmp->objlock, NULL);
126 // don't insert into table
127 if (newobjs->offset<MAXOBJLIST) {
128 newobjs->objs[newobjs->offset++]=retval;
130 struct objlist *tmp=malloc(sizeof(struct objlist));
136 return retval; //want space after object header
139 /* This function inserts random wait delays on the order of msec.
140 * Mostly used when a transaction commit is retried */
141 void randomdelay(int softaborted) {
145 gettimeofday(&t,NULL);
148 req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
149 nanosleep(&req, NULL);
153 /* ==============================================
155 * - allocate space in an object store
156 * ==============================================
158 void *objstrAlloc(unsigned int size) {
161 objstr_t *store=t_cache;
167 if (OSFREE(store)>=size) {
172 if ((store=store->next)==NULL)
177 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE ? size : DEFAULT_OBJ_STORE_SIZE;
178 objstr_t **otmp=&t_reserve;
180 while((ptr=*otmp)!=NULL) {
181 if (ptr->size>=newsize) {
186 ptr->top=((char *)(&ptr[1]))+size;
191 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
196 os->top=((char *)nptr)+size;
201 /* =============================================================
203 * -finds the objects either in main heap
204 * -copies the object into the transaction cache
205 * =============================================================
207 __attribute__((pure)) void *transRead(void * oid) {
208 objheader_t *tmp, *objheader;
209 objheader_t *objcopy;
212 /* Read from the main heap */
214 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
215 GETSIZE(size, header);
216 size += sizeof(objheader_t);
217 objcopy = (objheader_t *) objstrAlloc(size);
218 memcpy(objcopy, header, size);
220 header->accessCount++;
221 //FIXME riskratio fix
222 //float riskratio = ((header->abortCount)/(header->accessCount));
223 //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d, riskratio: %f\n", TYPE(header), header->abortCount, header->accessCount, riskratio);
224 //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d\n", TYPE(header), header->abortCount, header->accessCount);
225 //if(header->abortCount > MAXABORTS && riskratio > NEED_LOCK_THRESHOLD) {
226 if(header->abortCount > MAXABORTS) {
228 header->riskyflag = 1;
233 /* Insert into cache's lookup table */
235 t_chashInsert(oid, &objcopy[1]);
240 struct objlist *ptr=newobjs;
241 while(ptr->next!=NULL) {
242 struct objlist *tmp=ptr->next;
251 void freelockedobjs() {
252 struct objlist *ptr=lockedobjs;
253 while(ptr->next!=NULL) {
254 struct objlist *tmp=ptr->next;
263 /* ================================================================
265 * - This function initiates the transaction commit process
266 * - goes through the transaction cache and decides
268 * ================================================================
273 /* Look through all the objects in the transaction hash table */
275 if (c_numelements<(c_size>>3))
276 finalResponse= alttraverseCache();
278 finalResponse= traverseCache();
279 if(finalResponse == TRANS_ABORT) {
294 if(finalResponse == TRANS_COMMIT) {
309 /* wait a random amount of time before retrying to commit transaction*/
310 if(finalResponse == TRANS_SOFT_ABORT) {
316 //retry if too many soft aborts
325 randomdelay(softaborted);
327 printf("Error: in %s() Unknown outcome", __func__);
333 /* ==================================================
335 * - goes through the transaction cache and
336 * - decides if a transaction should commit or abort
337 * ==================================================
339 int traverseCache() {
340 /* Create info to keep track of objects that can be locked */
341 int numoidrdlocked=0;
342 int numoidwrlocked=0;
343 void * rdlocked[200];
345 void * wrlocked[200];
351 if (c_numelements<200) {
352 oidrdlocked=rdlocked;
353 oidrdversion=rdversion;
354 oidwrlocked=wrlocked;
356 int size=c_numelements*sizeof(void*);
357 oidrdlocked=malloc(size);
358 oidrdversion=malloc(size);
359 oidwrlocked=malloc(size);
361 chashlistnode_t *ptr = c_table;
362 /* Represents number of bins in the chash table */
363 unsigned int size = c_size;
364 for(i = 0; i<size; i++) {
365 chashlistnode_t *curr = &ptr[i];
366 /* Inner loop to traverse the linked list of the cache lookupTable */
367 while(curr != NULL) {
368 //if the first bin in hash table is empty
369 if(curr->key == NULL)
371 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
372 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
373 unsigned int version = headeraddr->version;
375 if(STATUS(headeraddr) & DIRTY) {
376 /* Read from the main heap and compare versions */
377 if(write_trylock(&header->lock)) { //can aquire write lock
378 if (version == header->version) { /* versions match */
379 /* Keep track of objects locked */
380 oidwrlocked[numoidwrlocked++] = OID(header);
382 oidwrlocked[numoidwrlocked++] = OID(header);
383 transAbortProcess(oidwrlocked, numoidwrlocked);
385 header->abortCount++;
386 (typesCausingAbort[TYPE(header)])++;
387 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
389 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
390 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
391 if (c_numelements>=200) {
398 } else { /* cannot aquire lock */
399 if(version == header->version) {
403 transAbortProcess(oidwrlocked, numoidwrlocked);
405 header->abortCount++;
406 (typesCausingAbort[TYPE(header)])++;
407 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
409 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
410 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
411 if (c_numelements>=200) {
420 oidrdversion[numoidrdlocked]=version;
421 oidrdlocked[numoidrdlocked++] = header;
427 //THIS IS THE SERIALIZATION POINT *****
429 for(i=0; i<numoidrdlocked; i++) {
430 /* Read from the main heap and compare versions */
431 objheader_t *header=oidrdlocked[i];
432 unsigned int version=oidrdversion[i];
433 if(header->lock>0) { //not write locked
434 if(version != header->version) { /* versions do not match */
435 oidrdlocked[numoidrdlocked++] = OID(header);
436 transAbortProcess(oidwrlocked, numoidwrlocked);
438 header->abortCount++;
439 (typesCausingAbort[TYPE(header)])++;
440 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
442 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
443 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
444 if (c_numelements>=200) {
451 } else { /* cannot aquire lock */
452 //do increment as we didn't get lock
453 if(version == header->version) {
456 transAbortProcess(oidwrlocked, numoidwrlocked);
458 header->abortCount++;
459 (typesCausingAbort[TYPE(header)])++;
460 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
462 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
463 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
464 if (c_numelements>=200) {
474 /* Decide the final response */
476 transAbortProcess(oidwrlocked, numoidwrlocked);
477 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
478 if (c_numelements>=200) {
483 return TRANS_SOFT_ABORT;
485 transCommitProcess(oidwrlocked, numoidwrlocked);
486 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
487 if (c_numelements>=200) {
496 /* ==================================================
498 * - goes through the transaction cache and
499 * - decides if a transaction should commit or abort
500 * ==================================================
502 int alttraverseCache() {
503 /* Create info to keep track of objects that can be locked */
504 int numoidrdlocked=0;
505 int numoidwrlocked=0;
506 void * rdlocked[200];
508 void * wrlocked[200];
514 if (c_numelements<200) {
515 oidrdlocked=rdlocked;
516 oidrdversion=rdversion;
517 oidwrlocked=wrlocked;
519 int size=c_numelements*sizeof(void*);
520 oidrdlocked=malloc(size);
521 oidrdversion=malloc(size);
522 oidwrlocked=malloc(size);
524 chashlistnode_t *curr = c_list;
525 /* Inner loop to traverse the linked list of the cache lookupTable */
526 while(curr != NULL) {
527 //if the first bin in hash table is empty
528 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
529 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
530 unsigned int version = headeraddr->version;
532 if(STATUS(headeraddr) & DIRTY) {
533 /* Read from the main heap and compare versions */
534 if(write_trylock(&header->lock)) { //can aquire write lock
535 if (version == header->version) { /* versions match */
536 /* Keep track of objects locked */
537 oidwrlocked[numoidwrlocked++] = OID(header);
539 oidwrlocked[numoidwrlocked++] = OID(header);
540 transAbortProcess(oidwrlocked, numoidwrlocked);
542 header->abortCount++;
543 (typesCausingAbort[TYPE(header)])++;
544 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
546 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
547 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
548 if (c_numelements>=200) {
555 } else { /* cannot aquire lock */
556 if(version == header->version) {
560 transAbortProcess(oidwrlocked, numoidwrlocked);
562 header->abortCount++;
563 (typesCausingAbort[TYPE(header)])++;
564 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
566 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
567 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
568 if (c_numelements>=200) {
577 /* Read from the main heap and compare versions */
578 oidrdversion[numoidrdlocked]=version;
579 oidrdlocked[numoidrdlocked++] = header;
583 //THIS IS THE SERIALIZATION POINT *****
584 for(i=0; i<numoidrdlocked; i++) {
585 objheader_t * header = oidrdlocked[i];
586 unsigned int version=oidrdversion[i];
587 if(header->lock>=0) {
588 if(version != header->version) {
589 transAbortProcess(oidwrlocked, numoidwrlocked);
591 header->abortCount++;
592 (typesCausingAbort[TYPE(header)])++;
593 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
595 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
596 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
597 if (c_numelements>=200) {
604 } else { /* cannot aquire lock */
605 if(version == header->version) {
608 transAbortProcess(oidwrlocked, numoidwrlocked);
610 header->abortCount++;
611 (typesCausingAbort[TYPE(header)])++;
612 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
614 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
615 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
616 if (c_numelements>=200) {
626 /* Decide the final response */
628 transAbortProcess(oidwrlocked, numoidwrlocked);
629 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
630 if (c_numelements>=200) {
635 return TRANS_SOFT_ABORT;
637 transCommitProcess(oidwrlocked, numoidwrlocked);
638 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
639 if (c_numelements>=200) {
649 /* ==================================
652 * =================================
654 int transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
657 /* Release read locks */
659 /* Release write locks */
660 for(i=0; i< numoidwrlocked; i++) {
661 /* Read from the main heap */
662 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
663 write_unlock(&header->lock);
667 /* clear trec and then release objects locked */
668 struct objlist *ptr=lockedobjs;
671 for(i=0; i<max; i++) {
672 header = (objheader_t *)((char *)(ptr->objs[i]) - sizeof(objheader_t));
674 pthread_mutex_unlock(&(header->objlock));
681 /* ==================================
684 * =================================
686 int transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
690 struct objlist *ptr=newobjs;
693 for(i=0; i<max; i++) {
695 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
700 /* Copy from transaction cache -> main object store */
701 for (i = 0; i < numoidwrlocked; i++) {
702 /* Read from the main heap */
703 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
705 GETSIZE(tmpsize, header);
706 struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
707 struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
708 dst->___cachedCode___=src->___cachedCode___;
709 dst->___cachedHash___=src->___cachedHash___;
710 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
711 header->version += 1;
714 /* Release write locks */
715 for(i=0; i< numoidwrlocked; i++) {
716 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
717 write_unlock(&header->lock);
721 /* clear trec and then release objects locked */
725 for(i=0; i<max; i++) {
726 header = (objheader_t *)(((char *)(ptr->objs[i])) - sizeof(objheader_t));
728 pthread_mutex_unlock(&(header->objlock));
729 //TODO printf("%s() Unlock type= %d\n", __func__, TYPE(header));
737 /** ========================================================================================
739 * params : start: start index of the loop
740 * : stop: stop index of the loop
741 * : startptr: pointer to where to start looking in the array / linked list
742 * : type: 'r' if the abort was detected while visiting objects read, 'w' if while visiting objects modified
743 * =========================================================================================
746 void getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, char type) {
750 chashlistnode_t *curr = (chashlistnode_t *) startptr;
751 chashlistnode_t *ptr = c_table;
752 for(i = start; i < stop; i++) {
755 /* Inner loop to traverse the linked list of the cache lookupTable */
756 while(curr != NULL) {
757 if(curr->key == NULL)
759 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
760 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
761 unsigned int version = headeraddr->version;
762 /* versions do not match */
763 if(version != header->version) {
764 header->abortCount++;
765 (typesCausingAbort[TYPE(header)])++;
772 /* Go through oids read that are locked */
773 for(i = start; i < stop; i++) {
774 objheader_t *header = ((void **)startptr)[i];
775 unsigned int version = ((int *)checkptr)[i];
776 if(version != header->version) { /* versions do not match */
777 header->abortCount++;
778 (typesCausingAbort[TYPE(header)])++;
786 * params: Object header
787 * Locks an object that causes aborts
789 void needLock(objheader_t *header) {
790 if(pthread_mutex_trylock(&(header->objlock))) { //busy and failed to get locked
791 trec.blocked = 1; //set blocked flag
792 while(header->trec == NULL) { //retry
795 if(header->trec->blocked == 1) { //ignore locking
797 } else { //lock that blocks
798 pthread_mutex_lock(&(header->objlock));
799 //TODO printf("%s() Got lock on type= %d in second try\n", __func__, TYPE(header));
800 /* Reset blocked field */
803 header->trec = &trec;
805 } else { //acquired lock
806 //TODO printf("%s() Got lock on type= %d in first try\n", __func__, TYPE(header));
807 /* Reset blocked field */
810 header->trec = &trec;
812 /* Save the locked object */
813 if (lockedobjs->offset<MAXOBJLIST) {
814 lockedobjs->objs[lockedobjs->offset++]=OID(header);
816 struct objlist *tmp=malloc(sizeof(struct objlist));
817 tmp->next=lockedobjs;
818 tmp->objs[0]=OID(header);