ring_buffer: reset buffer page when freeing
kernel/trace/ring_buffer.c [firefly-linux-kernel-4.4.55.git]
1 /*
2  * Generic ring buffer
3  *
4  * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5  */
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h>        /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
17 #include <linux/fs.h>
18
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
20 #define DEBUG_SHIFT 0
21
22 /* FIXME!!! */
23 u64 ring_buffer_time_stamp(int cpu)
24 {
25         /* shift to debug/test normalization and TIME_EXTENTS */
26         return sched_clock() << DEBUG_SHIFT;
27 }
28
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
30 {
31         /* Just stupid testing the normalize function and deltas */
32         *ts >>= DEBUG_SHIFT;
33 }
34
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT      2
37 #define RB_ALIGNMENT            (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA       28
39
40 enum {
41         RB_LEN_TIME_EXTEND = 8,
42         RB_LEN_TIME_STAMP = 16,
43 };
44
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
48 {
49         unsigned length;
50
51         switch (event->type) {
52         case RINGBUF_TYPE_PADDING:
53                 /* undefined */
54                 return -1;
55
56         case RINGBUF_TYPE_TIME_EXTEND:
57                 return RB_LEN_TIME_EXTEND;
58
59         case RINGBUF_TYPE_TIME_STAMP:
60                 return RB_LEN_TIME_STAMP;
61
62         case RINGBUF_TYPE_DATA:
63                 if (event->len)
64                         length = event->len << RB_ALIGNMENT_SHIFT;
65                 else
66                         length = event->array[0];
67                 return length + RB_EVNT_HDR_SIZE;
68         default:
69                 BUG();
70         }
71         /* not hit */
72         return 0;
73 }
74
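/*
 * A worked example of the data length encoding above (illustrative only):
 * a 12 byte payload is padded to a multiple of RB_ALIGNMENT (still 12)
 * and stored with event->len = 12 >> RB_ALIGNMENT_SHIFT = 3, so
 * rb_event_length() reports 12 bytes of data plus the 4 byte
 * struct ring_buffer_event header, 16 bytes in total. A payload larger
 * than RB_MAX_SMALL_DATA is stored with event->len = 0 and the remaining
 * length in event->array[0]; its data then starts at event->array[1].
 */
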
75 /**
76  * ring_buffer_event_length - return the length of the event
77  * @event: the event to get the length of
78  */
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
80 {
81         return rb_event_length(event);
82 }
83
84 /* inline for ring buffer fast paths */
85 static inline void *
86 rb_event_data(struct ring_buffer_event *event)
87 {
88         BUG_ON(event->type != RINGBUF_TYPE_DATA);
89         /* If length is in len field, then array[0] has the data */
90         if (event->len)
91                 return (void *)&event->array[0];
92         /* Otherwise length is in array[0] and array[1] has the data */
93         return (void *)&event->array[1];
94 }
95
96 /**
97  * ring_buffer_event_data - return the data of the event
98  * @event: the event to get the data from
99  */
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
101 {
102         return rb_event_data(event);
103 }
104
105 #define for_each_buffer_cpu(buffer, cpu)                \
106         for_each_cpu_mask(cpu, buffer->cpumask)
107
108 #define TS_SHIFT        27
109 #define TS_MASK         ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST   (~TS_MASK)
111
112 /*
113  * This hack stolen from mm/slob.c.
114  * We can store per page timing information in the page frame of the page.
115  * Thanks to Peter Zijlstra for suggesting this idea.
116  */
117 struct buffer_page {
118         union {
119                 struct {
120                         unsigned long    flags;         /* mandatory */
121                         atomic_t         _count;        /* mandatory */
122                         u64              time_stamp;    /* page time stamp */
123                         unsigned         size;          /* size of page data */
124                         struct list_head list;          /* list of free pages */
125                 };
126                 struct page page;
127         };
128 };
129
130 /*
131  * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
132  * this issue out.
133  */
134 static inline void free_buffer_page(struct buffer_page *bpage)
135 {
136         reset_page_mapcount(&bpage->page);
137         bpage->page.mapping = NULL;
138         __free_page(&bpage->page);
139 }
140
141 /*
142  * We need to fit the time_stamp delta into 27 bits.
143  */
144 static inline int test_time_stamp(u64 delta)
145 {
146         if (delta & TS_DELTA_TEST)
147                 return 1;
148         return 0;
149 }
150
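/*
 * For scale (assuming the nanosecond resolution of sched_clock()):
 * TS_SHIFT is 27, so a delta of 2^27 ns, roughly 134 ms, or more no
 * longer fits in the 27 bit time_delta of an event and forces a
 * RINGBUF_TYPE_TIME_EXTEND event to be inserted in front of the data
 * (see rb_add_time_stamp() below).
 */
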
151 #define BUF_PAGE_SIZE PAGE_SIZE
152
153 /*
154  * head_page == tail_page && head == tail then buffer is empty.
155  */
156 struct ring_buffer_per_cpu {
157         int                             cpu;
158         struct ring_buffer              *buffer;
159         spinlock_t                      lock;
160         struct lock_class_key           lock_key;
161         struct list_head                pages;
162         unsigned long                   head;   /* read from head */
163         unsigned long                   tail;   /* write to tail */
164         struct buffer_page              *head_page;
165         struct buffer_page              *tail_page;
166         unsigned long                   overrun;
167         unsigned long                   entries;
168         u64                             write_stamp;
169         u64                             read_stamp;
170         atomic_t                        record_disabled;
171 };
172
173 struct ring_buffer {
174         unsigned long                   size;
175         unsigned                        pages;
176         unsigned                        flags;
177         int                             cpus;
178         cpumask_t                       cpumask;
179         atomic_t                        record_disabled;
180
181         struct mutex                    mutex;
182
183         struct ring_buffer_per_cpu      **buffers;
184 };
185
186 struct ring_buffer_iter {
187         struct ring_buffer_per_cpu      *cpu_buffer;
188         unsigned long                   head;
189         struct buffer_page              *head_page;
190         u64                             read_stamp;
191 };
192
193 #define RB_WARN_ON(buffer, cond)                        \
194         if (unlikely(cond)) {                           \
195                 atomic_inc(&buffer->record_disabled);   \
196                 WARN_ON(1);                             \
197                 return -1;                              \
198         }
199
200 /**
201  * rb_check_pages - integrity check of buffer pages
202  * @cpu_buffer: CPU buffer with pages to test
203  *
204  * As a safety measure we check to make sure the data pages have not
205  * been corrupted.
206  */
207 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
208 {
209         struct list_head *head = &cpu_buffer->pages;
210         struct buffer_page *page, *tmp;
211
212         RB_WARN_ON(cpu_buffer, head->next->prev != head);
213         RB_WARN_ON(cpu_buffer, head->prev->next != head);
214
215         list_for_each_entry_safe(page, tmp, head, list) {
216                 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
217                 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
218         }
219
220         return 0;
221 }
222
223 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
224 {
225         return cpu_buffer->head_page->size;
226 }
227
228 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
229                              unsigned nr_pages)
230 {
231         struct list_head *head = &cpu_buffer->pages;
232         struct buffer_page *page, *tmp;
233         unsigned long addr;
234         LIST_HEAD(pages);
235         unsigned i;
236
237         for (i = 0; i < nr_pages; i++) {
238                 addr = __get_free_page(GFP_KERNEL);
239                 if (!addr)
240                         goto free_pages;
241                 page = (struct buffer_page *)virt_to_page(addr);
242                 list_add(&page->list, &pages);
243         }
244
245         list_splice(&pages, head);
246
247         rb_check_pages(cpu_buffer);
248
249         return 0;
250
251  free_pages:
252         list_for_each_entry_safe(page, tmp, &pages, list) {
253                 list_del_init(&page->list);
254                 free_buffer_page(page);
255         }
256         return -ENOMEM;
257 }
258
259 static struct ring_buffer_per_cpu *
260 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
261 {
262         struct ring_buffer_per_cpu *cpu_buffer;
263         int ret;
264
265         cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
266                                   GFP_KERNEL, cpu_to_node(cpu));
267         if (!cpu_buffer)
268                 return NULL;
269
270         cpu_buffer->cpu = cpu;
271         cpu_buffer->buffer = buffer;
272         spin_lock_init(&cpu_buffer->lock);
273         INIT_LIST_HEAD(&cpu_buffer->pages);
274
275         ret = rb_allocate_pages(cpu_buffer, buffer->pages);
276         if (ret < 0)
277                 goto fail_free_buffer;
278
279         cpu_buffer->head_page
280                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
281         cpu_buffer->tail_page
282                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
283
284         return cpu_buffer;
285
286  fail_free_buffer:
287         kfree(cpu_buffer);
288         return NULL;
289 }
290
291 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
292 {
293         struct list_head *head = &cpu_buffer->pages;
294         struct buffer_page *page, *tmp;
295
296         list_for_each_entry_safe(page, tmp, head, list) {
297                 list_del_init(&page->list);
298                 free_buffer_page(page);
299         }
300         kfree(cpu_buffer);
301 }
302
303 /*
304  * Causes compile errors if the struct buffer_page gets bigger
305  * than the struct page.
306  */
307 extern int ring_buffer_page_too_big(void);
308
309 /**
310  * ring_buffer_alloc - allocate a new ring_buffer
311  * @size: the size in bytes that is needed.
312  * @flags: attributes to set for the ring buffer.
313  *
314  * Currently the only flag that is available is the RB_FL_OVERWRITE
315  * flag. This flag means that the buffer will overwrite old data
316  * when the buffer wraps. If this flag is not set, the buffer will
317  * drop data when the tail hits the head.
318  */
319 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
320 {
321         struct ring_buffer *buffer;
322         int bsize;
323         int cpu;
324
325         /* Paranoid! Optimizes out when all is well */
326         if (sizeof(struct buffer_page) > sizeof(struct page))
327                 ring_buffer_page_too_big();
328
329
330         /* keep it in its own cache line */
331         buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
332                          GFP_KERNEL);
333         if (!buffer)
334                 return NULL;
335
336         buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
337         buffer->flags = flags;
338
339         /* need at least two pages */
340         if (buffer->pages == 1)
341                 buffer->pages++;
342
343         buffer->cpumask = cpu_possible_map;
344         buffer->cpus = nr_cpu_ids;
345
346         bsize = sizeof(void *) * nr_cpu_ids;
347         buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
348                                   GFP_KERNEL);
349         if (!buffer->buffers)
350                 goto fail_free_buffer;
351
352         for_each_buffer_cpu(buffer, cpu) {
353                 buffer->buffers[cpu] =
354                         rb_allocate_cpu_buffer(buffer, cpu);
355                 if (!buffer->buffers[cpu])
356                         goto fail_free_buffers;
357         }
358
359         mutex_init(&buffer->mutex);
360
361         return buffer;
362
363  fail_free_buffers:
364         for_each_buffer_cpu(buffer, cpu) {
365                 if (buffer->buffers[cpu])
366                         rb_free_cpu_buffer(buffer->buffers[cpu]);
367         }
368         kfree(buffer->buffers);
369
370  fail_free_buffer:
371         kfree(buffer);
372         return NULL;
373 }
374
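/*
 * Typical usage (an illustrative sketch, error handling trimmed):
 *
 *        struct ring_buffer *buffer;
 *
 *        buffer = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *        if (!buffer)
 *                return -ENOMEM;
 *        ...
 *        ring_buffer_free(buffer);
 *
 * The requested size is rounded up to whole pages and each per CPU
 * buffer gets at least two pages.
 */
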
375 /**
376  * ring_buffer_free - free a ring buffer.
377  * @buffer: the buffer to free.
378  */
379 void
380 ring_buffer_free(struct ring_buffer *buffer)
381 {
382         int cpu;
383
384         for_each_buffer_cpu(buffer, cpu)
385                 rb_free_cpu_buffer(buffer->buffers[cpu]);
386
387         kfree(buffer);
388 }
389
390 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
391
392 static void
393 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
394 {
395         struct buffer_page *page;
396         struct list_head *p;
397         unsigned i;
398
399         atomic_inc(&cpu_buffer->record_disabled);
400         synchronize_sched();
401
402         for (i = 0; i < nr_pages; i++) {
403                 BUG_ON(list_empty(&cpu_buffer->pages));
404                 p = cpu_buffer->pages.next;
405                 page = list_entry(p, struct buffer_page, list);
406                 list_del_init(&page->list);
407                 free_buffer_page(page);
408         }
409         BUG_ON(list_empty(&cpu_buffer->pages));
410
411         rb_reset_cpu(cpu_buffer);
412
413         rb_check_pages(cpu_buffer);
414
415         atomic_dec(&cpu_buffer->record_disabled);
416
417 }
418
419 static void
420 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
421                 struct list_head *pages, unsigned nr_pages)
422 {
423         struct buffer_page *page;
424         struct list_head *p;
425         unsigned i;
426
427         atomic_inc(&cpu_buffer->record_disabled);
428         synchronize_sched();
429
430         for (i = 0; i < nr_pages; i++) {
431                 BUG_ON(list_empty(pages));
432                 p = pages->next;
433                 page = list_entry(p, struct buffer_page, list);
434                 list_del_init(&page->list);
435                 list_add_tail(&page->list, &cpu_buffer->pages);
436         }
437         rb_reset_cpu(cpu_buffer);
438
439         rb_check_pages(cpu_buffer);
440
441         atomic_dec(&cpu_buffer->record_disabled);
442 }
443
444 /**
445  * ring_buffer_resize - resize the ring buffer
446  * @buffer: the buffer to resize.
447  * @size: the new size.
448  *
449  * The tracer is responsible for making sure that the buffer is
450  * not being used while changing the size.
451  * Note: We may be able to change the above requirement by using
452  *  RCU synchronizations.
453  *
454  * Minimum size is 2 * BUF_PAGE_SIZE.
455  *
456  * Returns the new size on success and -ENOMEM on failure.
457  */
458 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
459 {
460         struct ring_buffer_per_cpu *cpu_buffer;
461         unsigned nr_pages, rm_pages, new_pages;
462         struct buffer_page *page, *tmp;
463         unsigned long buffer_size;
464         unsigned long addr;
465         LIST_HEAD(pages);
466         int i, cpu;
467
468         size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
469         size *= BUF_PAGE_SIZE;
470         buffer_size = buffer->pages * BUF_PAGE_SIZE;
471
472         /* we need a minimum of two pages */
473         if (size < BUF_PAGE_SIZE * 2)
474                 size = BUF_PAGE_SIZE * 2;
475
476         if (size == buffer_size)
477                 return size;
478
479         mutex_lock(&buffer->mutex);
480
481         nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
482
483         if (size < buffer_size) {
484
485                 /* easy case, just free pages */
486                 BUG_ON(nr_pages >= buffer->pages);
487
488                 rm_pages = buffer->pages - nr_pages;
489
490                 for_each_buffer_cpu(buffer, cpu) {
491                         cpu_buffer = buffer->buffers[cpu];
492                         rb_remove_pages(cpu_buffer, rm_pages);
493                 }
494                 goto out;
495         }
496
497         /*
498          * This is a bit more difficult. We only want to add pages
499          * when we can allocate enough for all CPUs. We do this
500          * by allocating all the pages and storing them on a local
501          * linked list. If we succeed in our allocation, then we
502          * add these pages to the cpu_buffers. Otherwise we just free
503          * them all and return -ENOMEM.
504          */
505         BUG_ON(nr_pages <= buffer->pages);
506         new_pages = nr_pages - buffer->pages;
507
508         for_each_buffer_cpu(buffer, cpu) {
509                 for (i = 0; i < new_pages; i++) {
510                         addr = __get_free_page(GFP_KERNEL);
511                         if (!addr)
512                                 goto free_pages;
513                         page = (struct buffer_page *)virt_to_page(addr);
514                         list_add(&page->list, &pages);
515                 }
516         }
517
518         for_each_buffer_cpu(buffer, cpu) {
519                 cpu_buffer = buffer->buffers[cpu];
520                 rb_insert_pages(cpu_buffer, &pages, new_pages);
521         }
522
523         BUG_ON(!list_empty(&pages));
524
525  out:
526         buffer->pages = nr_pages;
527         mutex_unlock(&buffer->mutex);
528
529         return size;
530
531  free_pages:
532         list_for_each_entry_safe(page, tmp, &pages, list) {
533                 list_del_init(&page->list);
534                 free_buffer_page(page);
535         }
536         return -ENOMEM;
537 }
538
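/*
 * For example (illustrative), doubling an existing buffer from a context
 * where the tracer knows it is not being written to:
 *
 *        ring_buffer_resize(buffer, 2 * ring_buffer_size(buffer));
 *
 * The new size is rounded up to whole pages and applied to every per CPU
 * buffer; the return value is the effective new size in bytes.
 */
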
539 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
540 {
541         return cpu_buffer->head_page == cpu_buffer->tail_page &&
542                 cpu_buffer->head == cpu_buffer->tail;
543 }
544
545 static inline int rb_null_event(struct ring_buffer_event *event)
546 {
547         return event->type == RINGBUF_TYPE_PADDING;
548 }
549
550 static inline void *rb_page_index(struct buffer_page *page, unsigned index)
551 {
552         void *addr = page_address(&page->page);
553
554         return addr + index;
555 }
556
557 static inline struct ring_buffer_event *
558 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
559 {
560         return rb_page_index(cpu_buffer->head_page,
561                              cpu_buffer->head);
562 }
563
564 static inline struct ring_buffer_event *
565 rb_iter_head_event(struct ring_buffer_iter *iter)
566 {
567         return rb_page_index(iter->head_page,
568                              iter->head);
569 }
570
571 /*
572  * When the tail hits the head and the buffer is in overwrite mode,
573  * the head jumps to the next page and all content on the previous
574  * page is discarded. But before doing so, we update the overrun
575  * variable of the buffer.
576  */
577 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
578 {
579         struct ring_buffer_event *event;
580         unsigned long head;
581
582         for (head = 0; head < rb_head_size(cpu_buffer);
583              head += rb_event_length(event)) {
584
585                 event = rb_page_index(cpu_buffer->head_page, head);
586                 BUG_ON(rb_null_event(event));
587                 /* Only count data entries */
588                 if (event->type != RINGBUF_TYPE_DATA)
589                         continue;
590                 cpu_buffer->overrun++;
591                 cpu_buffer->entries--;
592         }
593 }
594
595 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
596                                struct buffer_page **page)
597 {
598         struct list_head *p = (*page)->list.next;
599
600         if (p == &cpu_buffer->pages)
601                 p = p->next;
602
603         *page = list_entry(p, struct buffer_page, list);
604 }
605
606 static inline void
607 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
608 {
609         cpu_buffer->tail_page->time_stamp = *ts;
610         cpu_buffer->write_stamp = *ts;
611 }
612
613 static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
614 {
615         cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
616         cpu_buffer->head = 0;
617 }
618
619 static void
620 rb_reset_iter_read_page(struct ring_buffer_iter *iter)
621 {
622         iter->read_stamp = iter->head_page->time_stamp;
623         iter->head = 0;
624 }
625
626 /**
627  * rb_update_event - update event type and data
628  * @event: the event to update
629  * @type: the type of event
630  * @length: the size of the event field in the ring buffer
631  *
632  * Update the type and data fields of the event. The length
633  * is the actual size that is written to the ring buffer,
634  * and with this, we can determine what to place into the
635  * data field.
636  */
637 static inline void
638 rb_update_event(struct ring_buffer_event *event,
639                          unsigned type, unsigned length)
640 {
641         event->type = type;
642
643         switch (type) {
644
645         case RINGBUF_TYPE_PADDING:
646                 break;
647
648         case RINGBUF_TYPE_TIME_EXTEND:
649                 event->len =
650                         (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
651                         >> RB_ALIGNMENT_SHIFT;
652                 break;
653
654         case RINGBUF_TYPE_TIME_STAMP:
655                 event->len =
656                         (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
657                         >> RB_ALIGNMENT_SHIFT;
658                 break;
659
660         case RINGBUF_TYPE_DATA:
661                 length -= RB_EVNT_HDR_SIZE;
662                 if (length > RB_MAX_SMALL_DATA) {
663                         event->len = 0;
664                         event->array[0] = length;
665                 } else
666                         event->len =
667                                 (length + (RB_ALIGNMENT-1))
668                                 >> RB_ALIGNMENT_SHIFT;
669                 break;
670         default:
671                 BUG();
672         }
673 }
674
675 static inline unsigned rb_calculate_event_length(unsigned length)
676 {
677         struct ring_buffer_event event; /* Used only for sizeof array */
678
679         /* zero length can cause confusion */
680         if (!length)
681                 length = 1;
682
683         if (length > RB_MAX_SMALL_DATA)
684                 length += sizeof(event.array[0]);
685
686         length += RB_EVNT_HDR_SIZE;
687         length = ALIGN(length, RB_ALIGNMENT);
688
689         return length;
690 }
691
692 static struct ring_buffer_event *
693 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
694                   unsigned type, unsigned long length, u64 *ts)
695 {
696         struct buffer_page *head_page, *tail_page;
697         unsigned long tail;
698         struct ring_buffer *buffer = cpu_buffer->buffer;
699         struct ring_buffer_event *event;
700
701         tail_page = cpu_buffer->tail_page;
702         head_page = cpu_buffer->head_page;
703         tail = cpu_buffer->tail;
704
705         if (tail + length > BUF_PAGE_SIZE) {
706                 struct buffer_page *next_page = tail_page;
707
708                 rb_inc_page(cpu_buffer, &next_page);
709
710                 if (next_page == head_page) {
711                         if (!(buffer->flags & RB_FL_OVERWRITE))
712                                 return NULL;
713
714                         /* count overflows */
715                         rb_update_overflow(cpu_buffer);
716
717                         rb_inc_page(cpu_buffer, &head_page);
718                         cpu_buffer->head_page = head_page;
719                         rb_reset_read_page(cpu_buffer);
720                 }
721
722                 if (tail != BUF_PAGE_SIZE) {
723                         event = rb_page_index(tail_page, tail);
724                         /* page padding */
725                         event->type = RINGBUF_TYPE_PADDING;
726                 }
727
728                 tail_page->size = tail;
729                 tail_page = next_page;
730                 tail_page->size = 0;
731                 tail = 0;
732                 cpu_buffer->tail_page = tail_page;
733                 cpu_buffer->tail = tail;
734                 rb_add_stamp(cpu_buffer, ts);
735         }
736
737         BUG_ON(tail + length > BUF_PAGE_SIZE);
738
739         event = rb_page_index(tail_page, tail);
740         rb_update_event(event, type, length);
741
742         return event;
743 }
744
745 static int
746 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
747                   u64 *ts, u64 *delta)
748 {
749         struct ring_buffer_event *event;
750         static int once;
751
752         if (unlikely(*delta > (1ULL << 59) && !once++)) {
753                 printk(KERN_WARNING "Delta way too big! %llu"
754                        " ts=%llu write stamp = %llu\n",
755                        *delta, *ts, cpu_buffer->write_stamp);
756                 WARN_ON(1);
757         }
758
759         /*
760          * The delta is too big, we need to add a
761          * new timestamp.
762          */
763         event = __rb_reserve_next(cpu_buffer,
764                                   RINGBUF_TYPE_TIME_EXTEND,
765                                   RB_LEN_TIME_EXTEND,
766                                   ts);
767         if (!event)
768                 return -1;
769
770         /* check to see if we went to the next page */
771         if (cpu_buffer->tail) {
772                 /* Still on same page, update timestamp */
773                 event->time_delta = *delta & TS_MASK;
774                 event->array[0] = *delta >> TS_SHIFT;
775                 /* commit the time event */
776                 cpu_buffer->tail +=
777                         rb_event_length(event);
778                 cpu_buffer->write_stamp = *ts;
779                 *delta = 0;
780         }
781
782         return 0;
783 }
784
785 static struct ring_buffer_event *
786 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
787                       unsigned type, unsigned long length)
788 {
789         struct ring_buffer_event *event;
790         u64 ts, delta;
791
792         ts = ring_buffer_time_stamp(cpu_buffer->cpu);
793
794         if (cpu_buffer->tail) {
795                 delta = ts - cpu_buffer->write_stamp;
796
797                 if (test_time_stamp(delta)) {
798                         int ret;
799
800                         ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
801                         if (ret < 0)
802                                 return NULL;
803                 }
804         } else {
805                 rb_add_stamp(cpu_buffer, &ts);
806                 delta = 0;
807         }
808
809         event = __rb_reserve_next(cpu_buffer, type, length, &ts);
810         if (!event)
811                 return NULL;
812
813         /* If the reserve went to the next page, our delta is zero */
814         if (!cpu_buffer->tail)
815                 delta = 0;
816
817         event->time_delta = delta;
818
819         return event;
820 }
821
822 /**
823  * ring_buffer_lock_reserve - reserve a part of the buffer
824  * @buffer: the ring buffer to reserve from
825  * @length: the length of the data to reserve (excluding event header)
826  * @flags: a pointer to save the interrupt flags
827  *
828  * Returns a reserved event on the ring buffer to copy directly to.
829  * The user of this interface will need to get the body to write into
830  * and can use the ring_buffer_event_data() interface.
831  *
832  * The length is the length of the data needed, not the event length
833  * which also includes the event header.
834  *
835  * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
836  * If NULL is returned, then nothing has been allocated or locked.
837  */
838 struct ring_buffer_event *
839 ring_buffer_lock_reserve(struct ring_buffer *buffer,
840                          unsigned long length,
841                          unsigned long *flags)
842 {
843         struct ring_buffer_per_cpu *cpu_buffer;
844         struct ring_buffer_event *event;
845         int cpu;
846
847         if (atomic_read(&buffer->record_disabled))
848                 return NULL;
849
850         raw_local_irq_save(*flags);
851         cpu = raw_smp_processor_id();
852
853         if (!cpu_isset(cpu, buffer->cpumask))
854                 goto out_irq;
855
856         cpu_buffer = buffer->buffers[cpu];
857         spin_lock(&cpu_buffer->lock);
858
859         if (atomic_read(&cpu_buffer->record_disabled))
860                 goto no_record;
861
862         length = rb_calculate_event_length(length);
863         if (length > BUF_PAGE_SIZE)
864                 goto no_record;
865
866         event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
867         if (!event)
868                 goto no_record;
869
870         return event;
871
872  no_record:
873         spin_unlock(&cpu_buffer->lock);
874  out_irq:
875         local_irq_restore(*flags);
876         return NULL;
877 }
878
879 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
880                       struct ring_buffer_event *event)
881 {
882         cpu_buffer->tail += rb_event_length(event);
883         cpu_buffer->tail_page->size = cpu_buffer->tail;
884         cpu_buffer->write_stamp += event->time_delta;
885         cpu_buffer->entries++;
886 }
887
888 /**
889  * ring_buffer_unlock_commit - commit a reserved event
890  * @buffer: The buffer to commit to
891  * @event: The event pointer to commit.
892  * @flags: the interrupt flags received from ring_buffer_lock_reserve.
893  *
894  * This commits the data to the ring buffer, and releases any locks held.
895  *
896  * Must be paired with ring_buffer_lock_reserve.
897  */
898 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
899                               struct ring_buffer_event *event,
900                               unsigned long flags)
901 {
902         struct ring_buffer_per_cpu *cpu_buffer;
903         int cpu = raw_smp_processor_id();
904
905         cpu_buffer = buffer->buffers[cpu];
906
907         assert_spin_locked(&cpu_buffer->lock);
908
909         rb_commit(cpu_buffer, event);
910
911         spin_unlock(&cpu_buffer->lock);
912         raw_local_irq_restore(flags);
913
914         return 0;
915 }
916
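/*
 * An illustrative sketch of the reserve/commit cycle; "buffer" is assumed
 * to come from ring_buffer_alloc():
 *
 *        struct ring_buffer_event *event;
 *        unsigned long flags;
 *        int *body;
 *
 *        event = ring_buffer_lock_reserve(buffer, sizeof(*body), &flags);
 *        if (event) {
 *                body = ring_buffer_event_data(event);
 *                *body = 42;
 *                ring_buffer_unlock_commit(buffer, event, flags);
 *        }
 *
 * The reserve path disables interrupts and takes the per CPU lock, so the
 * copy into the body should be kept short.
 */
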
917 /**
918  * ring_buffer_write - write data to the buffer without reserving
919  * @buffer: The ring buffer to write to.
920  * @length: The length of the data being written (excluding the event header)
921  * @data: The data to write to the buffer.
922  *
923  * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
924  * one function. If you already have the data to write to the buffer, it
925  * may be easier to simply call this function.
926  *
927  * Note, like ring_buffer_lock_reserve, the length is the length of the data
928  * and not the length of the event which would hold the header.
929  */
930 int ring_buffer_write(struct ring_buffer *buffer,
931                         unsigned long length,
932                         void *data)
933 {
934         struct ring_buffer_per_cpu *cpu_buffer;
935         struct ring_buffer_event *event;
936         unsigned long event_length, flags;
937         void *body;
938         int ret = -EBUSY;
939         int cpu;
940
941         if (atomic_read(&buffer->record_disabled))
942                 return -EBUSY;
943
944         local_irq_save(flags);
945         cpu = raw_smp_processor_id();
946
947         if (!cpu_isset(cpu, buffer->cpumask))
948                 goto out_irq;
949
950         cpu_buffer = buffer->buffers[cpu];
951         spin_lock(&cpu_buffer->lock);
952
953         if (atomic_read(&cpu_buffer->record_disabled))
954                 goto out;
955
956         event_length = rb_calculate_event_length(length);
957         event = rb_reserve_next_event(cpu_buffer,
958                                       RINGBUF_TYPE_DATA, event_length);
959         if (!event)
960                 goto out;
961
962         body = rb_event_data(event);
963
964         memcpy(body, data, length);
965
966         rb_commit(cpu_buffer, event);
967
968         ret = 0;
969  out:
970         spin_unlock(&cpu_buffer->lock);
971  out_irq:
972         local_irq_restore(flags);
973
974         return ret;
975 }
976
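/*
 * When the data is already assembled, the reserve/commit pair collapses
 * into a single call (illustrative sketch; "struct my_entry" and
 * "fill_entry()" stand in for whatever the caller records):
 *
 *        struct my_entry entry;
 *
 *        fill_entry(&entry);
 *        if (ring_buffer_write(buffer, sizeof(entry), &entry) < 0)
 *                return;
 *
 * A return of 0 means the entry was committed; -EBUSY means recording is
 * disabled or no space could be reserved.
 */
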
977 /**
978  * ring_buffer_lock - lock the ring buffer
979  * @buffer: The ring buffer to lock
980  * @flags: The place to store the interrupt flags
981  *
982  * This locks all the per CPU buffers.
983  *
984  * Must be unlocked by ring_buffer_unlock.
985  */
986 void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
987 {
988         struct ring_buffer_per_cpu *cpu_buffer;
989         int cpu;
990
991         local_irq_save(*flags);
992
993         for_each_buffer_cpu(buffer, cpu) {
994                 cpu_buffer = buffer->buffers[cpu];
995                 spin_lock(&cpu_buffer->lock);
996         }
997 }
998
999 /**
1000  * ring_buffer_unlock - unlock a locked buffer
1001  * @buffer: The locked buffer to unlock
1002  * @flags: The interrupt flags received by ring_buffer_lock
1003  */
1004 void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
1005 {
1006         struct ring_buffer_per_cpu *cpu_buffer;
1007         int cpu;
1008
1009         for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
1010                 if (!cpu_isset(cpu, buffer->cpumask))
1011                         continue;
1012                 cpu_buffer = buffer->buffers[cpu];
1013                 spin_unlock(&cpu_buffer->lock);
1014         }
1015
1016         local_irq_restore(flags);
1017 }
1018
1019 /**
1020  * ring_buffer_record_disable - stop all writes into the buffer
1021  * @buffer: The ring buffer to stop writes to.
1022  *
1023  * This prevents all writes to the buffer. Any attempt to write
1024  * to the buffer after this will fail and return NULL.
1025  *
1026  * The caller should call synchronize_sched() after this.
1027  */
1028 void ring_buffer_record_disable(struct ring_buffer *buffer)
1029 {
1030         atomic_inc(&buffer->record_disabled);
1031 }
1032
1033 /**
1034  * ring_buffer_record_enable - enable writes to the buffer
1035  * @buffer: The ring buffer to enable writes
1036  *
1037  * Note, multiple disables will need the same number of enables
1038  * to truly enable the writing (much like preempt_disable).
1039  */
1040 void ring_buffer_record_enable(struct ring_buffer *buffer)
1041 {
1042         atomic_dec(&buffer->record_disabled);
1043 }
1044
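/*
 * The intended ordering is (illustrative sketch, using calls defined in
 * this file):
 *
 *        ring_buffer_record_disable(buffer);
 *        synchronize_sched();
 *        ring_buffer_reset(buffer);
 *        ring_buffer_record_enable(buffer);
 *
 * The synchronize_sched() lets writers that were already inside the
 * reserve path on other CPUs finish before the buffer is touched.
 */
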
1045 /**
1046  * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1047  * @buffer: The ring buffer to stop writes to.
1048  * @cpu: The CPU buffer to stop
1049  *
1050  * This prevents all writes to the buffer. Any attempt to write
1051  * to the buffer after this will fail and return NULL.
1052  *
1053  * The caller should call synchronize_sched() after this.
1054  */
1055 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1056 {
1057         struct ring_buffer_per_cpu *cpu_buffer;
1058
1059         if (!cpu_isset(cpu, buffer->cpumask))
1060                 return;
1061
1062         cpu_buffer = buffer->buffers[cpu];
1063         atomic_inc(&cpu_buffer->record_disabled);
1064 }
1065
1066 /**
1067  * ring_buffer_record_enable_cpu - enable writes to the buffer
1068  * @buffer: The ring buffer to enable writes
1069  * @cpu: The CPU to enable.
1070  *
1071  * Note, multiple disables will need the same number of enables
1072  * to truly enable the writing (much like preempt_disable).
1073  */
1074 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1075 {
1076         struct ring_buffer_per_cpu *cpu_buffer;
1077
1078         if (!cpu_isset(cpu, buffer->cpumask))
1079                 return;
1080
1081         cpu_buffer = buffer->buffers[cpu];
1082         atomic_dec(&cpu_buffer->record_disabled);
1083 }
1084
1085 /**
1086  * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1087  * @buffer: The ring buffer
1088  * @cpu: The per CPU buffer to get the entries from.
1089  */
1090 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1091 {
1092         struct ring_buffer_per_cpu *cpu_buffer;
1093
1094         if (!cpu_isset(cpu, buffer->cpumask))
1095                 return 0;
1096
1097         cpu_buffer = buffer->buffers[cpu];
1098         return cpu_buffer->entries;
1099 }
1100
1101 /**
1102  * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1103  * @buffer: The ring buffer
1104  * @cpu: The per CPU buffer to get the number of overruns from
1105  */
1106 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1107 {
1108         struct ring_buffer_per_cpu *cpu_buffer;
1109
1110         if (!cpu_isset(cpu, buffer->cpumask))
1111                 return 0;
1112
1113         cpu_buffer = buffer->buffers[cpu];
1114         return cpu_buffer->overrun;
1115 }
1116
1117 /**
1118  * ring_buffer_entries - get the number of entries in a buffer
1119  * @buffer: The ring buffer
1120  *
1121  * Returns the total number of entries in the ring buffer
1122  * (all CPU entries)
1123  */
1124 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1125 {
1126         struct ring_buffer_per_cpu *cpu_buffer;
1127         unsigned long entries = 0;
1128         int cpu;
1129
1130         /* if you care about this being correct, lock the buffer */
1131         for_each_buffer_cpu(buffer, cpu) {
1132                 cpu_buffer = buffer->buffers[cpu];
1133                 entries += cpu_buffer->entries;
1134         }
1135
1136         return entries;
1137 }
1138
1139 /**
1140  * ring_buffer_overruns - get the number of overruns in the buffer
1141  * @buffer: The ring buffer
1142  *
1143  * Returns the total number of overruns in the ring buffer
1144  * (all CPU entries)
1145  */
1146 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1147 {
1148         struct ring_buffer_per_cpu *cpu_buffer;
1149         unsigned long overruns = 0;
1150         int cpu;
1151
1152         /* if you care about this being correct, lock the buffer */
1153         for_each_buffer_cpu(buffer, cpu) {
1154                 cpu_buffer = buffer->buffers[cpu];
1155                 overruns += cpu_buffer->overrun;
1156         }
1157
1158         return overruns;
1159 }
1160
1161 /**
1162  * ring_buffer_iter_reset - reset an iterator
1163  * @iter: The iterator to reset
1164  *
1165  * Resets the iterator, so that it will start from the beginning
1166  * again.
1167  */
1168 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1169 {
1170         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1171
1172         iter->head_page = cpu_buffer->head_page;
1173         iter->head = cpu_buffer->head;
1174         rb_reset_iter_read_page(iter);
1175 }
1176
1177 /**
1178  * ring_buffer_iter_empty - check if an iterator has no more to read
1179  * @iter: The iterator to check
1180  */
1181 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1182 {
1183         struct ring_buffer_per_cpu *cpu_buffer;
1184
1185         cpu_buffer = iter->cpu_buffer;
1186
1187         return iter->head_page == cpu_buffer->tail_page &&
1188                 iter->head == cpu_buffer->tail;
1189 }
1190
1191 static void
1192 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1193                      struct ring_buffer_event *event)
1194 {
1195         u64 delta;
1196
1197         switch (event->type) {
1198         case RINGBUF_TYPE_PADDING:
1199                 return;
1200
1201         case RINGBUF_TYPE_TIME_EXTEND:
1202                 delta = event->array[0];
1203                 delta <<= TS_SHIFT;
1204                 delta += event->time_delta;
1205                 cpu_buffer->read_stamp += delta;
1206                 return;
1207
1208         case RINGBUF_TYPE_TIME_STAMP:
1209                 /* FIXME: not implemented */
1210                 return;
1211
1212         case RINGBUF_TYPE_DATA:
1213                 cpu_buffer->read_stamp += event->time_delta;
1214                 return;
1215
1216         default:
1217                 BUG();
1218         }
1219         return;
1220 }
1221
1222 static void
1223 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1224                           struct ring_buffer_event *event)
1225 {
1226         u64 delta;
1227
1228         switch (event->type) {
1229         case RINGBUF_TYPE_PADDING:
1230                 return;
1231
1232         case RINGBUF_TYPE_TIME_EXTEND:
1233                 delta = event->array[0];
1234                 delta <<= TS_SHIFT;
1235                 delta += event->time_delta;
1236                 iter->read_stamp += delta;
1237                 return;
1238
1239         case RINGBUF_TYPE_TIME_STAMP:
1240                 /* FIXME: not implemented */
1241                 return;
1242
1243         case RINGBUF_TYPE_DATA:
1244                 iter->read_stamp += event->time_delta;
1245                 return;
1246
1247         default:
1248                 BUG();
1249         }
1250         return;
1251 }
1252
1253 static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1254 {
1255         struct ring_buffer_event *event;
1256         unsigned length;
1257
1258         /*
1259          * Check if we are at the end of the buffer.
1260          */
1261         if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1262                 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1263                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1264                 rb_reset_read_page(cpu_buffer);
1265                 return;
1266         }
1267
1268         event = rb_head_event(cpu_buffer);
1269
1270         if (event->type == RINGBUF_TYPE_DATA)
1271                 cpu_buffer->entries--;
1272
1273         length = rb_event_length(event);
1274
1275         /*
1276          * This should not be called to advance the header if we are
1277          * at the tail of the buffer.
1278          */
1279         BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1280                (cpu_buffer->head + length > cpu_buffer->tail));
1281
1282         rb_update_read_stamp(cpu_buffer, event);
1283
1284         cpu_buffer->head += length;
1285
1286         /* check for end of page */
1287         if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1288             (cpu_buffer->head_page != cpu_buffer->tail_page))
1289                 rb_advance_head(cpu_buffer);
1290 }
1291
1292 static void rb_advance_iter(struct ring_buffer_iter *iter)
1293 {
1294         struct ring_buffer *buffer;
1295         struct ring_buffer_per_cpu *cpu_buffer;
1296         struct ring_buffer_event *event;
1297         unsigned length;
1298
1299         cpu_buffer = iter->cpu_buffer;
1300         buffer = cpu_buffer->buffer;
1301
1302         /*
1303          * Check if we are at the end of the buffer.
1304          */
1305         if (iter->head >= iter->head_page->size) {
1306                 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1307                 rb_inc_page(cpu_buffer, &iter->head_page);
1308                 rb_reset_iter_read_page(iter);
1309                 return;
1310         }
1311
1312         event = rb_iter_head_event(iter);
1313
1314         length = rb_event_length(event);
1315
1316         /*
1317          * This should not be called to advance the header if we are
1318          * at the tail of the buffer.
1319          */
1320         BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1321                (iter->head + length > cpu_buffer->tail));
1322
1323         rb_update_iter_read_stamp(iter, event);
1324
1325         iter->head += length;
1326
1327         /* check for end of page padding */
1328         if ((iter->head >= iter->head_page->size) &&
1329             (iter->head_page != cpu_buffer->tail_page))
1330                 rb_advance_iter(iter);
1331 }
1332
1333 /**
1334  * ring_buffer_peek - peek at the next event to be read
1335  * @buffer: The ring buffer to read
1336  * @cpu: The cpu to peek at
1337  * @ts: The timestamp counter of this event.
1338  *
1339  * This will return the event that will be read next, but does
1340  * not consume the data.
1341  */
1342 struct ring_buffer_event *
1343 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1344 {
1345         struct ring_buffer_per_cpu *cpu_buffer;
1346         struct ring_buffer_event *event;
1347
1348         if (!cpu_isset(cpu, buffer->cpumask))
1349                 return NULL;
1350
1351         cpu_buffer = buffer->buffers[cpu];
1352
1353  again:
1354         if (rb_per_cpu_empty(cpu_buffer))
1355                 return NULL;
1356
1357         event = rb_head_event(cpu_buffer);
1358
1359         switch (event->type) {
1360         case RINGBUF_TYPE_PADDING:
1361                 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1362                 rb_reset_read_page(cpu_buffer);
1363                 goto again;
1364
1365         case RINGBUF_TYPE_TIME_EXTEND:
1366                 /* Internal data, OK to advance */
1367                 rb_advance_head(cpu_buffer);
1368                 goto again;
1369
1370         case RINGBUF_TYPE_TIME_STAMP:
1371                 /* FIXME: not implemented */
1372                 rb_advance_head(cpu_buffer);
1373                 goto again;
1374
1375         case RINGBUF_TYPE_DATA:
1376                 if (ts) {
1377                         *ts = cpu_buffer->read_stamp + event->time_delta;
1378                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1379                 }
1380                 return event;
1381
1382         default:
1383                 BUG();
1384         }
1385
1386         return NULL;
1387 }
1388
1389 /**
1390  * ring_buffer_iter_peek - peek at the next event to be read
1391  * @iter: The ring buffer iterator
1392  * @ts: The timestamp counter of this event.
1393  *
1394  * This will return the event that will be read next, but does
1395  * not increment the iterator.
1396  */
1397 struct ring_buffer_event *
1398 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1399 {
1400         struct ring_buffer *buffer;
1401         struct ring_buffer_per_cpu *cpu_buffer;
1402         struct ring_buffer_event *event;
1403
1404         if (ring_buffer_iter_empty(iter))
1405                 return NULL;
1406
1407         cpu_buffer = iter->cpu_buffer;
1408         buffer = cpu_buffer->buffer;
1409
1410  again:
1411         if (rb_per_cpu_empty(cpu_buffer))
1412                 return NULL;
1413
1414         event = rb_iter_head_event(iter);
1415
1416         switch (event->type) {
1417         case RINGBUF_TYPE_PADDING:
1418                 rb_inc_page(cpu_buffer, &iter->head_page);
1419                 rb_reset_iter_read_page(iter);
1420                 goto again;
1421
1422         case RINGBUF_TYPE_TIME_EXTEND:
1423                 /* Internal data, OK to advance */
1424                 rb_advance_iter(iter);
1425                 goto again;
1426
1427         case RINGBUF_TYPE_TIME_STAMP:
1428                 /* FIXME: not implemented */
1429                 rb_advance_iter(iter);
1430                 goto again;
1431
1432         case RINGBUF_TYPE_DATA:
1433                 if (ts) {
1434                         *ts = iter->read_stamp + event->time_delta;
1435                         ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1436                 }
1437                 return event;
1438
1439         default:
1440                 BUG();
1441         }
1442
1443         return NULL;
1444 }
1445
1446 /**
1447  * ring_buffer_consume - return an event and consume it
1448  * @buffer: The ring buffer to get the next event from
1449  *
1450  * Returns the next event in the ring buffer, and that event is consumed.
1451  * Meaning that sequential reads will keep returning a different event,
1452  * and eventually empty the ring buffer if the producer is slower.
1453  */
1454 struct ring_buffer_event *
1455 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1456 {
1457         struct ring_buffer_per_cpu *cpu_buffer;
1458         struct ring_buffer_event *event;
1459
1460         if (!cpu_isset(cpu, buffer->cpumask))
1461                 return NULL;
1462
1463         event = ring_buffer_peek(buffer, cpu, ts);
1464         if (!event)
1465                 return NULL;
1466
1467         cpu_buffer = buffer->buffers[cpu];
1468         rb_advance_head(cpu_buffer);
1469
1470         return event;
1471 }
1472
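/*
 * A consuming reader is typically a per CPU loop (illustrative sketch;
 * "process" is a stand in for the caller's handling of the payload):
 *
 *        struct ring_buffer_event *event;
 *        u64 ts;
 *
 *        while ((event = ring_buffer_consume(buffer, cpu, &ts)))
 *                process(ring_buffer_event_data(event), ts);
 *
 * Each call returns the oldest unread event of that CPU buffer, or NULL
 * once it is empty.
 */
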
1473 /**
1474  * ring_buffer_read_start - start a non consuming read of the buffer
1475  * @buffer: The ring buffer to read from
1476  * @cpu: The cpu buffer to iterate over
1477  *
1478  * This starts up an iteration through the buffer. It also disables
1479  * the recording to the buffer until the reading is finished.
1480  * This prevents the reading from being corrupted. This is not
1481  * a consuming read, so a producer is not expected.
1482  *
1483  * Must be paired with ring_buffer_read_finish.
1484  */
1485 struct ring_buffer_iter *
1486 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1487 {
1488         struct ring_buffer_per_cpu *cpu_buffer;
1489         struct ring_buffer_iter *iter;
1490
1491         if (!cpu_isset(cpu, buffer->cpumask))
1492                 return NULL;
1493
1494         iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1495         if (!iter)
1496                 return NULL;
1497
1498         cpu_buffer = buffer->buffers[cpu];
1499
1500         iter->cpu_buffer = cpu_buffer;
1501
1502         atomic_inc(&cpu_buffer->record_disabled);
1503         synchronize_sched();
1504
1505         spin_lock(&cpu_buffer->lock);
1506         iter->head = cpu_buffer->head;
1507         iter->head_page = cpu_buffer->head_page;
1508         rb_reset_iter_read_page(iter);
1509         spin_unlock(&cpu_buffer->lock);
1510
1511         return iter;
1512 }
1513
1514 /**
1515  * ring_buffer_read_finish - finish reading the iterator of the buffer
1516  * @iter: The iterator retrieved by ring_buffer_read_start
1517  *
1518  * This re-enables the recording to the buffer, and frees the
1519  * iterator.
1520  */
1521 void
1522 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1523 {
1524         struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1525
1526         atomic_dec(&cpu_buffer->record_disabled);
1527         kfree(iter);
1528 }
1529
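/*
 * A non consuming read walks one CPU buffer through an iterator
 * (illustrative sketch; "process" is a stand in for the caller):
 *
 *        struct ring_buffer_iter *iter;
 *        struct ring_buffer_event *event;
 *        u64 ts;
 *
 *        iter = ring_buffer_read_start(buffer, cpu);
 *        if (!iter)
 *                return;
 *        while ((event = ring_buffer_read(iter, &ts)))
 *                process(ring_buffer_event_data(event), ts);
 *        ring_buffer_read_finish(iter);
 *
 * Writes to that CPU buffer stay disabled for the lifetime of the
 * iterator.
 */
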
1530 /**
1531  * ring_buffer_read - read the next item in the ring buffer by the iterator
1532  * @iter: The ring buffer iterator
1533  * @ts: The time stamp of the event read.
1534  *
1535  * This reads the next event in the ring buffer and increments the iterator.
1536  */
1537 struct ring_buffer_event *
1538 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1539 {
1540         struct ring_buffer_event *event;
1541
1542         event = ring_buffer_iter_peek(iter, ts);
1543         if (!event)
1544                 return NULL;
1545
1546         rb_advance_iter(iter);
1547
1548         return event;
1549 }
1550
1551 /**
1552  * ring_buffer_size - return the size of the ring buffer (in bytes)
1553  * @buffer: The ring buffer.
1554  */
1555 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1556 {
1557         return BUF_PAGE_SIZE * buffer->pages;
1558 }
1559
1560 static void
1561 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1562 {
1563         cpu_buffer->head_page
1564                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1565         cpu_buffer->tail_page
1566                 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1567
1568         cpu_buffer->head = cpu_buffer->tail = 0;
1569         cpu_buffer->overrun = 0;
1570         cpu_buffer->entries = 0;
1571 }
1572
1573 /**
1574  * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1575  * @buffer: The ring buffer to reset a per cpu buffer of
1576  * @cpu: The CPU buffer to be reset
1577  */
1578 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1579 {
1580         struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1581         unsigned long flags;
1582
1583         if (!cpu_isset(cpu, buffer->cpumask))
1584                 return;
1585
1586         raw_local_irq_save(flags);
1587         spin_lock(&cpu_buffer->lock);
1588
1589         rb_reset_cpu(cpu_buffer);
1590
1591         spin_unlock(&cpu_buffer->lock);
1592         raw_local_irq_restore(flags);
1593 }
1594
1595 /**
1596  * ring_buffer_reset - reset a ring buffer
1597  * @buffer: The ring buffer to reset all cpu buffers
1598  */
1599 void ring_buffer_reset(struct ring_buffer *buffer)
1600 {
1601         unsigned long flags;
1602         int cpu;
1603
1604         ring_buffer_lock(buffer, &flags);
1605
1606         for_each_buffer_cpu(buffer, cpu)
1607                 rb_reset_cpu(buffer->buffers[cpu]);
1608
1609         ring_buffer_unlock(buffer, flags);
1610 }
1611
1612 /**
1613  * ring_buffer_empty - is the ring buffer empty?
1614  * @buffer: The ring buffer to test
1615  */
1616 int ring_buffer_empty(struct ring_buffer *buffer)
1617 {
1618         struct ring_buffer_per_cpu *cpu_buffer;
1619         int cpu;
1620
1621         /* yes this is racy, but if you don't like the race, lock the buffer */
1622         for_each_buffer_cpu(buffer, cpu) {
1623                 cpu_buffer = buffer->buffers[cpu];
1624                 if (!rb_per_cpu_empty(cpu_buffer))
1625                         return 0;
1626         }
1627         return 1;
1628 }
1629
1630 /**
1631  * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1632  * @buffer: The ring buffer
1633  * @cpu: The CPU buffer to test
1634  */
1635 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1636 {
1637         struct ring_buffer_per_cpu *cpu_buffer;
1638
1639         if (!cpu_isset(cpu, buffer->cpumask))
1640                 return 1;
1641
1642         cpu_buffer = buffer->buffers[cpu];
1643         return rb_per_cpu_empty(cpu_buffer);
1644 }
1645
1646 /**
1647  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1648  * @buffer_a: One buffer to swap with
1649  * @buffer_b: The other buffer to swap with
1650  *
1651  * This function is useful for tracers that want to take a "snapshot"
1652  * of a CPU buffer and have another backup buffer lying around.
1653  * It is expected that the tracer handles the cpu buffer not being
1654  * used at the moment.
1655  */
1656 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1657                          struct ring_buffer *buffer_b, int cpu)
1658 {
1659         struct ring_buffer_per_cpu *cpu_buffer_a;
1660         struct ring_buffer_per_cpu *cpu_buffer_b;
1661
1662         if (!cpu_isset(cpu, buffer_a->cpumask) ||
1663             !cpu_isset(cpu, buffer_b->cpumask))
1664                 return -EINVAL;
1665
1666         /* At least make sure the two buffers are somewhat the same */
1667         if (buffer_a->size != buffer_b->size ||
1668             buffer_a->pages != buffer_b->pages)
1669                 return -EINVAL;
1670
1671         cpu_buffer_a = buffer_a->buffers[cpu];
1672         cpu_buffer_b = buffer_b->buffers[cpu];
1673
1674         /*
1675          * We can't do a synchronize_sched here because this
1676          * function can be called in atomic context.
1677          * Normally this will be called from the same CPU as cpu.
1678          * If not it's up to the caller to protect this.
1679          */
1680         atomic_inc(&cpu_buffer_a->record_disabled);
1681         atomic_inc(&cpu_buffer_b->record_disabled);
1682
1683         buffer_a->buffers[cpu] = cpu_buffer_b;
1684         buffer_b->buffers[cpu] = cpu_buffer_a;
1685
1686         cpu_buffer_b->buffer = buffer_a;
1687         cpu_buffer_a->buffer = buffer_b;
1688
1689         atomic_dec(&cpu_buffer_a->record_disabled);
1690         atomic_dec(&cpu_buffer_b->record_disabled);
1691
1692         return 0;
1693 }
1694
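/*
 * The snapshot pattern described above then looks roughly like this
 * (illustrative sketch; "snap" is a spare buffer of the same size that
 * the tracer allocated earlier, "process" stands in for the caller):
 *
 *        struct ring_buffer_event *event;
 *        u64 ts;
 *
 *        if (ring_buffer_swap_cpu(buffer, snap, cpu) == 0) {
 *                while ((event = ring_buffer_consume(snap, cpu, &ts)))
 *                        process(ring_buffer_event_data(event), ts);
 *        }
 *
 * New events keep flowing into "buffer" while "snap" is inspected.
 */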