benchmark silo added
[c11concurrency-benchmarks.git] / silo / masstree / kvthread.hh
1 /* Masstree
2  * Eddie Kohler, Yandong Mao, Robert Morris
3  * Copyright (c) 2012-2014 President and Fellows of Harvard College
4  * Copyright (c) 2012-2014 Massachusetts Institute of Technology
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a
7  * copy of this software and associated documentation files (the "Software"),
8  * to deal in the Software without restriction, subject to the conditions
9  * listed in the Masstree LICENSE file. These conditions include: you must
10  * preserve this copyright notice, and you cannot mention the copyright
11  * holders in advertising related to the Software without their permission.
12  * The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
13  * notice is a summary of the Masstree LICENSE file; the license in that file
14  * is legally binding.
15  */
16 #ifndef KVTHREAD_HH
17 #define KVTHREAD_HH 1
18 #include "mtcounters.hh"
19 #include "compiler.hh"
20 #include "circular_int.hh"
21 #include "timestamp.hh"
22 #include <assert.h>
23 #include <pthread.h>
24 #include <sys/mman.h>
25 #include <stdlib.h>
26
27 class threadinfo;
28 class loginfo;
29
30 extern volatile uint64_t globalepoch;    // global epoch, updated regularly
31 extern volatile bool recovering;
32
33 struct memdebug {
34 #if HAVE_MEMDEBUG
35     enum {
36         magic_value = 389612313 /* = 0x17390319 */,
37         magic_free_value = 2015593488 /* = 0x78238410 */
38     };
39     int magic;
40     int freetype;
41     size_t size;
42     int after_rcu;
43     int line;
44     const char* file;
45
46     static void* make(void* p, size_t size, int freetype) {
47         if (p) {
48             memdebug *m = reinterpret_cast<memdebug *>(p);
49             m->magic = magic_value;
50             m->freetype = freetype;
51             m->size = size;
52             m->after_rcu = 0;
53             m->line = 0;
54             m->file = 0;
55             return m + 1;
56         } else
57             return p;
58     }
59     static void set_landmark(void* p, const char* file, int line) {
60         if (p) {
61             memdebug* m = reinterpret_cast<memdebug*>(p) - 1;
62             m->file = file;
63             m->line = line;
64         }
65     }
66     static void *check_free(void *p, size_t size, int freetype) {
67         memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
68         free_checks(m, size, freetype, false, "deallocate");
69         m->magic = magic_free_value;
70         return m;
71     }
72     static void check_rcu(void *p, size_t size, int freetype) {
73         memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
74         free_checks(m, size, freetype, false, "deallocate_rcu");
75         m->after_rcu = 1;
76     }
77     static void *check_free_after_rcu(void *p, int freetype) {
78         memdebug *m = reinterpret_cast<memdebug *>(p) - 1;
79         free_checks(m, 0, freetype, true, "free_after_rcu");
80         m->magic = magic_free_value;
81         return m;
82     }
83     static bool check_use(const void *p, int type) {
84         const memdebug *m = reinterpret_cast<const memdebug *>(p) - 1;
85         return m->magic == magic_value && (type == 0 || (m->freetype >> 8) == type);
86     }
87     static bool check_use(const void *p, int type1, int type2) {
88         const memdebug *m = reinterpret_cast<const memdebug *>(p) - 1;
89         return m->magic == magic_value
90             && ((m->freetype >> 8) == type1 || (m->freetype >> 8) == type2);
91     }
92     static void assert_use(const void *p, memtag tag) {
93         if (!check_use(p, tag))
94             hard_assert_use(p, tag, (memtag) -1);
95     }
96     static void assert_use(const void *p, memtag tag1, memtag tag2) {
97         if (!check_use(p, tag1, tag2))
98             hard_assert_use(p, tag1, tag2);
99     }
100   private:
101     static void free_checks(const memdebug *m, size_t size, int freetype,
102                             int after_rcu, const char *op) {
103         if (m->magic != magic_value
104             || m->freetype != freetype
105             || (!after_rcu && m->size != size)
106             || m->after_rcu != after_rcu)
107             hard_free_checks(m, freetype, size, after_rcu, op);
108     }
109     void landmark(char* buf, size_t size) const;
110     static void hard_free_checks(const memdebug* m, size_t size, int freetype,
111                                  int after_rcu, const char* op);
112     static void hard_assert_use(const void* ptr, memtag tag1, memtag tag2);
113 #else
114     static void *make(void *p, size_t, int) {
115         return p;
116     }
117     static void set_landmark(void*, const char*, int) {
118     }
119     static void *check_free(void *p, size_t, int) {
120         return p;
121     }
122     static void check_rcu(void *, size_t, int) {
123     }
124     static void *check_free_after_rcu(void *p, int) {
125         return p;
126     }
127     static bool check_use(void *, memtag) {
128         return true;
129     }
130     static bool check_use(void *, memtag, memtag) {
131         return true;
132     }
133     static void assert_use(void *, memtag) {
134     }
135     static void assert_use(void *, memtag, memtag) {
136     }
137 #endif
138 };
139
140 enum {
141 #if HAVE_MEMDEBUG
142     memdebug_size = sizeof(memdebug)
143 #else
144     memdebug_size = 0
145 #endif
146 };
147
148 struct limbo_element {
149     void *ptr_;
150     int freetype_;
151     uint64_t epoch_;
152 };
153
154 struct limbo_group {
155     enum { capacity = (4076 - sizeof(limbo_group *)) / sizeof(limbo_element) };
156     int head_;
157     int tail_;
158     limbo_element e_[capacity];
159     limbo_group *next_;
160     limbo_group()
161         : head_(0), tail_(0), next_() {
162     }
163     void push_back(void *ptr, int freetype, uint64_t epoch) {
164         assert(tail_ < capacity);
165         e_[tail_].ptr_ = ptr;
166         e_[tail_].freetype_ = freetype;
167         e_[tail_].epoch_ = epoch;
168         ++tail_;
169     }
170 };
171
172 template <int N> struct has_threadcounter {
173     static bool test(threadcounter ci) {
174         return unsigned(ci) < unsigned(N);
175     }
176 };
177 template <> struct has_threadcounter<0> {
178     static bool test(threadcounter) {
179         return false;
180     }
181 };
182
183 struct rcu_callback {
184     virtual ~rcu_callback() {
185     }
186     virtual void operator()(threadinfo& ti) = 0;
187 };
188
189 class threadinfo {
190   public:
191     enum {
192         TI_MAIN, TI_PROCESS, TI_LOG, TI_CHECKPOINT
193     };
194
195     static threadinfo *make(int purpose, int index);
196     // XXX destructor
197     static pthread_key_t key;
198
199     // thread information
200     int purpose() const {
201         return purpose_;
202     }
203     int index() const {
204         return index_;
205     }
206     loginfo* logger() const {
207         return logger_;
208     }
209     void set_logger(loginfo* logger) {
210         assert(!logger_ && logger);
211         logger_ = logger;
212     }
213     static threadinfo *allthreads;
214     threadinfo* next() const {
215         return next_;
216     }
217
218     // timestamps
219     kvtimestamp_t operation_timestamp() const {
220         return timestamp();
221     }
222     kvtimestamp_t update_timestamp() const {
223         return ts_;
224     }
225     kvtimestamp_t update_timestamp(kvtimestamp_t x) const {
226         if (circular_int<kvtimestamp_t>::less_equal(ts_, x))
227             // x might be a marker timestamp; ensure result is not
228             ts_ = (x | 1) + 1;
229         return ts_;
230     }
231     kvtimestamp_t update_timestamp(kvtimestamp_t x, kvtimestamp_t y) const {
232         if (circular_int<kvtimestamp_t>::less(x, y))
233             x = y;
234         if (circular_int<kvtimestamp_t>::less_equal(ts_, x))
235             // x might be a marker timestamp; ensure result is not
236             ts_ = (x | 1) + 1;
237         return ts_;
238     }
239     void increment_timestamp() {
240         ts_ += 2;
241     }
242     void advance_timestamp(kvtimestamp_t x) {
243         if (circular_int<kvtimestamp_t>::less(ts_, x))
244             ts_ = x;
245     }
246
247     // event counters
248     void mark(threadcounter ci) {
249         if (has_threadcounter<int(ncounters)>::test(ci))
250             ++counters_[ci];
251     }
252     void mark(threadcounter ci, int64_t delta) {
253         if (has_threadcounter<int(ncounters)>::test(ci))
254             counters_[ci] += delta;
255     }
256     bool has_counter(threadcounter ci) const {
257         return has_threadcounter<int(ncounters)>::test(ci);
258     }
259     uint64_t counter(threadcounter ci) const {
260         return has_threadcounter<int(ncounters)>::test(ci) ? counters_[ci] : 0;
261     }
262
263     struct accounting_relax_fence_function {
264         threadinfo *ti_;
265         threadcounter ci_;
266         accounting_relax_fence_function(threadinfo *ti, threadcounter ci)
267             : ti_(ti), ci_(ci) {
268         }
269         void operator()() {
270             relax_fence();
271             ti_->mark(ci_);
272         }
273     };
274     /** @brief Return a function object that calls mark(ci); relax_fence().
275      *
276      * This function object can be used to count the number of relax_fence()s
277      * executed. */
278     accounting_relax_fence_function accounting_relax_fence(threadcounter ci) {
279         return accounting_relax_fence_function(this, ci);
280     }
281
282     struct stable_accounting_relax_fence_function {
283         threadinfo *ti_;
284         stable_accounting_relax_fence_function(threadinfo *ti)
285             : ti_(ti) {
286         }
287         template <typename V>
288         void operator()(V v) {
289             relax_fence();
290             ti_->mark(threadcounter(tc_stable + (v.isleaf() << 1) + v.splitting()));
291         }
292     };
293     /** @brief Return a function object that calls mark(ci); relax_fence().
294      *
295      * This function object can be used to count the number of relax_fence()s
296      * executed. */
297     stable_accounting_relax_fence_function stable_fence() {
298         return stable_accounting_relax_fence_function(this);
299     }
300
301     accounting_relax_fence_function lock_fence(threadcounter ci) {
302         return accounting_relax_fence_function(this, ci);
303     }
304
305     // memory allocation
306     void* allocate(size_t sz, memtag tag) {
307         void *p = malloc(sz + memdebug_size);
308         p = memdebug::make(p, sz, tag << 8);
309         if (p)
310             mark(threadcounter(tc_alloc + (tag > memtag_value)), sz);
311         return p;
312     }
313     void deallocate(void* p, size_t sz, memtag tag) {
314         // in C++ allocators, 'p' must be nonnull
315         assert(p);
316         p = memdebug::check_free(p, sz, tag << 8);
317         free(p);
318         mark(threadcounter(tc_alloc + (tag > memtag_value)), -sz);
319     }
320     void deallocate_rcu(void *p, size_t sz, memtag tag) {
321         assert(p);
322         memdebug::check_rcu(p, sz, tag << 8);
323         record_rcu(p, tag << 8);
324         mark(threadcounter(tc_alloc + (tag > memtag_value)), -sz);
325     }
326
327     void* pool_allocate(size_t sz, memtag tag) {
328         int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
329         assert(nl <= pool_max_nlines);
330         if (unlikely(!pool_[nl - 1]))
331             refill_pool(nl);
332         void *p = pool_[nl - 1];
333         if (p) {
334             pool_[nl - 1] = *reinterpret_cast<void **>(p);
335             p = memdebug::make(p, sz, (tag << 8) + nl);
336             mark(threadcounter(tc_alloc + (tag > memtag_value)),
337                  nl * CACHE_LINE_SIZE);
338         }
339         return p;
340     }
341     void pool_deallocate(void* p, size_t sz, memtag tag) {
342         int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
343         assert(p && nl <= pool_max_nlines);
344         p = memdebug::check_free(p, sz, (tag << 8) + nl);
345         if (use_pool()) {
346             *reinterpret_cast<void **>(p) = pool_[nl - 1];
347             pool_[nl - 1] = p;
348         } else
349             free(p);
350         mark(threadcounter(tc_alloc + (tag > memtag_value)),
351              -nl * CACHE_LINE_SIZE);
352     }
353     void pool_deallocate_rcu(void* p, size_t sz, memtag tag) {
354         int nl = (sz + memdebug_size + CACHE_LINE_SIZE - 1) / CACHE_LINE_SIZE;
355         assert(p && nl <= pool_max_nlines);
356         memdebug::check_rcu(p, sz, (tag << 8) + nl);
357         record_rcu(p, (tag << 8) + nl);
358         mark(threadcounter(tc_alloc + (tag > memtag_value)),
359              -nl * CACHE_LINE_SIZE);
360     }
361
362     // RCU
363     void rcu_start() {
364         if (gc_epoch_ != globalepoch)
365             gc_epoch_ = globalepoch;
366     }
367     void rcu_stop() {
368         if (limbo_epoch_ && (gc_epoch_ - limbo_epoch_) > 1)
369             hard_rcu_quiesce();
370         gc_epoch_ = 0;
371     }
372     void rcu_quiesce() {
373         rcu_start();
374         if (limbo_epoch_ && (gc_epoch_ - limbo_epoch_) > 2)
375             hard_rcu_quiesce();
376     }
377     typedef ::rcu_callback rcu_callback;
378     void rcu_register(rcu_callback* cb) {
379         record_rcu(cb, -1);
380     }
381
382     // thread management
383     void run();
384     int run(void* (*thread_func)(threadinfo*), void* thread_data = 0);
385     pthread_t threadid() const {
386         return threadid_;
387     }
388     void* thread_data() const {
389         return thread_data_;
390     }
391
392     static threadinfo *current() {
393         return (threadinfo *) pthread_getspecific(key);
394     }
395
396     void report_rcu(void *ptr) const;
397     static void report_rcu_all(void *ptr);
398
399   private:
400     union {
401         struct {
402             uint64_t gc_epoch_;
403             uint64_t limbo_epoch_;
404             loginfo *logger_;
405
406             threadinfo *next_;
407             int purpose_;
408             int index_;         // the index of a udp, logging, tcp,
409                                 // checkpoint or recover thread
410
411             pthread_t threadid_;
412         };
413         char padding1[CACHE_LINE_SIZE];
414     };
415
416   private:
417     enum { pool_max_nlines = 20 };
418     void *pool_[pool_max_nlines];
419
420     limbo_group *limbo_head_;
421     limbo_group *limbo_tail_;
422     mutable kvtimestamp_t ts_;
423
424     //enum { ncounters = (int) tc_max };
425     enum { ncounters = 0 };
426     uint64_t counters_[ncounters];
427
428     void* (*thread_func_)(threadinfo*);
429     void* thread_data_;
430
431     void refill_pool(int nl);
432     void refill_rcu();
433
434     void free_rcu(void *p, int freetype) {
435         if ((freetype & 255) == 0) {
436             p = memdebug::check_free_after_rcu(p, freetype);
437             ::free(p);
438         } else if (freetype == -1)
439             (*static_cast<rcu_callback *>(p))(*this);
440         else {
441             p = memdebug::check_free_after_rcu(p, freetype);
442             int nl = freetype & 255;
443             *reinterpret_cast<void **>(p) = pool_[nl - 1];
444             pool_[nl - 1] = p;
445         }
446     }
447
448     void record_rcu(void* ptr, int freetype) {
449         if (recovering && freetype == (memtag_value << 8)) {
450             free_rcu(ptr, freetype);
451             return;
452         }
453         if (limbo_tail_->tail_ == limbo_tail_->capacity)
454             refill_rcu();
455         uint64_t epoch = globalepoch;
456         limbo_tail_->push_back(ptr, freetype, epoch);
457         if (!limbo_epoch_)
458             limbo_epoch_ = epoch;
459     }
460
461 #if ENABLE_ASSERTIONS
462     static int no_pool_value;
463     static bool use_pool() {
464         return !no_pool_value;
465     }
466 #else
467     static bool use_pool() {
468         return true;
469     }
470 #endif
471
472     void hard_rcu_quiesce();
473     static void* thread_trampoline(void*);
474     friend class loginfo;
475 };
476
477 #endif