locking technique for risky objects enabled
[IRC.git] / Robust / src / Runtime / STM / stm.c
1 /* ============================================================
2  * singleTMCommit.c
3  * - single thread commit on local machine
4  * =============================================================
5  * Copyright (c) 2009, University of California, Irvine, USA.
6  * All rights reserved.
7  * Author: Alokika Dash
8  *         adash@uci.edu
9  * =============================================================
10  *
11  */
12
13 #include "tm.h"
14 #include "garbage.h"
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
19
20 #ifdef TRANSSTATS
21 int numTransCommit = 0;
22 int numTransAbort = 0;
23 int nSoftAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
26 #endif
27
28 #ifdef STMSTATS
29 /* Thread variable for locking/unlocking */
30 __thread threadrec_t trec;
31 __thread struct objlist * lockedobjs;
32 int typesCausingAbort[TOTALNUMCLASSANDARRAY];
33 /******Keep track of objects and types causing aborts******/
34 /*TODO
35 #define DEBUGSTMSTAT(args...) { \
36   printf(args); \
37   fflush(stdout); \
38 }
39 */
40 #define DEBUGSTMSTAT(args...)
41 #else
42 #define DEBUGSTMSTAT(args...)
43 #endif
44
45 #ifdef STMDEBUG
46 #define DEBUGSTM(x...) printf(x);
47 #else
48 #define DEBUGSTM(x...)
49 #endif
50
51
52 /* ==================================================
53  * stmStartup
54  * This function starts up the transaction runtime.
55  * ==================================================
56  */
57 int stmStartup() {
58   return 0;
59 }
60
61 /* ======================================
62  * objstrCreate
63  * - create an object store of given size
64  * ======================================
65  */
66 objstr_t *objstrCreate(unsigned int size) {
67   objstr_t *tmp;
68   if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
69     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
70     return NULL;
71   }
72   tmp->size = size;
73   tmp->next = NULL;
74   tmp->top = tmp + 1; //points to end of objstr_t structure!
75   return tmp;
76 }
77
78 void objstrReset() {
79   while(t_cache->next!=NULL) {
80     objstr_t *next=t_cache->next;
81     t_cache->next=t_reserve;
82     t_reserve=t_cache;
83     t_cache=next;
84   }
85   t_cache->top=t_cache+1;
86 }
87
88 //free entire list, starting at store
89 void objstrDelete(objstr_t *store) {
90   objstr_t *tmp;
91   while (store != NULL) {
92     tmp = store->next;
93     free(store);
94     store = tmp;
95   }
96   return;
97 }
98
99 /* =================================================
100  * transStart
101  * This function initializes things required in the
102  * transaction start
103  * =================================================
104  */
105 void transStart() {
106   //Transaction start is currently free...commit and aborting is not
107 }
108
109 /* =======================================================
110  * transCreateObj
111  * This function creates objects in the transaction record
112  * =======================================================
113  */
114 objheader_t *transCreateObj(void * ptr, unsigned int size) {
115   objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
116   objheader_t *retval=&tmp[1];
117   tmp->lock=RW_LOCK_BIAS;
118   tmp->version = 1;
119   tmp->abortCount = 0;
120   tmp->accessCount = 0;
121   tmp->riskyflag = 0;
122   tmp->trec = NULL;
123   //initialize obj lock
124   pthread_mutex_init(&tmp->objlock, NULL);
125   STATUS(tmp)=NEW;
126   // don't insert into table
127   if (newobjs->offset<MAXOBJLIST) {
128     newobjs->objs[newobjs->offset++]=retval;
129   } else {
130     struct objlist *tmp=malloc(sizeof(struct objlist));
131     tmp->next=newobjs;
132     tmp->objs[0]=retval;
133     tmp->offset=1;
134     newobjs=tmp;
135   }
136   return retval; //want space after object header
137 }
138
139 /* This functions inserts randowm wait delays in the order of msec
140  * Mostly used when transaction commits retry*/
141 void randomdelay(int softaborted) {
142   struct timespec req;
143   struct timeval t;
144
145   gettimeofday(&t,NULL);
146
147   req.tv_sec = 0;
148   req.tv_nsec = (long)((t.tv_usec)%(1<<softaborted))<<1; //1-11 microsec
149   nanosleep(&req, NULL);
150   return;
151 }
152
153 /* ==============================================
154  * objstrAlloc
155  * - allocate space in an object store
156  * ==============================================
157  */
158 void *objstrAlloc(unsigned int size) {
159   void *tmp;
160   int i=0;
161   objstr_t *store=t_cache;
162   if ((size&7)!=0) {
163     size+=(8-(size&7));
164   }
165
166   for(; i<2; i++) {
167     if (OSFREE(store)>=size) {
168       tmp=store->top;
169       store->top +=size;
170       return tmp;
171     }
172     if ((store=store->next)==NULL)
173       break;
174   }
175
176   {
177     unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE ? size : DEFAULT_OBJ_STORE_SIZE;
178     objstr_t **otmp=&t_reserve;
179     objstr_t *ptr;
180     while((ptr=*otmp)!=NULL) {
181       if (ptr->size>=newsize) {
182         //remove from list
183         *otmp=ptr->next;
184         ptr->next=t_cache;
185         t_cache=ptr;
186         ptr->top=((char *)(&ptr[1]))+size;
187         return &ptr[1];
188       }
189     }
190
191     objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
192     void *nptr=&os[1];
193     os->next=t_cache;
194     t_cache=os;
195     os->size=newsize;
196     os->top=((char *)nptr)+size;
197     return nptr;
198   }
199 }
200
201 /* =============================================================
202  * transRead
203  * -finds the objects either in main heap
204  * -copies the object into the transaction cache
205  * =============================================================
206  */
207 __attribute__((pure)) void *transRead(void * oid) {
208   objheader_t *tmp, *objheader;
209   objheader_t *objcopy;
210   int size;
211
212   /* Read from the main heap */
213   //No lock for now
214   objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t));
215   GETSIZE(size, header);
216   size += sizeof(objheader_t);
217   objcopy = (objheader_t *) objstrAlloc(size);
218   memcpy(objcopy, header, size);
219 #ifdef STMSTATS
220   header->accessCount++;
221   //FIXME riskratio fix
222   //float riskratio = ((header->abortCount)/(header->accessCount));
223   //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d, riskratio: %f\n", TYPE(header), header->abortCount, header->accessCount, riskratio);
224   //DEBUGSTMSTAT("type: %d, header->abortCount: %d, header->accessCount: %d\n", TYPE(header), header->abortCount, header->accessCount);
225   //if(header->abortCount > MAXABORTS &&  riskratio > NEED_LOCK_THRESHOLD) {
226   if(header->abortCount > MAXABORTS) {
227     /* Set risky flag */
228     header->riskyflag = 1;
229     /* Need locking */
230     needLock(header);
231   }
232 #endif
233   /* Insert into cache's lookup table */
234   STATUS(objcopy)=0;
235   t_chashInsert(oid, &objcopy[1]);
236   return &objcopy[1];
237 }
238
239 void freenewobjs() {
240   struct objlist *ptr=newobjs;
241   while(ptr->next!=NULL) {
242     struct objlist *tmp=ptr->next;
243     free(ptr);
244     ptr=tmp;
245   }
246   ptr->offset=0;
247   newobjs=ptr;
248 }
249
250 #ifdef STMSTATS
251 void freelockedobjs() {
252   struct objlist *ptr=lockedobjs;
253   while(ptr->next!=NULL) {
254     struct objlist *tmp=ptr->next;
255     free(ptr);
256     ptr=tmp;
257   }
258   ptr->offset=0;
259   lockedobjs=ptr;
260 }
261 #endif
262
263 /* ================================================================
264  * transCommit
265  * - This function initiates the transaction commit process
266  * - goes through the transaction cache and decides
267  * - a final response
268  * ================================================================
269  */
270 int transCommit() {
271   int softaborted=0;
272   do {
273     /* Look through all the objects in the transaction hash table */
274     int finalResponse;
275     if (c_numelements<(c_size>>3))
276       finalResponse= alttraverseCache();
277     else
278       finalResponse= traverseCache();
279     if(finalResponse == TRANS_ABORT) {
280 #ifdef TRANSSTATS
281       numTransAbort++;
282       if (softaborted) {
283         nSoftAbortAbort++;
284       }
285 #endif
286       freenewobjs();
287 #ifdef STMSTATS
288       freelockedobjs();
289 #endif
290       objstrReset();
291       t_chashreset();
292       return TRANS_ABORT;
293     }
294     if(finalResponse == TRANS_COMMIT) {
295 #ifdef TRANSSTATS
296       numTransCommit++;
297       if (softaborted) {
298         nSoftAbortCommit++;
299       }
300 #endif
301       freenewobjs();
302 #ifdef STMSTATS
303       freelockedobjs();
304 #endif
305       objstrReset();
306       t_chashreset();
307       return 0;
308     }
309     /* wait a random amount of time before retrying to commit transaction*/
310     if(finalResponse == TRANS_SOFT_ABORT) {
311 #ifdef TRANSSTATS
312       nSoftAbort++;
313 #endif
314       softaborted++;
315       if (softaborted>4) {
316         //retry if too many soft aborts
317         freenewobjs();
318 #ifdef STMSTATS
319     freelockedobjs();
320 #endif
321         objstrReset();
322         t_chashreset();
323         return TRANS_ABORT;
324       }
325       randomdelay(softaborted);
326     } else {
327       printf("Error: in %s() Unknown outcome", __func__);
328       exit(-1);
329     }
330   } while (1);
331 }
332
333 /* ==================================================
334  * traverseCache
335  * - goes through the transaction cache and
336  * - decides if a transaction should commit or abort
337  * ==================================================
338  */
339 int traverseCache() {
340   /* Create info to keep track of objects that can be locked */
341   int numoidrdlocked=0;
342   int numoidwrlocked=0;
343   void * rdlocked[200];
344   int rdversion[200];
345   void * wrlocked[200];
346   int softabort=0;
347   int i;
348   void ** oidrdlocked;
349   void ** oidwrlocked;
350   int * oidrdversion;
351   if (c_numelements<200) {
352     oidrdlocked=rdlocked;
353     oidrdversion=rdversion;
354     oidwrlocked=wrlocked;
355   } else {
356     int size=c_numelements*sizeof(void*);
357     oidrdlocked=malloc(size);
358     oidrdversion=malloc(size);
359     oidwrlocked=malloc(size);
360   }
361   chashlistnode_t *ptr = c_table;
362   /* Represents number of bins in the chash table */
363   unsigned int size = c_size;
364   for(i = 0; i<size; i++) {
365     chashlistnode_t *curr = &ptr[i];
366     /* Inner loop to traverse the linked list of the cache lookupTable */
367     while(curr != NULL) {
368       //if the first bin in hash table is empty
369       if(curr->key == NULL)
370         break;
371       objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
372       objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
373       unsigned int version = headeraddr->version;
374
375       if(STATUS(headeraddr) & DIRTY) {
376         /* Read from the main heap  and compare versions */
377         if(write_trylock(&header->lock)) { //can aquire write lock
378           if (version == header->version) { /* versions match */
379             /* Keep track of objects locked */
380             oidwrlocked[numoidwrlocked++] = OID(header);
381           } else {
382             oidwrlocked[numoidwrlocked++] = OID(header);
383             transAbortProcess(oidwrlocked, numoidwrlocked);
384 #ifdef STMSTATS
385             header->abortCount++;
386             (typesCausingAbort[TYPE(header)])++;
387             getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
388 #endif
389             DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
390             DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
391             if (c_numelements>=200) {
392               free(oidrdlocked);
393               free(oidrdversion);
394               free(oidwrlocked);
395             }
396             return TRANS_ABORT;
397           }
398         } else { /* cannot aquire lock */
399           if(version == header->version) {
400             /* versions match */
401             softabort=1;
402           } else {
403             transAbortProcess(oidwrlocked, numoidwrlocked);
404 #ifdef STMSTATS
405             header->abortCount++;
406             (typesCausingAbort[TYPE(header)])++;
407             getTotalAbortCount(i+1, size, (void *)(curr->next), NULL, 'w');
408 #endif
409             DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
410             DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
411             if (c_numelements>=200) {
412               free(oidrdlocked);
413               free(oidrdversion);
414               free(oidwrlocked);
415             }
416             return TRANS_ABORT;
417           }
418         }
419       } else {
420         oidrdversion[numoidrdlocked]=version;
421         oidrdlocked[numoidrdlocked++] = header;
422       }
423       curr = curr->next;
424     }
425   } //end of for
426
427   //THIS IS THE SERIALIZATION POINT *****
428
429   for(i=0; i<numoidrdlocked; i++) {
430     /* Read from the main heap  and compare versions */
431     objheader_t *header=oidrdlocked[i];
432     unsigned int version=oidrdversion[i];
433     if(header->lock>0) { //not write locked
434       if(version != header->version) { /* versions do not match */
435         oidrdlocked[numoidrdlocked++] = OID(header);
436         transAbortProcess(oidwrlocked, numoidwrlocked);
437 #ifdef STMSTATS
438         header->abortCount++;
439         (typesCausingAbort[TYPE(header)])++;
440         getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
441 #endif
442         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
443         DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
444         if (c_numelements>=200) {
445           free(oidrdlocked);
446           free(oidrdversion);
447           free(oidwrlocked);
448         }
449         return TRANS_ABORT;
450       }
451     } else { /* cannot aquire lock */
452       //do increment as we didn't get lock
453       if(version == header->version) {
454         softabort=1;
455       } else {
456         transAbortProcess(oidwrlocked, numoidwrlocked);
457 #ifdef STMSTATS
458         header->abortCount++;
459         (typesCausingAbort[TYPE(header)])++;
460         getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *) oidrdversion, 'r');
461 #endif
462         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
463         DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
464         if (c_numelements>=200) {
465           free(oidrdlocked);
466           free(oidrdversion);
467           free(oidwrlocked);
468         }
469         return TRANS_ABORT;
470       }
471     }
472   }
473
474   /* Decide the final response */
475   if (softabort) {
476     transAbortProcess(oidwrlocked, numoidwrlocked);
477     DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
478     if (c_numelements>=200) {
479       free(oidrdlocked);
480       free(oidrdversion);
481       free(oidwrlocked);
482     }
483     return TRANS_SOFT_ABORT;
484   } else {
485     transCommitProcess(oidwrlocked, numoidwrlocked);
486     DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
487     if (c_numelements>=200) {
488       free(oidrdlocked);
489       free(oidrdversion);
490       free(oidwrlocked);
491     }
492     return TRANS_COMMIT;
493   }
494 }
495
496 /* ==================================================
497  * alttraverseCache
498  * - goes through the transaction cache and
499  * - decides if a transaction should commit or abort
500  * ==================================================
501  */
502 int alttraverseCache() {
503   /* Create info to keep track of objects that can be locked */
504   int numoidrdlocked=0;
505   int numoidwrlocked=0;
506   void * rdlocked[200];
507   int rdversion[200];
508   void * wrlocked[200];
509   int softabort=0;
510   int i;
511   void ** oidrdlocked;
512   int * oidrdversion;
513   void ** oidwrlocked;
514   if (c_numelements<200) {
515     oidrdlocked=rdlocked;
516     oidrdversion=rdversion;
517     oidwrlocked=wrlocked;
518   } else {
519     int size=c_numelements*sizeof(void*);
520     oidrdlocked=malloc(size);
521     oidrdversion=malloc(size);
522     oidwrlocked=malloc(size);
523   }
524   chashlistnode_t *curr = c_list;
525   /* Inner loop to traverse the linked list of the cache lookupTable */
526   while(curr != NULL) {
527     //if the first bin in hash table is empty
528     objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
529     objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
530     unsigned int version = headeraddr->version;
531
532     if(STATUS(headeraddr) & DIRTY) {
533       /* Read from the main heap  and compare versions */
534       if(write_trylock(&header->lock)) { //can aquire write lock
535         if (version == header->version) { /* versions match */
536           /* Keep track of objects locked */
537           oidwrlocked[numoidwrlocked++] = OID(header);
538         } else {
539           oidwrlocked[numoidwrlocked++] = OID(header);
540           transAbortProcess(oidwrlocked, numoidwrlocked);
541 #ifdef STMSTATS
542           header->abortCount++;
543           (typesCausingAbort[TYPE(header)])++;
544           getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
545 #endif
546           DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
547           DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
548           if (c_numelements>=200) {
549             free(oidrdlocked);
550             free(oidrdversion);
551             free(oidwrlocked);
552           }
553           return TRANS_ABORT;
554         }
555       } else { /* cannot aquire lock */
556         if(version == header->version) {
557           /* versions match */
558           softabort=1;
559         } else {
560           transAbortProcess(oidwrlocked, numoidwrlocked);
561 #ifdef STMSTATS
562           header->abortCount++;
563           (typesCausingAbort[TYPE(header)])++;
564           getTotalAbortCount(0, 1, (void *) curr->next, NULL, 'w');
565 #endif
566           DEBUGSTM("WR Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
567           DEBUGSTMSTAT("WR Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
568           if (c_numelements>=200) {
569             free(oidrdlocked);
570             free(oidrdversion);
571             free(oidwrlocked);
572           }
573           return TRANS_ABORT;
574         }
575       }
576     } else {
577       /* Read from the main heap  and compare versions */
578       oidrdversion[numoidrdlocked]=version;
579       oidrdlocked[numoidrdlocked++] = header;
580     }
581     curr = curr->lnext;
582   }
583   //THIS IS THE SERIALIZATION POINT *****
584   for(i=0; i<numoidrdlocked; i++) {
585     objheader_t * header = oidrdlocked[i];
586     unsigned int version=oidrdversion[i];
587     if(header->lock>=0) {
588       if(version != header->version) {
589         transAbortProcess(oidwrlocked, numoidwrlocked);
590 #ifdef STMSTATS
591         header->abortCount++;
592         (typesCausingAbort[TYPE(header)])++;
593         getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
594 #endif
595         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
596         DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
597         if (c_numelements>=200) {
598           free(oidrdlocked);
599           free(oidrdversion);
600           free(oidwrlocked);
601         }
602         return TRANS_ABORT;
603       }
604     } else { /* cannot aquire lock */
605       if(version == header->version) {
606         softabort=1;
607       } else {
608         transAbortProcess(oidwrlocked, numoidwrlocked);
609 #ifdef STMSTATS
610         header->abortCount++;
611         (typesCausingAbort[TYPE(header)])++;
612         getTotalAbortCount(i+1, numoidrdlocked, oidrdlocked, (void *)oidrdversion, 'r');
613 #endif
614         DEBUGSTM("RD Abort: rd: %u wr: %u tot: %u type: %u ver: %u\n", numoidrdlocked, numoidwrlocked, c_numelements, TYPE(header), header->version);
615         DEBUGSTMSTAT("RD Abort: Access Count: %u AbortCount: %u type: %u ver: %u \n", header->accessCount, header->abortCount, TYPE(header), header->version);
616         if (c_numelements>=200) {
617           free(oidrdlocked);
618           free(oidrdversion);
619           free(oidwrlocked);
620         }
621         return TRANS_ABORT;
622       }
623     }
624   }
625
626   /* Decide the final response */
627   if (softabort) {
628     transAbortProcess(oidwrlocked, numoidwrlocked);
629     DEBUGSTM("Soft Abort: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
630     if (c_numelements>=200) {
631       free(oidrdlocked);
632       free(oidrdversion);
633       free(oidwrlocked);
634     }
635     return TRANS_SOFT_ABORT;
636   } else {
637     transCommitProcess(oidwrlocked, numoidwrlocked);
638     DEBUGSTM("Commit: rd: %u wr: %u tot: %u\n", numoidrdlocked, numoidwrlocked, c_numelements);
639     if (c_numelements>=200) {
640       free(oidrdlocked);
641       free(oidrdversion);
642       free(oidwrlocked);
643     }
644     return TRANS_COMMIT;
645   }
646 }
647
648
649 /* ==================================
650  * transAbortProcess
651  *
652  * =================================
653  */
654 int transAbortProcess(void **oidwrlocked, int numoidwrlocked) {
655   int i;
656   objheader_t *header;
657   /* Release read locks */
658
659   /* Release write locks */
660   for(i=0; i< numoidwrlocked; i++) {
661     /* Read from the main heap */
662     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
663     write_unlock(&header->lock);
664   }
665
666 #ifdef STMSTATS
667   /* clear trec and then release objects locked */
668   struct objlist *ptr=lockedobjs;
669   while(ptr!=NULL) {
670     int max=ptr->offset;
671     for(i=0; i<max; i++) {
672       header = (objheader_t *)((char *)(ptr->objs[i]) - sizeof(objheader_t));
673       header->trec = NULL;
674       pthread_mutex_unlock(&(header->objlock));
675     }
676     ptr=ptr->next;
677   }
678 #endif
679 }
680
681 /* ==================================
682  * transCommitProcess
683  *
684  * =================================
685  */
686 int transCommitProcess(void ** oidwrlocked, int numoidwrlocked) {
687   objheader_t *header;
688   void *ptrcreate;
689   int i;
690   struct objlist *ptr=newobjs;
691   while(ptr!=NULL) {
692     int max=ptr->offset;
693     for(i=0; i<max; i++) {
694       //clear the new flag
695       ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
696     }
697     ptr=ptr->next;
698   }
699
700   /* Copy from transaction cache -> main object store */
701   for (i = 0; i < numoidwrlocked; i++) {
702     /* Read from the main heap */
703     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
704     int tmpsize;
705     GETSIZE(tmpsize, header);
706     struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
707     struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
708     dst->___cachedCode___=src->___cachedCode___;
709     dst->___cachedHash___=src->___cachedHash___;
710     memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
711     header->version += 1;
712   }
713
714   /* Release write locks */
715   for(i=0; i< numoidwrlocked; i++) {
716     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
717     write_unlock(&header->lock);
718   }
719
720 #ifdef STMSTATS
721   /* clear trec and then release objects locked */
722   ptr=lockedobjs;
723   while(ptr!=NULL) {
724     int max=ptr->offset;
725     for(i=0; i<max; i++) {
726       header = (objheader_t *)(((char *)(ptr->objs[i])) - sizeof(objheader_t));
727       header->trec = NULL;
728       pthread_mutex_unlock(&(header->objlock));
729       //TODO printf("%s() Unlock type= %d\n", __func__, TYPE(header));
730     }
731     ptr=ptr->next;
732   }
733 #endif
734   return 0;
735 }
736
737 /** ========================================================================================
738  * getTotalAbortCount
739  * params : start: start index of the loop
740  *        : stop: stop index of the loop
741  *        : startptr: pointer that points to where to start looking in the array/ linked list
742  *          'r'/'w' if found when visiting objects read/ objects modified
743  * =========================================================================================
744  **/
745 #ifdef STMSTATS
746 void getTotalAbortCount(int start, int stop, void *startptr, void *checkptr, char type) {
747   int i;
748   if(type == 'w') {
749     int isFirstTime = 0;
750     chashlistnode_t *curr = (chashlistnode_t *) startptr;
751     chashlistnode_t *ptr = c_table;
752     for(i = start; i < stop; i++) {
753       if(!isFirstTime)
754         curr = &ptr[i];
755       /* Inner loop to traverse the linked list of the cache lookupTable */
756       while(curr != NULL) {
757         if(curr->key == NULL)
758           break;
759         objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
760         objheader_t *header=(objheader_t *)(((char *)curr->key)-sizeof(objheader_t));
761         unsigned int version = headeraddr->version;
762         /* versions do not match */
763         if(version != header->version) {
764           header->abortCount++;
765           (typesCausingAbort[TYPE(header)])++;
766         }
767         curr = curr->next;
768       }
769       isFirstTime = 1;
770     }
771   } else {
772     /* Go through oids read that are locked */
773     for(i = start; i < stop; i++) {
774       objheader_t *header = ((void **)startptr)[i];
775       unsigned int version = ((int *)checkptr)[i];
776       if(version != header->version) { /* versions do not match */
777         header->abortCount++;
778         (typesCausingAbort[TYPE(header)])++;
779       }
780     }
781   }
782 }
783
784 /**
785  * needLock
786  * params: Object header
787  * Locks an object that causes aborts
788  **/
789 void needLock(objheader_t *header) {
790   if(pthread_mutex_trylock(&(header->objlock))) { //busy and failed to get locked
791     trec.blocked = 1; //set blocked flag
792     while(header->trec == NULL) { //retry
793       ;
794     }
795     if(header->trec->blocked == 1) { //ignore locking
796       return;
797     } else { //lock that blocks
798       pthread_mutex_lock(&(header->objlock));
799       //TODO printf("%s() Got lock on type= %d in second try\n", __func__, TYPE(header));
800       /* Reset blocked field */
801       trec.blocked = 0;
802       /* Set trec */
803       header->trec = &trec;
804     }
805   } else { //acquired lock
806     //TODO printf("%s() Got lock on type= %d in first try\n", __func__, TYPE(header));
807     /* Reset blocked field */
808     trec.blocked = 0;
809     /* Set trec */
810     header->trec = &trec;
811   }
812   /* Save the locked object */
813   if (lockedobjs->offset<MAXOBJLIST) {
814     lockedobjs->objs[lockedobjs->offset++]=OID(header);
815   } else {
816     struct objlist *tmp=malloc(sizeof(struct objlist));
817     tmp->next=lockedobjs;
818     tmp->objs[0]=OID(header);
819     tmp->offset=1;
820     lockedobjs=tmp;
821   }
822 }
823 #endif