1 /* ============================================================
3 * - single thread commit on local machine
4 * =============================================================
5 * Copyright (c) 2009, University of California, Irvine, USA.
9 * =============================================================
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
21 int numTransCommit = 0;
22 int numTransAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
29 int typesCausingAbort[TOTALNUMCLASSANDARRAY];
30 /******Keep track of objects and types causing aborts******/
31 #define DEBUGSTMSTAT(args...) { \
36 #define DEBUGSTMSTAT(args...)
40 #define DEBUGSTM(x...) printf(x);
42 #define DEBUGSTM(x...)
46 /* ==================================================
48 * This function starts up the transaction runtime.
49 * ==================================================
55 /* ======================================
57 * - create an object store of given size
58 * ======================================
60 objstr_t *objstrCreate(unsigned int size) {
62 if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
63 printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
68 tmp->top = tmp + 1; //points to end of objstr_t structure!
73 while(t_cache->next!=NULL) {
74 objstr_t *next=t_cache->next;
75 t_cache->next=t_reserve;
79 t_cache->top=t_cache+1;
82 //free entire list, starting at store
83 void objstrDelete(objstr_t *store) {
85 while (store != NULL) {
93 /* =================================================
95 * This function initializes things required in the
97 * =================================================
100 //Transaction start is currently free...commit and aborting is not
103 /* =======================================================
105 * This function creates objects in the transaction record
106 * =======================================================
108 objheader_t *transCreateObj(void * ptr, unsigned int size) {
109 objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
110 objheader_t *retval=&tmp[1];
111 tmp->lock=RW_LOCK_BIAS;
114 // don't insert into table
115 if (newobjs->offset<MAXOBJLIST) {
116 newobjs->objs[newobjs->offset++]=retval;
118 struct objlist *tmp=malloc(sizeof(struct objlist));
124 return retval; //want space after object header
127 /* This functions inserts randowm wait delays in the order of msec
128 * Mostly used when transaction commits retry*/
129 void randomdelay(int softaborted) {
133 gettimeofday(&t,NULL);
136 req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
137 nanosleep(&req, NULL);
141 /* ==============================================
143 * - allocate space in an object store
144 * ==============================================
146 void *objstrAlloc(unsigned int size) {
149 objstr_t *store=t_cache;
155 if (OSFREE(store)>=size) {
160 if ((store=store->next)==NULL)
165 unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE ? size : DEFAULT_OBJ_STORE_SIZE;
166 objstr_t **otmp=&t_reserve;
168 while((ptr=*otmp)!=NULL) {
169 if (ptr->size>=newsize) {
174 ptr->top=((char *)(&ptr[1]))+size;
179 objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
184 os->top=((char *)nptr)+size;
189 /* =============================================================
191 * -finds the objects either in main heap
192 * -copies the object into the transaction cache
193 * =============================================================
195 __attribute__((pure)) void *transRead(void * oid) {
196 objheader_t *tmp, *objheader;
197 objheader_t *objcopy;
200 /* Read from the main heap */
202 objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
203 GETSIZE(size, header);
204 size += sizeof(objheader_t);
205 objcopy = (objheader_t *) objstrAlloc(size);
206 memcpy(objcopy, header, size);
208 header->accessCount++;
210 /* Insert into cache's lookup table */
212 t_chashInsert(oid, &objcopy[1]);
217 struct objlist *ptr=newobjs;
218 while(ptr->next!=NULL) {
219 struct objlist *tmp=ptr->next;
227 /* ================================================================
229 * - This function initiates the transaction commit process
230 * - goes through the transaction cache and decides
232 * ================================================================
237 /* Look through all the objects in the transaction hash table */
239 if (c_numelements<(c_size>>3))
240 finalResponse= alttraverseCache();
242 finalResponse= traverseCache();
243 if(finalResponse == TRANS_ABORT) {
255 if(finalResponse == TRANS_COMMIT) {
267 /* wait a random amount of time before retrying to commit transaction*/
268 if(finalResponse == TRANS_SOFT_ABORT) {
274 //retry if too many soft aborts
280 randomdelay(softaborted);
282 printf("Error: in %s() Unknown outcome", __func__);
288 /* ==================================================
290 * - goes through the transaction cache and
291 * - decides if a transaction should commit or abort
292 * ==================================================
294 int traverseCache() {
295 /* Create info to keep track of objects that can be locked */
296 int numoidrdlocked=0;
297 int numoidwrlocked=0;
298 void * rdlocked[200];
300 void * wrlocked[200];
306 if (c_numelements<200) {
307 oidrdlocked=rdlocked;
308 oidrdversion=rdversion;
309 oidwrlocked=wrlocked;
311 int size=c_numelements*sizeof(void*);
312 oidrdlocked=malloc(size);
313 oidrdversion=malloc(size);
314 oidwrlocked=malloc(size);
316 chashlistnode_t *ptr = c_table;
317 /* Represents number of bins in the chash table */
318 unsigned int size = c_size;
319 for(i = 0; i<size; i++) {
320 chashlistnode_t *curr = &ptr[i];
321 /* Inner loop to traverse the linked list of the cache lookupTable */
322 while(curr != NULL) {
323 //if the first bin in hash table is empty
324 if(curr->key == NULL)
326 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
327 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
328 unsigned int version = headeraddr->version;
330 if(STATUS(headeraddr) & DIRTY) {
331 /* Read from the main heap and compare versions */
332 if(write_trylock(&header->lock)) { //can aquire write lock
333 if (version == header->version) { /* versions match */
334 /* Keep track of objects locked */
335 oidwrlocked[numoidwrlocked++] = OID(header);
337 oidwrlocked[numoidwrlocked++] = OID(header);
338 transAbortProcess(oidwrlocked, numoidwrlocked);
340 header->abortCount++;
341 (typesCausingAbort[TYPE(header)])++;
342 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
344 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
345 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
346 if (c_numelements>=200) {
353 } else { /* cannot aquire lock */
354 if(version == header->version) {
358 transAbortProcess(oidwrlocked, numoidwrlocked);
360 header->abortCount++;
361 (typesCausingAbort[TYPE(header)])++;
362 getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
364 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
365 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
366 if (c_numelements>=200) {
375 oidrdversion[numoidrdlocked]=version;
376 oidrdlocked[numoidrdlocked++] = header;
382 //THIS IS THE SERIALIZATION POINT *****
384 for(i=0; i<numoidrdlocked; i++) {
385 /* Read from the main heap and compare versions */
386 objheader_t *header=oidrdlocked[i];
387 unsigned int version=oidrdversion[i];
388 if(header->lock>0) { //not write locked
389 if(version != header->version) { /* versions do not match */
390 oidrdlocked[numoidrdlocked++] = OID(header);
391 transAbortProcess(oidwrlocked, numoidwrlocked);
393 header->abortCount++;
394 (typesCausingAbort[TYPE(header)])++;
395 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
397 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
398 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
399 if (c_numelements>=200) {
406 } else { /* cannot aquire lock */
407 //do increment as we didn't get lock
408 if(version == header->version) {
411 transAbortProcess(oidwrlocked, numoidwrlocked);
413 header->abortCount++;
414 (typesCausingAbort[TYPE(header)])++;
415 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
417 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
418 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
419 if (c_numelements>=200) {
429 /* Decide the final response */
431 transAbortProcess(oidwrlocked, numoidwrlocked);
432 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
433 if (c_numelements>=200) {
438 return TRANS_SOFT_ABORT;
440 transCommitProcess(oidwrlocked, numoidwrlocked);
441 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
442 if (c_numelements>=200) {
451 /* ==================================================
453 * - goes through the transaction cache and
454 * - decides if a transaction should commit or abort
455 * ==================================================
457 int alttraverseCache() {
458 /* Create info to keep track of objects that can be locked */
459 int numoidrdlocked=0;
460 int numoidwrlocked=0;
461 void * rdlocked[200];
463 void * wrlocked[200];
469 if (c_numelements<200) {
470 oidrdlocked=rdlocked;
471 oidrdversion=rdversion;
472 oidwrlocked=wrlocked;
474 int size=c_numelements*sizeof(void*);
475 oidrdlocked=malloc(size);
476 oidrdversion=malloc(size);
477 oidwrlocked=malloc(size);
479 chashlistnode_t *curr = c_list;
480 /* Inner loop to traverse the linked list of the cache lookupTable */
481 while(curr != NULL) {
482 //if the first bin in hash table is empty
483 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
484 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
485 unsigned int version = headeraddr->version;
487 if(STATUS(headeraddr) & DIRTY) {
488 /* Read from the main heap and compare versions */
489 if(write_trylock(&header->lock)) { //can aquire write lock
490 if (version == header->version) { /* versions match */
491 /* Keep track of objects locked */
492 oidwrlocked[numoidwrlocked++] = OID(header);
494 oidwrlocked[numoidwrlocked++] = OID(header);
495 transAbortProcess(oidwrlocked, numoidwrlocked);
497 header->abortCount++;
498 (typesCausingAbort[TYPE(header)])++;
499 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
501 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
502 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
503 if (c_numelements>=200) {
510 } else { /* cannot aquire lock */
511 if(version == header->version) {
515 transAbortProcess(oidwrlocked, numoidwrlocked);
517 header->abortCount++;
518 (typesCausingAbort[TYPE(header)])++;
519 getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
521 DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
522 DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
523 if (c_numelements>=200) {
532 /* Read from the main heap and compare versions */
533 oidrdversion[numoidrdlocked]=version;
534 oidrdlocked[numoidrdlocked++] = header;
538 //THIS IS THE SERIALIZATION POINT *****
539 for(i=0; i<numoidrdlocked; i++) {
540 objheader_t * header = oidrdlocked[i];
541 unsigned int version=oidrdversion[i];
542 if(header->lock>=0) {
543 if(version != header->version) {
544 transAbortProcess(oidwrlocked, numoidwrlocked);
546 header->abortCount++;
547 (typesCausingAbort[TYPE(header)])++;
548 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
550 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
551 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
552 if (c_numelements>=200) {
559 } else { /* cannot aquire lock */
560 if(version == header->version) {
563 transAbortProcess(oidwrlocked, numoidwrlocked);
565 header->abortCount++;
566 (typesCausingAbort[TYPE(header)])++;
567 getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
569 DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
570 DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
571 if (c_numelements>=200) {
581 /* Decide the final response */
583 transAbortProcess(oidwrlocked, numoidwrlocked);
584 DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
585 if (c_numelements>=200) {
590 return TRANS_SOFT_ABORT;
592 transCommitProcess(oidwrlocked, numoidwrlocked);
593 DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
594 if (c_numelements>=200) {
604 /* ==================================
607 * =================================
609 int transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
612 /* Release read locks */
614 /* Release write locks */
615 for(i=0; i< numoidwrlocked; i++) {
616 /* Read from the main heap */
617 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
618 write_unlock(&header->lock);
622 /* ==================================
625 * =================================
627 int transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
631 struct objlist *ptr=newobjs;
634 for(i=0; i<max; i++) {
636 ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
641 /* Copy from transaction cache -> main object store */
642 for (i = 0; i < numoidwrlocked; i++) {
643 /* Read from the main heap */
644 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
646 GETSIZE(tmpsize, header);
647 struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
648 struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
649 dst->___cachedCode___=src->___cachedCode___;
650 dst->___cachedHash___=src->___cachedHash___;
651 memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
652 header->version += 1;
655 /* Release write locks */
656 for(i=0; i< numoidwrlocked; i++) {
657 header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
658 write_unlock(&header->lock);
663 /** ========================================================================================
665 * params : start: start index of the loop
666 * : stop: stop index of the loop
667 * : startptr: pointer that points to where to start looking in the array/ linked list
668 * 'r'/'w' if found when visiting objects read/ objects modified
669 * =========================================================================================
672 void getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, char type) {
673 printf("Inside %s()\n", __func__);
677 chashlistnode_t *curr = (chashlistnode_t *) startptr;
678 chashlistnode_t *ptr = c_table;
679 for(i = start; i < stop; i++) {
682 /* Inner loop to traverse the linked list of the cache lookupTable */
683 while(curr != NULL) {
684 if(curr->key == NULL)
686 objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
687 objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
688 unsigned int version = headeraddr->version;
689 /* versions do not match */
690 if(version != header->version) {
691 header->abortCount++;
692 (typesCausingAbort[TYPE(header)])++;
699 /* Go through oids read that are locked */
700 for(i = start; i < stop; i++) {
701 objheader_t *header = ((void **)startptr)[i];
702 unsigned int version = ((int *)checkptr)[i];
703 if(version != header->version) { /* versions do not match */
704 header->abortCount++;
705 (typesCausingAbort[TYPE(header)])++;
711 void getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, char type) {