avoid resizing...
[IRC.git] / Robust / src / Runtime / STM / stm.c
1 /* ============================================================
2  * singleTMCommit.c 
3  * - single thread commit on local machine
4  * =============================================================
5  * Copyright (c) 2009, University of California, Irvine, USA.
6  * All rights reserved.
7  * Author: Alokika Dash 
8  *         adash@uci.edu
9  * =============================================================
10  *
11  */
12
13 #include "tm.h"
14 #include "garbage.h"
15 /* Thread transaction variables */
16 __thread objstr_t *t_cache;
17 __thread objstr_t *t_reserve;
18 __thread struct objlist * newobjs;
19
20 #ifdef TRANSSTATS
21 int numTransCommit = 0;
22 int numTransAbort = 0;
23 int nSoftAbort = 0;
24 int nSoftAbortCommit = 0;
25 int nSoftAbortAbort = 0;
26 #endif
27
28
29 /* ==================================================
30  * stmStartup
31  * This function starts up the transaction runtime. 
32  * ==================================================
33  */
34 int stmStartup() {
35   return 0;
36 }
37
38 /* ======================================
39  * objstrCreate
40  * - create an object store of given size
41  * ======================================
42  */
43 objstr_t *objstrCreate(unsigned int size) {
44   objstr_t *tmp;
45   if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
46     printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
47     return NULL;
48   }
49   tmp->size = size;
50   tmp->next = NULL;
51   tmp->top = tmp + 1; //points to end of objstr_t structure!
52   return tmp;
53 }
54
55 void objstrReset() {
56   while(t_cache->next!=NULL) {
57     objstr_t *next=t_cache->next;
58     t_cache->next=t_reserve;
59     t_reserve=t_cache;
60     t_cache=next;
61   }
62   t_cache->top=t_cache+1;
63 }
64
65 //free entire list, starting at store
66 void objstrDelete(objstr_t *store) {
67   objstr_t *tmp;
68   while (store != NULL) {
69     tmp = store->next;
70     free(store);
71     store = tmp;
72   }
73   return;
74 }
75
76 /* =================================================
77  * transStart
78  * This function initializes things required in the 
79  * transaction start
80  * =================================================
81  */
82 void transStart() {
83   //Transaction start is currently free...commit and aborting is not
84 }
85
86 /* =======================================================
87  * transCreateObj
88  * This function creates objects in the transaction record 
89  * =======================================================
90  */
91 objheader_t *transCreateObj(void * ptr, unsigned int size) {
92   objheader_t *tmp = mygcmalloc(ptr, (sizeof(objheader_t) + size));
93   objheader_t *retval=&tmp[1];
94   tmp->lock=RW_LOCK_BIAS;
95   tmp->version = 1;
96   STATUS(tmp)=NEW;
97   // don't insert into table
98   if (newobjs->offset<MAXOBJLIST) {
99     newobjs->objs[newobjs->offset++]=retval;
100   } else {
101     struct objlist *tmp=malloc(sizeof(struct objlist));
102     tmp->next=newobjs;
103     tmp->objs[0]=retval;
104     tmp->offset=1;
105     newobjs=tmp;
106   }
107   return retval; //want space after object header
108 }
109
110 /* This functions inserts randowm wait delays in the order of msec
111  * Mostly used when transaction commits retry*/
112 void randomdelay() {
113   struct timespec req;
114   time_t t;
115
116   t = time(NULL);
117   req.tv_sec = 0;
118   req.tv_nsec = (long)(t%4); //1-11 microsec
119   nanosleep(&req, NULL);
120   return;
121 }
122
123 /* ==============================================
124  * objstrAlloc
125  * - allocate space in an object store
126  * ==============================================
127  */
128 void *objstrAlloc(unsigned int size) {
129   void *tmp;
130   int i=0;
131   objstr_t *store=t_cache;
132   if ((size&7)!=0) {
133     size+=(8-(size&7));
134   }
135
136   for(;i<2;i++) {
137     if (OSFREE(store)>=size) {
138       tmp=store->top;
139       store->top +=size;
140       return tmp;
141     }
142     if ((store=store->next)==NULL)
143       break;
144   }
145
146   {
147     unsigned int newsize=size>DEFAULT_OBJ_STORE_SIZE?size:DEFAULT_OBJ_STORE_SIZE;
148     objstr_t **otmp=&t_reserve;
149     objstr_t *ptr;
150     while((ptr=*otmp)!=NULL) {
151       if (ptr->size>=newsize) {
152         //remove from list
153         *otmp=ptr->next;
154         ptr->next=t_cache;
155         t_cache=ptr;
156         ptr->top=((char *)(&ptr[1]))+size;
157         return &ptr[1];
158       }
159     }
160     
161     objstr_t *os=(objstr_t *)calloc(1,(sizeof(objstr_t) + newsize));
162     void *nptr=&os[1];
163     os->next=t_cache;
164     t_cache=os;
165     os->size=newsize;
166     os->top=((char *)nptr)+size;
167     return nptr;
168   }
169 }
170
171 /* =============================================================
172  * transRead
173  * -finds the objects either in main heap
174  * -copies the object into the transaction cache
175  * =============================================================
176  */
177 __attribute__((pure)) void *transRead(void * oid) {
178   objheader_t *tmp, *objheader;
179   objheader_t *objcopy;
180   int size;
181
182   //quick case for new objects
183   if (((struct ___Object___ *)oid)->___objstatus___ & NEW)
184     return oid;
185
186   /* Read from the main heap */
187   //No lock for now
188   objheader_t *header = (objheader_t *)(((char *)oid) - sizeof(objheader_t)); 
189   GETSIZE(size, header);
190   size += sizeof(objheader_t);
191   objcopy = (objheader_t *) objstrAlloc(size);
192   memcpy(objcopy, header, size);
193   /* Insert into cache's lookup table */
194   STATUS(objcopy)=0;
195   t_chashInsert(oid, &objcopy[1]);
196   return &objcopy[1];
197 }
198
199 void freenewobjs() {
200   struct objlist *ptr=newobjs;
201   while(ptr->next!=NULL) {
202     struct objlist *tmp=ptr->next;
203     free(ptr);
204     ptr=tmp;
205   }
206   ptr->offset=0;
207   newobjs=ptr;
208 }
209
210 /* ================================================================
211  * transCommit
212  * - This function initiates the transaction commit process
213  * - goes through the transaction cache and decides
214  * - a final response 
215  * ================================================================
216  */
217 int transCommit() {
218 #ifdef TRANSSTATS
219   int softaborted=0;
220 #endif
221   do {
222     /* Look through all the objects in the transaction hash table */
223     int finalResponse = traverseCache();
224     if(finalResponse == TRANS_ABORT) {
225 #ifdef TRANSSTATS
226       numTransAbort++;
227       if (softaborted) {
228         nSoftAbortAbort++;
229       }
230 #endif
231       freenewobjs();
232       objstrReset();
233       t_chashreset();
234       return TRANS_ABORT;
235     }
236     if(finalResponse == TRANS_COMMIT) {
237 #ifdef TRANSSTATS
238       numTransCommit++;
239       if (softaborted) {
240         nSoftAbortCommit++;
241       }
242 #endif
243       freenewobjs();
244       objstrReset();
245       t_chashreset();
246       return 0;
247     }
248     /* wait a random amount of time before retrying to commit transaction*/
249     if(finalResponse == TRANS_SOFT_ABORT) {
250 #ifdef TRANSSTATS
251       nSoftAbort++;
252       softaborted=1;
253 #endif
254       randomdelay();
255     } else {
256       printf("Error: in %s() Unknown outcome", __func__);
257       exit(-1);
258     }
259   } while (1);
260 }
261
262 /* ==================================================
263  * traverseCache
264  * - goes through the transaction cache and
265  * - decides if a transaction should commit or abort
266  * ==================================================
267  */
268 int traverseCache() {
269   /* Create info to keep track of objects that can be locked */
270   int numoidrdlocked=0;
271   int numoidwrlocked=0;
272   void * rdlocked[200];
273   void * wrlocked[200];
274   int softabort=0;
275   int i;
276   void ** oidrdlocked;
277   void ** oidwrlocked;
278   if (c_numelements<200) {
279     oidrdlocked=rdlocked;
280     oidwrlocked=wrlocked;
281   } else {
282     int size=c_numelements*sizeof(void*);
283     oidrdlocked=malloc(size);
284     oidwrlocked=malloc(size);
285   }
286   chashlistnode_t *ptr = c_table;
287   /* Represents number of bins in the chash table */
288   unsigned int size = c_size;
289   for(i = 0; i<size; i++) {
290     chashlistnode_t *curr = &ptr[i];
291     /* Inner loop to traverse the linked list of the cache lookupTable */
292     while(curr != NULL) {
293       //if the first bin in hash table is empty
294       if(curr->key == NULL)
295         break;
296       objheader_t * headeraddr=&((objheader_t *) curr->val)[-1];
297       
298       unsigned int version = headeraddr->version;
299       objheader_t *header=(objheader_t *) (((char *)curr->key)-sizeof(objheader_t));
300       
301       if(STATUS(headeraddr) & DIRTY) {
302         /* Read from the main heap  and compare versions */
303         if(write_trylock(&header->lock)) { //can aquire write lock
304           if (version == header->version) {/* versions match */
305             /* Keep track of objects locked */
306             oidwrlocked[numoidwrlocked++] = OID(header);
307           } else { 
308             oidwrlocked[numoidwrlocked++] = OID(header);
309             transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
310             return TRANS_ABORT;
311           }
312         } else { /* cannot aquire lock */
313           if(version == header->version) /* versions match */
314             softabort=1;
315           else {
316             transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
317             return TRANS_ABORT;
318           }
319         }
320       } else {
321         /* Read from the main heap  and compare versions */
322         if(read_trylock(&header->lock)) { //can further aquire read locks
323           if(version == header->version) {/* versions match */
324             oidrdlocked[numoidrdlocked++] = OID(header);
325           } else {
326             oidrdlocked[numoidrdlocked++] = OID(header);
327             transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
328             return TRANS_ABORT;
329           }
330         } else { /* cannot aquire lock */
331           if(version == header->version)
332             softabort=1;
333           else {
334             transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
335             return TRANS_ABORT;
336           }
337         }
338       }
339     
340       curr = curr->next;
341     }
342   } //end of for
343   
344   /* Decide the final response */
345   if (softabort) {
346     transAbortProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
347     return TRANS_SOFT_ABORT;
348   } else {
349     transCommitProcess(oidrdlocked, &numoidrdlocked, oidwrlocked, &numoidwrlocked);
350     return TRANS_COMMIT;
351   }
352 }
353
354
355 /* ==================================
356  * transAbortProcess
357  *
358  * =================================
359  */
360 int transAbortProcess(void **oidrdlocked, int *numoidrdlocked, void **oidwrlocked, int *numoidwrlocked) {
361   int i;
362   objheader_t *header;
363   /* Release read locks */
364   for(i=0; i< *numoidrdlocked; i++) {
365     /* Read from the main heap */
366     header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t));
367     read_unlock(&header->lock);
368   }
369
370   /* Release write locks */
371   for(i=0; i< *numoidwrlocked; i++) {
372     /* Read from the main heap */
373     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
374     write_unlock(&header->lock);
375   }
376   if (c_numelements>=200) {
377     free(oidrdlocked);
378     free(oidwrlocked);
379   }
380 }
381
382 /* ==================================
383  * transCommitProcess
384  *
385  * =================================
386  */
387 int transCommitProcess(void ** oidrdlocked, int *numoidrdlocked,
388                     void ** oidwrlocked, int *numoidwrlocked) {
389   objheader_t *header;
390   void *ptrcreate;
391   int i;
392   struct objlist *ptr=newobjs;
393   while(ptr!=NULL) {
394     int max=ptr->offset;
395     for(i=0;i<max;i++) {
396       //clear the new flag
397       ((struct ___Object___ *)ptr->objs[i])->___objstatus___=0;
398     }
399     ptr=ptr->next;
400   }
401   
402   /* Copy from transaction cache -> main object store */
403   for (i = 0; i < *numoidwrlocked; i++) {
404     /* Read from the main heap */ 
405     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t));
406     int tmpsize;
407     GETSIZE(tmpsize, header);
408     struct ___Object___ *dst=(struct ___Object___*)oidwrlocked[i];
409     struct ___Object___ *src=t_chashSearch(oidwrlocked[i]);
410     dst->___cachedCode___=src->___cachedCode___;
411     dst->___cachedHash___=src->___cachedHash___;
412     memcpy(&dst[1], &src[1], tmpsize-sizeof(struct ___Object___));
413     header->version += 1;
414   }
415   
416   /* Release read locks */
417   for(i=0; i< *numoidrdlocked; i++) {
418     /* Read from the main heap */
419     header = (objheader_t *)(((char *)(oidrdlocked[i])) - sizeof(objheader_t)); 
420     read_unlock(&header->lock);
421   }
422
423   /* Release write locks */
424   for(i=0; i< *numoidwrlocked; i++) {
425     header = (objheader_t *)(((char *)(oidwrlocked[i])) - sizeof(objheader_t)); 
426     write_unlock(&header->lock);
427   }
428   if (c_numelements>=200) {
429     free(oidrdlocked);
430     free(oidwrlocked);
431   }
432   return 0;
433 }
434