1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
16 /* Per thread transaction variables */
17 __thread objstr_t *t_cache;
18 __thread objstr_t *t_reserve;
19 __thread struct objlist * newobjs;
22 int numTransCommit = 0;
23 int numTransAbort = 0;
25 int nSoftAbortCommit = 0;
26 int nSoftAbortAbort = 0;
30 /* Thread variable for locking/unlocking */
31 __thread threadrec_t *trec;
32 __thread struct objlist * lockedobjs;
34 int typesCausingAbort[TOTALNUMCLASSANDARRAY];
35 /******Keep track of objects and types causing aborts******/
36 /* TODO uncomment for later use
37 #define DEBUGSTMSTAT(args...) { \
42 #define DEBUGSTMSTAT(args...)
44 #define DEBUGSTMSTAT(args...)
48 #define DEBUGSTM(x...) printf(x);
50 #define DEBUGSTM(x...)
54 void * A_memcpy (void * dest, const void * src, size_t count);
56 #define A_memcpy memcpy
60 /*** Global variables *****/
61 objlockstate_t *objlockscope;
64 * params: object header
65 * Increments the abort count for each object
67 void ABORTCOUNT(objheader_t * x) {
69 if (x->abortCount > MAXABORTS && (x->riskyflag != 1)) {
70 //makes riskflag sticky
71 pthread_mutex_lock(&lockedobjstore);
72 if (objlockscope->offset<MAXOBJLIST) {
73 x->objlock=&(objlockscope->lock[objlockscope->offset++]);
75 objlockstate_t *tmp=malloc(sizeof(objlockstate_t));
76 tmp->next=objlockscope;
78 x->objlock=&(tmp->lock[0]);
81 pthread_mutex_unlock(&lockedobjstore);
82 pthread_mutex_init(x->objlock, NULL);
83 //should put a memory barrier here
89 /* ==================================================
91 * This function starts up the transaction runtime.
92 * ==================================================
98 /* ======================================
100 * - create an object store of given size
101 * ======================================
103 objstr_t *objstrCreate(unsigned int size) {
105 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
106 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
111 tmp->top = tmp + 1; //points to end of objstr_t structure!
116 while(t_cache->next!=NULL) {
117 objstr_t *next=t_cache->next;
118 t_cache->next=t_reserve;
122 t_cache->top=t_cache+1;
125 //free entire list, starting at store
126 void objstrDelete(objstr_t *store) {
128 while (store != NULL) {
136 /* =================================================
138 * This function initializes things required in the
140 * =================================================
143 //Transaction start is currently free...commit and aborting is not
146 /* =======================================================
148 * This function creates objects in the transaction record
149 * =======================================================
151 objheader_t *transCreateObj(void * ptr, unsigned int size) {
152 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
153 objheader_t *retval=&tmp[1];
154 tmp->lock=RW_LOCK_BIAS;
156 //initialize obj lock to the header
158 // don't insert into table
159 if (newobjs->offset<MAXOBJLIST) {
160 newobjs->objs[newobjs->offset++]=retval;
162 struct objlist *tmp=malloc(sizeof(struct objlist));
168 return retval; //want space after object header
171 /* This functions inserts randowm wait delays in the order of msec
172 * Mostly used when transaction commits retry*/
173 void randomdelay(int softaborted) {
177 gettimeofday(&t,NULL);
180 req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
181 nanosleep(&req, NULL);
185 /* ==============================================
187 * - allocate space in an object store
188 * ==============================================
190 void *objstrAlloc(unsigned int size) {
193 objstr_t *store=t_cache;
199 if (OSFREE(store)>=size) {
204 if ((store=store->next)==NULL)
209 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE ? size : DEFAULT_OBJ_STORE_SIZE;
210 objstr_t **otmp=&t_reserve;
212 while((ptr=*otmp)!=NULL) {
213 if (ptr->size>=newsize) {
218 ptr->top=((char *)(&ptr[1]))+size;
223 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
228 os->top=((char *)nptr)+size;
233 /* =============================================================
235 * -finds the objects either in main heap
236 * -copies the object into the transaction cache
237 * =============================================================
239 __attribute__((pure)) void *transRead(void * oid, void *gl) {
240 objheader_t *tmp, *objheader;
241 objheader_t *objcopy;
244 /* Read from the main heap */
246 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
247 GETSIZE(size, header);
248 size += sizeof(objheader_t);
249 objcopy = (objheader_t *) objstrAlloc(size);
251 header->accessCount++;
252 //FIXME riskratio fix
253 //float riskratio = ((header->abortCount)/(header->accessCount));
254 //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d, riskratio: %f\n", TYPE(header), header->abortCount, header->accessCount, riskratio);
255 //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d\n", TYPE(header), header->abortCount, header->accessCount);
256 //if(header->abortCount > MAXABORTS && riskratio > NEED_LOCK_THRESHOLD) {
257 if(header->riskyflag) {
258 header=needLock(header,gl);
261 A_memcpy(objcopy, header, size);
262 /* Insert into cache's lookup table */
264 t_chashInsert(oid, &objcopy[1]);
269 struct objlist *ptr=newobjs;
270 while(ptr->next!=NULL) {
271 struct objlist *tmp=ptr->next;
280 void freelockedobjs() {
281 struct objlist *ptr=lockedobjs;
282 while(ptr->next!=NULL) {
283 struct objlist *tmp=ptr->next;
292 /* ================================================================
294 * - This function initiates the transaction commit process
295 * - goes through the transaction cache and decides
297 * ================================================================
302 /* Look through all the objects in the transaction hash table */
304 if (c_numelements<(c_size>>3))
305 finalResponse= alttraverseCache();
307 finalResponse= traverseCache();
308 if(finalResponse == TRANS_ABORT) {
323 if(finalResponse == TRANS_COMMIT) {
338 /* wait a random amount of time before retrying to commit transaction*/
339 if(finalResponse == TRANS_SOFT_ABORT) {
345 //retry if too many soft aborts
354 randomdelay(softaborted);
356 printf("Error: in %s() Unknown outcome", __func__);
362 /* ==================================================
364 * - goes through the transaction cache and
365 * - decides if a transaction should commit or abort
366 * ==================================================
368 int traverseCache() {
369 /* Create info to keep track of objects that can be locked */
370 int numoidrdlocked=0;
371 int numoidwrlocked=0;
372 void * rdlocked[200];
374 void * wrlocked[200];
380 if (c_numelements<200) {
381 oidrdlocked=rdlocked;
382 oidrdversion=rdversion;
383 oidwrlocked=wrlocked;
385 int size=c_numelements*sizeof(void*);
386 oidrdlocked=malloc(size);
387 oidrdversion=malloc(size);
388 oidwrlocked=malloc(size);
390 chashlistnode_t *ptr = c_table;
391 /* Represents number of bins in the chash table */
392 unsigned int size = c_size;
393 for(i = 0; i<size; i++) {
394 chashlistnode_t *curr = &ptr[i];
395 /* Inner loop to traverse the linked list of the cache lookupTable */
396 while(curr != NULL) {
397 //if the first bin in hash table is empty
398 if(curr->key == NULL)
400 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
401 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
402 unsigned int version = headeraddr->version;
404 if(STATUS(headeraddr) & DIRTY) {
405 /* Read from the main heap and compare versions */
406 if(write_trylock(&header->lock)) { //can aquire write lock
407 if (version == header->version) { /* versions match */
408 /* Keep track of objects locked */
409 oidwrlocked[numoidwrlocked++] = OID(header);
411 oidwrlocked[numoidwrlocked++] = OID(header);
412 transAbortProcess(oidwrlocked, numoidwrlocked);
415 (typesCausingAbort[TYPE(header)])++;
416 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1);
418 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
419 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
420 if (c_numelements>=200) {
427 } else { /* cannot aquire lock */
428 if(version == header->version) {
432 transAbortProcess(oidwrlocked, numoidwrlocked);
435 (typesCausingAbort[TYPE(header)])++;
436 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 1);
438 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
439 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
440 if (c_numelements>=200) {
449 oidrdversion[numoidrdlocked]=version;
450 oidrdlocked[numoidrdlocked++] = header;
456 //THIS IS THE SERIALIZATION POINT *****
458 for(i=0; i<numoidrdlocked; i++) {
459 /* Read from the main heap and compare versions */
460 objheader_t *header=oidrdlocked[i];
461 unsigned int version=oidrdversion[i];
462 if(header->lock>0) { //not write locked
463 if(version != header->version) { /* versions do not match */
464 oidrdlocked[numoidrdlocked++] = OID(header);
465 transAbortProcess(oidwrlocked, numoidwrlocked);
468 (typesCausingAbort[TYPE(header)])++;
469 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 0);
471 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
472 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
473 if (c_numelements>=200) {
480 } else { /* cannot aquire lock */
481 //do increment as we didn't get lock
482 if(version == header->version) {
485 transAbortProcess(oidwrlocked, numoidwrlocked);
488 (typesCausingAbort[TYPE(header)])++;
489 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 0);
491 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
492 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
493 if (c_numelements>=200) {
503 /* Decide the final response */
505 transAbortProcess(oidwrlocked, numoidwrlocked);
506 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
507 if (c_numelements>=200) {
512 return TRANS_SOFT_ABORT;
514 transCommitProcess(oidwrlocked, numoidwrlocked);
515 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
516 if (c_numelements>=200) {
525 /* ==================================================
527 * - goes through the transaction cache and
528 * - decides if a transaction should commit or abort
529 * ==================================================
531 int alttraverseCache() {
532 /* Create info to keep track of objects that can be locked */
533 int numoidrdlocked=0;
534 int numoidwrlocked=0;
535 void * rdlocked[200];
537 void * wrlocked[200];
543 if (c_numelements<200) {
544 oidrdlocked=rdlocked;
545 oidrdversion=rdversion;
546 oidwrlocked=wrlocked;
548 int size=c_numelements*sizeof(void*);
549 oidrdlocked=malloc(size);
550 oidrdversion=malloc(size);
551 oidwrlocked=malloc(size);
553 chashlistnode_t *curr = c_list;
554 /* Inner loop to traverse the linked list of the cache lookupTable */
555 while(curr != NULL) {
556 //if the first bin in hash table is empty
557 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
558 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
559 unsigned int version = headeraddr->version;
561 if(STATUS(headeraddr) & DIRTY) {
562 /* Read from the main heap and compare versions */
563 if(write_trylock(&header->lock)) { //can aquire write lock
564 if (version == header->version) { /* versions match */
565 /* Keep track of objects locked */
566 oidwrlocked[numoidwrlocked++] = OID(header);
568 oidwrlocked[numoidwrlocked++] = OID(header);
569 transAbortProcess(oidwrlocked, numoidwrlocked);
572 (typesCausingAbort[TYPE(header)])++;
573 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1);
575 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
576 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
577 if (c_numelements>=200) {
584 } else { /* cannot aquire lock */
585 if(version == header->version) {
589 transAbortProcess(oidwrlocked, numoidwrlocked);
592 (typesCausingAbort[TYPE(header)])++;
593 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 1);
595 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
596 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
597 if (c_numelements>=200) {
606 /* Read from the main heap and compare versions */
607 oidrdversion[numoidrdlocked]=version;
608 oidrdlocked[numoidrdlocked++] = header;
612 //THIS IS THE SERIALIZATION POINT *****
613 for(i=0; i<numoidrdlocked; i++) {
614 objheader_t * header = oidrdlocked[i];
615 unsigned int version=oidrdversion[i];
616 if(header->lock>=0) {
617 if(version != header->version) {
618 transAbortProcess(oidwrlocked, numoidwrlocked);
621 (typesCausingAbort[TYPE(header)])++;
622 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0);
624 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
625 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
626 if (c_numelements>=200) {
633 } else { /* cannot aquire lock */
634 if(version == header->version) {
637 transAbortProcess(oidwrlocked, numoidwrlocked);
640 (typesCausingAbort[TYPE(header)])++;
641 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 0);
643 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
644 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
645 if (c_numelements>=200) {
655 /* Decide the final response */
657 transAbortProcess(oidwrlocked, numoidwrlocked);
658 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
659 if (c_numelements>=200) {
664 return TRANS_SOFT_ABORT;
666 transCommitProcess(oidwrlocked, numoidwrlocked);
667 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
668 if (c_numelements>=200) {
678 /* ==================================
681 * =================================
683 int transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
686 /* Release read locks */
688 /* Release write locks */
689 for(i=0; i< numoidwrlocked; i++) {
690 /* Read from the main heap */
691 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
692 write_unlock(&header->lock);
696 /* clear trec and then release objects locked */
697 struct objlist *ptr=lockedobjs;
700 for(i=0; i<max; i++) {
701 header = (objheader_t *)((char *)(ptr->objs[i]) - sizeof(objheader_t));
703 pthread_mutex_unlock(header->objlock);
710 /* ==================================
713 * =================================
715 int transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
719 struct objlist *ptr=newobjs;
722 for(i=0; i<max; i++) {
724 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
729 /* Copy from transaction cache -> main object store */
730 for (i = 0; i < numoidwrlocked; i++) {
731 /* Read from the main heap */
732 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
734 GETSIZE(tmpsize, header);
735 struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
736 struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
737 dst->___cachedCode___=src->___cachedCode___;
738 dst->___cachedHash___=src->___cachedHash___;
739 A_memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
740 __asm__ __volatile__("": : :"memory");
743 __asm__ __volatile__("": : :"memory");
745 /* Release write locks */
746 for(i=0; i< numoidwrlocked; i++) {
747 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
748 write_unlock(&header->lock);
752 /* clear trec and then release objects locked */
756 for(i=0; i<max; i++) {
757 header = (objheader_t *)(((char *)(ptr->objs[i])) - sizeof(objheader_t));
759 pthread_mutex_unlock(header->objlock);
768 /** ========================================================================================
770 * params : start: start index of the loop
771 * : stop: stop index of the loop
772 * : startptr: pointer that points to where to start looking in the array/ linked list
773 * 0='r'/1='w' if found when visiting objects read/ objects modified
774 * =========================================================================================
777 void getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, int type) {
781 chashlistnode_t *curr = (chashlistnode_t *) startptr;
782 chashlistnode_t *ptr = c_table;
783 for(i = start; i < stop; i++) {
786 /* Inner loop to traverse the linked list of the cache lookupTable */
787 while(curr != NULL) {
788 if(curr->key == NULL)
790 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
791 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
792 unsigned int version = headeraddr->version;
793 /* versions do not match */
794 if(version != header->version) {
796 (typesCausingAbort[TYPE(header)])++;
803 /* Go through oids read that are locked */
804 for(i = start; i < stop; i++) {
805 objheader_t *header = ((void **)startptr)[i];
806 unsigned int version = ((int *)checkptr)[i];
807 if(version != header->version) { /* versions do not match */
809 (typesCausingAbort[TYPE(header)])++;
817 * params: Object header
818 * Locks an object that causes aborts
820 objheader_t * needLock(objheader_t *header, void *gl) {
823 while((lockstatus = pthread_mutex_trylock(header->objlock))
824 && ((ptr = header->trec) == NULL)) { //retry
827 if(lockstatus==0) { //acquired lock
830 } else { //failed to get lock
833 __asm__ __volatile__("":::"memory");
834 //see if other thread is blocked
835 if(ptr->blocked == 1) {
836 //it might be block, so ignore lock and clear our blocked flag
841 INTPTR ptrarray[]={1, (INTPTR)gl, (INTPTR) header};
842 void *lockptr=header->objlock;
843 stopforgc((struct garbagelist *)ptrarray);
844 //grab lock and wait our turn
845 pthread_mutex_lock(lockptr);
847 header=(objheader_t *) ptrarray[2];
849 pthread_mutex_lock(header->objptr);
851 /* we have lock, so we are not blocked anymore */
857 //trec->blocked is zero now
859 /* Save the locked object */
860 if (lockedobjs->offset<MAXOBJLIST) {
861 lockedobjs->objs[lockedobjs->offset++]=OID(header);
863 struct objlist *tmp=malloc(sizeof(struct objlist));
864 tmp->next=lockedobjs;
865 tmp->objs[0]=OID(header);