4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 #include <linux/ring_buffer.h>
7 #include <linux/spinlock.h>
8 #include <linux/debugfs.h>
9 #include <linux/uaccess.h>
10 #include <linux/module.h>
11 #include <linux/percpu.h>
12 #include <linux/mutex.h>
13 #include <linux/sched.h> /* used for sched_clock() (for now) */
14 #include <linux/init.h>
15 #include <linux/hash.h>
16 #include <linux/list.h>
19 /* Up this if you want to test the TIME_EXTENTS and normalization */
23 u64 ring_buffer_time_stamp(int cpu)
25 /* shift to debug/test normalization and TIME_EXTENTS */
26 return sched_clock() << DEBUG_SHIFT;
29 void ring_buffer_normalize_time_stamp(int cpu, u64 *ts)
31 /* Just stupid testing the normalize function and deltas */
35 #define RB_EVNT_HDR_SIZE (sizeof(struct ring_buffer_event))
36 #define RB_ALIGNMENT_SHIFT 2
37 #define RB_ALIGNMENT (1 << RB_ALIGNMENT_SHIFT)
38 #define RB_MAX_SMALL_DATA 28
41 RB_LEN_TIME_EXTEND = 8,
42 RB_LEN_TIME_STAMP = 16,
45 /* inline for ring buffer fast paths */
46 static inline unsigned
47 rb_event_length(struct ring_buffer_event *event)
51 switch (event->type) {
52 case RINGBUF_TYPE_PADDING:
56 case RINGBUF_TYPE_TIME_EXTEND:
57 return RB_LEN_TIME_EXTEND;
59 case RINGBUF_TYPE_TIME_STAMP:
60 return RB_LEN_TIME_STAMP;
62 case RINGBUF_TYPE_DATA:
64 length = event->len << RB_ALIGNMENT_SHIFT;
66 length = event->array[0];
67 return length + RB_EVNT_HDR_SIZE;
76 * ring_buffer_event_length - return the length of the event
77 * @event: the event to get the length of
79 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
81 return rb_event_length(event);
84 /* inline for ring buffer fast paths */
86 rb_event_data(struct ring_buffer_event *event)
88 BUG_ON(event->type != RINGBUF_TYPE_DATA);
89 /* If length is in len field, then array[0] has the data */
91 return (void *)&event->array[0];
92 /* Otherwise length is in array[0] and array[1] has the data */
93 return (void *)&event->array[1];
97 * ring_buffer_event_data - return the data of the event
98 * @event: the event to get the data from
100 void *ring_buffer_event_data(struct ring_buffer_event *event)
102 return rb_event_data(event);
105 #define for_each_buffer_cpu(buffer, cpu) \
106 for_each_cpu_mask(cpu, buffer->cpumask)
109 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
110 #define TS_DELTA_TEST (~TS_MASK)
113 * This hack stolen from mm/slob.c.
114 * We can store per page timing information in the page frame of the page.
115 * Thanks to Peter Zijlstra for suggesting this idea.
120 unsigned long flags; /* mandatory */
121 atomic_t _count; /* mandatory */
122 u64 time_stamp; /* page time stamp */
123 unsigned size; /* size of page data */
124 struct list_head list; /* list of free pages */
131 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
134 static inline void free_buffer_page(struct buffer_page *bpage)
136 reset_page_mapcount(&bpage->page);
137 bpage->page.mapping = NULL;
138 __free_page(&bpage->page);
142 * We need to fit the time_stamp delta into 27 bits.
144 static inline int test_time_stamp(u64 delta)
146 if (delta & TS_DELTA_TEST)
151 #define BUF_PAGE_SIZE PAGE_SIZE
154 * head_page == tail_page && head == tail then buffer is empty.
156 struct ring_buffer_per_cpu {
158 struct ring_buffer *buffer;
160 struct lock_class_key lock_key;
161 struct list_head pages;
162 unsigned long head; /* read from head */
163 unsigned long tail; /* write to tail */
164 struct buffer_page *head_page;
165 struct buffer_page *tail_page;
166 unsigned long overrun;
167 unsigned long entries;
170 atomic_t record_disabled;
179 atomic_t record_disabled;
183 struct ring_buffer_per_cpu **buffers;
186 struct ring_buffer_iter {
187 struct ring_buffer_per_cpu *cpu_buffer;
189 struct buffer_page *head_page;
193 #define RB_WARN_ON(buffer, cond) \
194 if (unlikely(cond)) { \
195 atomic_inc(&buffer->record_disabled); \
201 * check_pages - integrity check of buffer pages
202 * @cpu_buffer: CPU buffer with pages to test
204 * As a safty measure we check to make sure the data pages have not
207 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
209 struct list_head *head = &cpu_buffer->pages;
210 struct buffer_page *page, *tmp;
212 RB_WARN_ON(cpu_buffer, head->next->prev != head);
213 RB_WARN_ON(cpu_buffer, head->prev->next != head);
215 list_for_each_entry_safe(page, tmp, head, list) {
216 RB_WARN_ON(cpu_buffer, page->list.next->prev != &page->list);
217 RB_WARN_ON(cpu_buffer, page->list.prev->next != &page->list);
223 static unsigned rb_head_size(struct ring_buffer_per_cpu *cpu_buffer)
225 return cpu_buffer->head_page->size;
228 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
231 struct list_head *head = &cpu_buffer->pages;
232 struct buffer_page *page, *tmp;
237 for (i = 0; i < nr_pages; i++) {
238 addr = __get_free_page(GFP_KERNEL);
241 page = (struct buffer_page *)virt_to_page(addr);
242 list_add(&page->list, &pages);
245 list_splice(&pages, head);
247 rb_check_pages(cpu_buffer);
252 list_for_each_entry_safe(page, tmp, &pages, list) {
253 list_del_init(&page->list);
254 free_buffer_page(page);
259 static struct ring_buffer_per_cpu *
260 rb_allocate_cpu_buffer(struct ring_buffer *buffer, int cpu)
262 struct ring_buffer_per_cpu *cpu_buffer;
265 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
266 GFP_KERNEL, cpu_to_node(cpu));
270 cpu_buffer->cpu = cpu;
271 cpu_buffer->buffer = buffer;
272 spin_lock_init(&cpu_buffer->lock);
273 INIT_LIST_HEAD(&cpu_buffer->pages);
275 ret = rb_allocate_pages(cpu_buffer, buffer->pages);
277 goto fail_free_buffer;
279 cpu_buffer->head_page
280 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
281 cpu_buffer->tail_page
282 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
291 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
293 struct list_head *head = &cpu_buffer->pages;
294 struct buffer_page *page, *tmp;
296 list_for_each_entry_safe(page, tmp, head, list) {
297 list_del_init(&page->list);
298 free_buffer_page(page);
304 * Causes compile errors if the struct buffer_page gets bigger
305 * than the struct page.
307 extern int ring_buffer_page_too_big(void);
310 * ring_buffer_alloc - allocate a new ring_buffer
311 * @size: the size in bytes that is needed.
312 * @flags: attributes to set for the ring buffer.
314 * Currently the only flag that is available is the RB_FL_OVERWRITE
315 * flag. This flag means that the buffer will overwrite old data
316 * when the buffer wraps. If this flag is not set, the buffer will
317 * drop data when the tail hits the head.
319 struct ring_buffer *ring_buffer_alloc(unsigned long size, unsigned flags)
321 struct ring_buffer *buffer;
325 /* Paranoid! Optimizes out when all is well */
326 if (sizeof(struct buffer_page) > sizeof(struct page))
327 ring_buffer_page_too_big();
330 /* keep it in its own cache line */
331 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
336 buffer->pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
337 buffer->flags = flags;
339 /* need at least two pages */
340 if (buffer->pages == 1)
343 buffer->cpumask = cpu_possible_map;
344 buffer->cpus = nr_cpu_ids;
346 bsize = sizeof(void *) * nr_cpu_ids;
347 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
349 if (!buffer->buffers)
350 goto fail_free_buffer;
352 for_each_buffer_cpu(buffer, cpu) {
353 buffer->buffers[cpu] =
354 rb_allocate_cpu_buffer(buffer, cpu);
355 if (!buffer->buffers[cpu])
356 goto fail_free_buffers;
359 mutex_init(&buffer->mutex);
364 for_each_buffer_cpu(buffer, cpu) {
365 if (buffer->buffers[cpu])
366 rb_free_cpu_buffer(buffer->buffers[cpu]);
368 kfree(buffer->buffers);
376 * ring_buffer_free - free a ring buffer.
377 * @buffer: the buffer to free.
380 ring_buffer_free(struct ring_buffer *buffer)
384 for_each_buffer_cpu(buffer, cpu)
385 rb_free_cpu_buffer(buffer->buffers[cpu]);
390 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
393 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
395 struct buffer_page *page;
399 atomic_inc(&cpu_buffer->record_disabled);
402 for (i = 0; i < nr_pages; i++) {
403 BUG_ON(list_empty(&cpu_buffer->pages));
404 p = cpu_buffer->pages.next;
405 page = list_entry(p, struct buffer_page, list);
406 list_del_init(&page->list);
407 free_buffer_page(page);
409 BUG_ON(list_empty(&cpu_buffer->pages));
411 rb_reset_cpu(cpu_buffer);
413 rb_check_pages(cpu_buffer);
415 atomic_dec(&cpu_buffer->record_disabled);
420 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
421 struct list_head *pages, unsigned nr_pages)
423 struct buffer_page *page;
427 atomic_inc(&cpu_buffer->record_disabled);
430 for (i = 0; i < nr_pages; i++) {
431 BUG_ON(list_empty(pages));
433 page = list_entry(p, struct buffer_page, list);
434 list_del_init(&page->list);
435 list_add_tail(&page->list, &cpu_buffer->pages);
437 rb_reset_cpu(cpu_buffer);
439 rb_check_pages(cpu_buffer);
441 atomic_dec(&cpu_buffer->record_disabled);
445 * ring_buffer_resize - resize the ring buffer
446 * @buffer: the buffer to resize.
447 * @size: the new size.
449 * The tracer is responsible for making sure that the buffer is
450 * not being used while changing the size.
451 * Note: We may be able to change the above requirement by using
452 * RCU synchronizations.
454 * Minimum size is 2 * BUF_PAGE_SIZE.
456 * Returns -1 on failure.
458 int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size)
460 struct ring_buffer_per_cpu *cpu_buffer;
461 unsigned nr_pages, rm_pages, new_pages;
462 struct buffer_page *page, *tmp;
463 unsigned long buffer_size;
468 size = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
469 size *= BUF_PAGE_SIZE;
470 buffer_size = buffer->pages * BUF_PAGE_SIZE;
472 /* we need a minimum of two pages */
473 if (size < BUF_PAGE_SIZE * 2)
474 size = BUF_PAGE_SIZE * 2;
476 if (size == buffer_size)
479 mutex_lock(&buffer->mutex);
481 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
483 if (size < buffer_size) {
485 /* easy case, just free pages */
486 BUG_ON(nr_pages >= buffer->pages);
488 rm_pages = buffer->pages - nr_pages;
490 for_each_buffer_cpu(buffer, cpu) {
491 cpu_buffer = buffer->buffers[cpu];
492 rb_remove_pages(cpu_buffer, rm_pages);
498 * This is a bit more difficult. We only want to add pages
499 * when we can allocate enough for all CPUs. We do this
500 * by allocating all the pages and storing them on a local
501 * link list. If we succeed in our allocation, then we
502 * add these pages to the cpu_buffers. Otherwise we just free
503 * them all and return -ENOMEM;
505 BUG_ON(nr_pages <= buffer->pages);
506 new_pages = nr_pages - buffer->pages;
508 for_each_buffer_cpu(buffer, cpu) {
509 for (i = 0; i < new_pages; i++) {
510 addr = __get_free_page(GFP_KERNEL);
513 page = (struct buffer_page *)virt_to_page(addr);
514 list_add(&page->list, &pages);
518 for_each_buffer_cpu(buffer, cpu) {
519 cpu_buffer = buffer->buffers[cpu];
520 rb_insert_pages(cpu_buffer, &pages, new_pages);
523 BUG_ON(!list_empty(&pages));
526 buffer->pages = nr_pages;
527 mutex_unlock(&buffer->mutex);
532 list_for_each_entry_safe(page, tmp, &pages, list) {
533 list_del_init(&page->list);
534 free_buffer_page(page);
539 static inline int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
541 return cpu_buffer->head_page == cpu_buffer->tail_page &&
542 cpu_buffer->head == cpu_buffer->tail;
545 static inline int rb_null_event(struct ring_buffer_event *event)
547 return event->type == RINGBUF_TYPE_PADDING;
550 static inline void *rb_page_index(struct buffer_page *page, unsigned index)
552 void *addr = page_address(&page->page);
557 static inline struct ring_buffer_event *
558 rb_head_event(struct ring_buffer_per_cpu *cpu_buffer)
560 return rb_page_index(cpu_buffer->head_page,
564 static inline struct ring_buffer_event *
565 rb_iter_head_event(struct ring_buffer_iter *iter)
567 return rb_page_index(iter->head_page,
572 * When the tail hits the head and the buffer is in overwrite mode,
573 * the head jumps to the next page and all content on the previous
574 * page is discarded. But before doing so, we update the overrun
575 * variable of the buffer.
577 static void rb_update_overflow(struct ring_buffer_per_cpu *cpu_buffer)
579 struct ring_buffer_event *event;
582 for (head = 0; head < rb_head_size(cpu_buffer);
583 head += rb_event_length(event)) {
585 event = rb_page_index(cpu_buffer->head_page, head);
586 BUG_ON(rb_null_event(event));
587 /* Only count data entries */
588 if (event->type != RINGBUF_TYPE_DATA)
590 cpu_buffer->overrun++;
591 cpu_buffer->entries--;
595 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
596 struct buffer_page **page)
598 struct list_head *p = (*page)->list.next;
600 if (p == &cpu_buffer->pages)
603 *page = list_entry(p, struct buffer_page, list);
607 rb_add_stamp(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts)
609 cpu_buffer->tail_page->time_stamp = *ts;
610 cpu_buffer->write_stamp = *ts;
613 static void rb_reset_read_page(struct ring_buffer_per_cpu *cpu_buffer)
615 cpu_buffer->read_stamp = cpu_buffer->head_page->time_stamp;
616 cpu_buffer->head = 0;
620 rb_reset_iter_read_page(struct ring_buffer_iter *iter)
622 iter->read_stamp = iter->head_page->time_stamp;
627 * ring_buffer_update_event - update event type and data
628 * @event: the even to update
629 * @type: the type of event
630 * @length: the size of the event field in the ring buffer
632 * Update the type and data fields of the event. The length
633 * is the actual size that is written to the ring buffer,
634 * and with this, we can determine what to place into the
638 rb_update_event(struct ring_buffer_event *event,
639 unsigned type, unsigned length)
645 case RINGBUF_TYPE_PADDING:
648 case RINGBUF_TYPE_TIME_EXTEND:
650 (RB_LEN_TIME_EXTEND + (RB_ALIGNMENT-1))
651 >> RB_ALIGNMENT_SHIFT;
654 case RINGBUF_TYPE_TIME_STAMP:
656 (RB_LEN_TIME_STAMP + (RB_ALIGNMENT-1))
657 >> RB_ALIGNMENT_SHIFT;
660 case RINGBUF_TYPE_DATA:
661 length -= RB_EVNT_HDR_SIZE;
662 if (length > RB_MAX_SMALL_DATA) {
664 event->array[0] = length;
667 (length + (RB_ALIGNMENT-1))
668 >> RB_ALIGNMENT_SHIFT;
675 static inline unsigned rb_calculate_event_length(unsigned length)
677 struct ring_buffer_event event; /* Used only for sizeof array */
679 /* zero length can cause confusions */
683 if (length > RB_MAX_SMALL_DATA)
684 length += sizeof(event.array[0]);
686 length += RB_EVNT_HDR_SIZE;
687 length = ALIGN(length, RB_ALIGNMENT);
692 static struct ring_buffer_event *
693 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
694 unsigned type, unsigned long length, u64 *ts)
696 struct buffer_page *head_page, *tail_page;
698 struct ring_buffer *buffer = cpu_buffer->buffer;
699 struct ring_buffer_event *event;
701 tail_page = cpu_buffer->tail_page;
702 head_page = cpu_buffer->head_page;
703 tail = cpu_buffer->tail;
705 if (tail + length > BUF_PAGE_SIZE) {
706 struct buffer_page *next_page = tail_page;
708 rb_inc_page(cpu_buffer, &next_page);
710 if (next_page == head_page) {
711 if (!(buffer->flags & RB_FL_OVERWRITE))
714 /* count overflows */
715 rb_update_overflow(cpu_buffer);
717 rb_inc_page(cpu_buffer, &head_page);
718 cpu_buffer->head_page = head_page;
719 rb_reset_read_page(cpu_buffer);
722 if (tail != BUF_PAGE_SIZE) {
723 event = rb_page_index(tail_page, tail);
725 event->type = RINGBUF_TYPE_PADDING;
728 tail_page->size = tail;
729 tail_page = next_page;
732 cpu_buffer->tail_page = tail_page;
733 cpu_buffer->tail = tail;
734 rb_add_stamp(cpu_buffer, ts);
737 BUG_ON(tail + length > BUF_PAGE_SIZE);
739 event = rb_page_index(tail_page, tail);
740 rb_update_event(event, type, length);
746 rb_add_time_stamp(struct ring_buffer_per_cpu *cpu_buffer,
749 struct ring_buffer_event *event;
752 if (unlikely(*delta > (1ULL << 59) && !once++)) {
753 printk(KERN_WARNING "Delta way too big! %llu"
754 " ts=%llu write stamp = %llu\n",
755 *delta, *ts, cpu_buffer->write_stamp);
760 * The delta is too big, we to add a
763 event = __rb_reserve_next(cpu_buffer,
764 RINGBUF_TYPE_TIME_EXTEND,
770 /* check to see if we went to the next page */
771 if (cpu_buffer->tail) {
772 /* Still on same page, update timestamp */
773 event->time_delta = *delta & TS_MASK;
774 event->array[0] = *delta >> TS_SHIFT;
775 /* commit the time event */
777 rb_event_length(event);
778 cpu_buffer->write_stamp = *ts;
785 static struct ring_buffer_event *
786 rb_reserve_next_event(struct ring_buffer_per_cpu *cpu_buffer,
787 unsigned type, unsigned long length)
789 struct ring_buffer_event *event;
792 ts = ring_buffer_time_stamp(cpu_buffer->cpu);
794 if (cpu_buffer->tail) {
795 delta = ts - cpu_buffer->write_stamp;
797 if (test_time_stamp(delta)) {
800 ret = rb_add_time_stamp(cpu_buffer, &ts, &delta);
805 rb_add_stamp(cpu_buffer, &ts);
809 event = __rb_reserve_next(cpu_buffer, type, length, &ts);
813 /* If the reserve went to the next page, our delta is zero */
814 if (!cpu_buffer->tail)
817 event->time_delta = delta;
823 * ring_buffer_lock_reserve - reserve a part of the buffer
824 * @buffer: the ring buffer to reserve from
825 * @length: the length of the data to reserve (excluding event header)
826 * @flags: a pointer to save the interrupt flags
828 * Returns a reseverd event on the ring buffer to copy directly to.
829 * The user of this interface will need to get the body to write into
830 * and can use the ring_buffer_event_data() interface.
832 * The length is the length of the data needed, not the event length
833 * which also includes the event header.
835 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
836 * If NULL is returned, then nothing has been allocated or locked.
838 struct ring_buffer_event *
839 ring_buffer_lock_reserve(struct ring_buffer *buffer,
840 unsigned long length,
841 unsigned long *flags)
843 struct ring_buffer_per_cpu *cpu_buffer;
844 struct ring_buffer_event *event;
847 if (atomic_read(&buffer->record_disabled))
850 raw_local_irq_save(*flags);
851 cpu = raw_smp_processor_id();
853 if (!cpu_isset(cpu, buffer->cpumask))
856 cpu_buffer = buffer->buffers[cpu];
857 spin_lock(&cpu_buffer->lock);
859 if (atomic_read(&cpu_buffer->record_disabled))
862 length = rb_calculate_event_length(length);
863 if (length > BUF_PAGE_SIZE)
866 event = rb_reserve_next_event(cpu_buffer, RINGBUF_TYPE_DATA, length);
873 spin_unlock(&cpu_buffer->lock);
875 local_irq_restore(*flags);
879 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
880 struct ring_buffer_event *event)
882 cpu_buffer->tail += rb_event_length(event);
883 cpu_buffer->tail_page->size = cpu_buffer->tail;
884 cpu_buffer->write_stamp += event->time_delta;
885 cpu_buffer->entries++;
889 * ring_buffer_unlock_commit - commit a reserved
890 * @buffer: The buffer to commit to
891 * @event: The event pointer to commit.
892 * @flags: the interrupt flags received from ring_buffer_lock_reserve.
894 * This commits the data to the ring buffer, and releases any locks held.
896 * Must be paired with ring_buffer_lock_reserve.
898 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
899 struct ring_buffer_event *event,
902 struct ring_buffer_per_cpu *cpu_buffer;
903 int cpu = raw_smp_processor_id();
905 cpu_buffer = buffer->buffers[cpu];
907 assert_spin_locked(&cpu_buffer->lock);
909 rb_commit(cpu_buffer, event);
911 spin_unlock(&cpu_buffer->lock);
912 raw_local_irq_restore(flags);
918 * ring_buffer_write - write data to the buffer without reserving
919 * @buffer: The ring buffer to write to.
920 * @length: The length of the data being written (excluding the event header)
921 * @data: The data to write to the buffer.
923 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
924 * one function. If you already have the data to write to the buffer, it
925 * may be easier to simply call this function.
927 * Note, like ring_buffer_lock_reserve, the length is the length of the data
928 * and not the length of the event which would hold the header.
930 int ring_buffer_write(struct ring_buffer *buffer,
931 unsigned long length,
934 struct ring_buffer_per_cpu *cpu_buffer;
935 struct ring_buffer_event *event;
936 unsigned long event_length, flags;
941 if (atomic_read(&buffer->record_disabled))
944 local_irq_save(flags);
945 cpu = raw_smp_processor_id();
947 if (!cpu_isset(cpu, buffer->cpumask))
950 cpu_buffer = buffer->buffers[cpu];
951 spin_lock(&cpu_buffer->lock);
953 if (atomic_read(&cpu_buffer->record_disabled))
956 event_length = rb_calculate_event_length(length);
957 event = rb_reserve_next_event(cpu_buffer,
958 RINGBUF_TYPE_DATA, event_length);
962 body = rb_event_data(event);
964 memcpy(body, data, length);
966 rb_commit(cpu_buffer, event);
970 spin_unlock(&cpu_buffer->lock);
972 local_irq_restore(flags);
978 * ring_buffer_lock - lock the ring buffer
979 * @buffer: The ring buffer to lock
980 * @flags: The place to store the interrupt flags
982 * This locks all the per CPU buffers.
984 * Must be unlocked by ring_buffer_unlock.
986 void ring_buffer_lock(struct ring_buffer *buffer, unsigned long *flags)
988 struct ring_buffer_per_cpu *cpu_buffer;
991 local_irq_save(*flags);
993 for_each_buffer_cpu(buffer, cpu) {
994 cpu_buffer = buffer->buffers[cpu];
995 spin_lock(&cpu_buffer->lock);
1000 * ring_buffer_unlock - unlock a locked buffer
1001 * @buffer: The locked buffer to unlock
1002 * @flags: The interrupt flags received by ring_buffer_lock
1004 void ring_buffer_unlock(struct ring_buffer *buffer, unsigned long flags)
1006 struct ring_buffer_per_cpu *cpu_buffer;
1009 for (cpu = buffer->cpus - 1; cpu >= 0; cpu--) {
1010 if (!cpu_isset(cpu, buffer->cpumask))
1012 cpu_buffer = buffer->buffers[cpu];
1013 spin_unlock(&cpu_buffer->lock);
1016 local_irq_restore(flags);
1020 * ring_buffer_record_disable - stop all writes into the buffer
1021 * @buffer: The ring buffer to stop writes to.
1023 * This prevents all writes to the buffer. Any attempt to write
1024 * to the buffer after this will fail and return NULL.
1026 * The caller should call synchronize_sched() after this.
1028 void ring_buffer_record_disable(struct ring_buffer *buffer)
1030 atomic_inc(&buffer->record_disabled);
1034 * ring_buffer_record_enable - enable writes to the buffer
1035 * @buffer: The ring buffer to enable writes
1037 * Note, multiple disables will need the same number of enables
1038 * to truely enable the writing (much like preempt_disable).
1040 void ring_buffer_record_enable(struct ring_buffer *buffer)
1042 atomic_dec(&buffer->record_disabled);
1046 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
1047 * @buffer: The ring buffer to stop writes to.
1048 * @cpu: The CPU buffer to stop
1050 * This prevents all writes to the buffer. Any attempt to write
1051 * to the buffer after this will fail and return NULL.
1053 * The caller should call synchronize_sched() after this.
1055 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
1057 struct ring_buffer_per_cpu *cpu_buffer;
1059 if (!cpu_isset(cpu, buffer->cpumask))
1062 cpu_buffer = buffer->buffers[cpu];
1063 atomic_inc(&cpu_buffer->record_disabled);
1067 * ring_buffer_record_enable_cpu - enable writes to the buffer
1068 * @buffer: The ring buffer to enable writes
1069 * @cpu: The CPU to enable.
1071 * Note, multiple disables will need the same number of enables
1072 * to truely enable the writing (much like preempt_disable).
1074 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
1076 struct ring_buffer_per_cpu *cpu_buffer;
1078 if (!cpu_isset(cpu, buffer->cpumask))
1081 cpu_buffer = buffer->buffers[cpu];
1082 atomic_dec(&cpu_buffer->record_disabled);
1086 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
1087 * @buffer: The ring buffer
1088 * @cpu: The per CPU buffer to get the entries from.
1090 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
1092 struct ring_buffer_per_cpu *cpu_buffer;
1094 if (!cpu_isset(cpu, buffer->cpumask))
1097 cpu_buffer = buffer->buffers[cpu];
1098 return cpu_buffer->entries;
1102 * ring_buffer_overrun_cpu - get the number of overruns in a cpu_buffer
1103 * @buffer: The ring buffer
1104 * @cpu: The per CPU buffer to get the number of overruns from
1106 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
1108 struct ring_buffer_per_cpu *cpu_buffer;
1110 if (!cpu_isset(cpu, buffer->cpumask))
1113 cpu_buffer = buffer->buffers[cpu];
1114 return cpu_buffer->overrun;
1118 * ring_buffer_entries - get the number of entries in a buffer
1119 * @buffer: The ring buffer
1121 * Returns the total number of entries in the ring buffer
1124 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
1126 struct ring_buffer_per_cpu *cpu_buffer;
1127 unsigned long entries = 0;
1130 /* if you care about this being correct, lock the buffer */
1131 for_each_buffer_cpu(buffer, cpu) {
1132 cpu_buffer = buffer->buffers[cpu];
1133 entries += cpu_buffer->entries;
1140 * ring_buffer_overrun_cpu - get the number of overruns in buffer
1141 * @buffer: The ring buffer
1143 * Returns the total number of overruns in the ring buffer
1146 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
1148 struct ring_buffer_per_cpu *cpu_buffer;
1149 unsigned long overruns = 0;
1152 /* if you care about this being correct, lock the buffer */
1153 for_each_buffer_cpu(buffer, cpu) {
1154 cpu_buffer = buffer->buffers[cpu];
1155 overruns += cpu_buffer->overrun;
1162 * ring_buffer_iter_reset - reset an iterator
1163 * @iter: The iterator to reset
1165 * Resets the iterator, so that it will start from the beginning
1168 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
1170 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1172 iter->head_page = cpu_buffer->head_page;
1173 iter->head = cpu_buffer->head;
1174 rb_reset_iter_read_page(iter);
1178 * ring_buffer_iter_empty - check if an iterator has no more to read
1179 * @iter: The iterator to check
1181 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
1183 struct ring_buffer_per_cpu *cpu_buffer;
1185 cpu_buffer = iter->cpu_buffer;
1187 return iter->head_page == cpu_buffer->tail_page &&
1188 iter->head == cpu_buffer->tail;
1192 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
1193 struct ring_buffer_event *event)
1197 switch (event->type) {
1198 case RINGBUF_TYPE_PADDING:
1201 case RINGBUF_TYPE_TIME_EXTEND:
1202 delta = event->array[0];
1204 delta += event->time_delta;
1205 cpu_buffer->read_stamp += delta;
1208 case RINGBUF_TYPE_TIME_STAMP:
1209 /* FIXME: not implemented */
1212 case RINGBUF_TYPE_DATA:
1213 cpu_buffer->read_stamp += event->time_delta;
1223 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
1224 struct ring_buffer_event *event)
1228 switch (event->type) {
1229 case RINGBUF_TYPE_PADDING:
1232 case RINGBUF_TYPE_TIME_EXTEND:
1233 delta = event->array[0];
1235 delta += event->time_delta;
1236 iter->read_stamp += delta;
1239 case RINGBUF_TYPE_TIME_STAMP:
1240 /* FIXME: not implemented */
1243 case RINGBUF_TYPE_DATA:
1244 iter->read_stamp += event->time_delta;
1253 static void rb_advance_head(struct ring_buffer_per_cpu *cpu_buffer)
1255 struct ring_buffer_event *event;
1259 * Check if we are at the end of the buffer.
1261 if (cpu_buffer->head >= cpu_buffer->head_page->size) {
1262 BUG_ON(cpu_buffer->head_page == cpu_buffer->tail_page);
1263 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1264 rb_reset_read_page(cpu_buffer);
1268 event = rb_head_event(cpu_buffer);
1270 if (event->type == RINGBUF_TYPE_DATA)
1271 cpu_buffer->entries--;
1273 length = rb_event_length(event);
1276 * This should not be called to advance the header if we are
1277 * at the tail of the buffer.
1279 BUG_ON((cpu_buffer->head_page == cpu_buffer->tail_page) &&
1280 (cpu_buffer->head + length > cpu_buffer->tail));
1282 rb_update_read_stamp(cpu_buffer, event);
1284 cpu_buffer->head += length;
1286 /* check for end of page */
1287 if ((cpu_buffer->head >= cpu_buffer->head_page->size) &&
1288 (cpu_buffer->head_page != cpu_buffer->tail_page))
1289 rb_advance_head(cpu_buffer);
1292 static void rb_advance_iter(struct ring_buffer_iter *iter)
1294 struct ring_buffer *buffer;
1295 struct ring_buffer_per_cpu *cpu_buffer;
1296 struct ring_buffer_event *event;
1299 cpu_buffer = iter->cpu_buffer;
1300 buffer = cpu_buffer->buffer;
1303 * Check if we are at the end of the buffer.
1305 if (iter->head >= iter->head_page->size) {
1306 BUG_ON(iter->head_page == cpu_buffer->tail_page);
1307 rb_inc_page(cpu_buffer, &iter->head_page);
1308 rb_reset_iter_read_page(iter);
1312 event = rb_iter_head_event(iter);
1314 length = rb_event_length(event);
1317 * This should not be called to advance the header if we are
1318 * at the tail of the buffer.
1320 BUG_ON((iter->head_page == cpu_buffer->tail_page) &&
1321 (iter->head + length > cpu_buffer->tail));
1323 rb_update_iter_read_stamp(iter, event);
1325 iter->head += length;
1327 /* check for end of page padding */
1328 if ((iter->head >= iter->head_page->size) &&
1329 (iter->head_page != cpu_buffer->tail_page))
1330 rb_advance_iter(iter);
1334 * ring_buffer_peek - peek at the next event to be read
1335 * @buffer: The ring buffer to read
1336 * @cpu: The cpu to peak at
1337 * @ts: The timestamp counter of this event.
1339 * This will return the event that will be read next, but does
1340 * not consume the data.
1342 struct ring_buffer_event *
1343 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts)
1345 struct ring_buffer_per_cpu *cpu_buffer;
1346 struct ring_buffer_event *event;
1348 if (!cpu_isset(cpu, buffer->cpumask))
1351 cpu_buffer = buffer->buffers[cpu];
1354 if (rb_per_cpu_empty(cpu_buffer))
1357 event = rb_head_event(cpu_buffer);
1359 switch (event->type) {
1360 case RINGBUF_TYPE_PADDING:
1361 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
1362 rb_reset_read_page(cpu_buffer);
1365 case RINGBUF_TYPE_TIME_EXTEND:
1366 /* Internal data, OK to advance */
1367 rb_advance_head(cpu_buffer);
1370 case RINGBUF_TYPE_TIME_STAMP:
1371 /* FIXME: not implemented */
1372 rb_advance_head(cpu_buffer);
1375 case RINGBUF_TYPE_DATA:
1377 *ts = cpu_buffer->read_stamp + event->time_delta;
1378 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1390 * ring_buffer_iter_peek - peek at the next event to be read
1391 * @iter: The ring buffer iterator
1392 * @ts: The timestamp counter of this event.
1394 * This will return the event that will be read next, but does
1395 * not increment the iterator.
1397 struct ring_buffer_event *
1398 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
1400 struct ring_buffer *buffer;
1401 struct ring_buffer_per_cpu *cpu_buffer;
1402 struct ring_buffer_event *event;
1404 if (ring_buffer_iter_empty(iter))
1407 cpu_buffer = iter->cpu_buffer;
1408 buffer = cpu_buffer->buffer;
1411 if (rb_per_cpu_empty(cpu_buffer))
1414 event = rb_iter_head_event(iter);
1416 switch (event->type) {
1417 case RINGBUF_TYPE_PADDING:
1418 rb_inc_page(cpu_buffer, &iter->head_page);
1419 rb_reset_iter_read_page(iter);
1422 case RINGBUF_TYPE_TIME_EXTEND:
1423 /* Internal data, OK to advance */
1424 rb_advance_iter(iter);
1427 case RINGBUF_TYPE_TIME_STAMP:
1428 /* FIXME: not implemented */
1429 rb_advance_iter(iter);
1432 case RINGBUF_TYPE_DATA:
1434 *ts = iter->read_stamp + event->time_delta;
1435 ring_buffer_normalize_time_stamp(cpu_buffer->cpu, ts);
1447 * ring_buffer_consume - return an event and consume it
1448 * @buffer: The ring buffer to get the next event from
1450 * Returns the next event in the ring buffer, and that event is consumed.
1451 * Meaning, that sequential reads will keep returning a different event,
1452 * and eventually empty the ring buffer if the producer is slower.
1454 struct ring_buffer_event *
1455 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts)
1457 struct ring_buffer_per_cpu *cpu_buffer;
1458 struct ring_buffer_event *event;
1460 if (!cpu_isset(cpu, buffer->cpumask))
1463 event = ring_buffer_peek(buffer, cpu, ts);
1467 cpu_buffer = buffer->buffers[cpu];
1468 rb_advance_head(cpu_buffer);
1474 * ring_buffer_read_start - start a non consuming read of the buffer
1475 * @buffer: The ring buffer to read from
1476 * @cpu: The cpu buffer to iterate over
1478 * This starts up an iteration through the buffer. It also disables
1479 * the recording to the buffer until the reading is finished.
1480 * This prevents the reading from being corrupted. This is not
1481 * a consuming read, so a producer is not expected.
1483 * Must be paired with ring_buffer_finish.
1485 struct ring_buffer_iter *
1486 ring_buffer_read_start(struct ring_buffer *buffer, int cpu)
1488 struct ring_buffer_per_cpu *cpu_buffer;
1489 struct ring_buffer_iter *iter;
1491 if (!cpu_isset(cpu, buffer->cpumask))
1494 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
1498 cpu_buffer = buffer->buffers[cpu];
1500 iter->cpu_buffer = cpu_buffer;
1502 atomic_inc(&cpu_buffer->record_disabled);
1503 synchronize_sched();
1505 spin_lock(&cpu_buffer->lock);
1506 iter->head = cpu_buffer->head;
1507 iter->head_page = cpu_buffer->head_page;
1508 rb_reset_iter_read_page(iter);
1509 spin_unlock(&cpu_buffer->lock);
1515 * ring_buffer_finish - finish reading the iterator of the buffer
1516 * @iter: The iterator retrieved by ring_buffer_start
1518 * This re-enables the recording to the buffer, and frees the
1522 ring_buffer_read_finish(struct ring_buffer_iter *iter)
1524 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1526 atomic_dec(&cpu_buffer->record_disabled);
1531 * ring_buffer_read - read the next item in the ring buffer by the iterator
1532 * @iter: The ring buffer iterator
1533 * @ts: The time stamp of the event read.
1535 * This reads the next event in the ring buffer and increments the iterator.
1537 struct ring_buffer_event *
1538 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
1540 struct ring_buffer_event *event;
1542 event = ring_buffer_iter_peek(iter, ts);
1546 rb_advance_iter(iter);
1552 * ring_buffer_size - return the size of the ring buffer (in bytes)
1553 * @buffer: The ring buffer.
1555 unsigned long ring_buffer_size(struct ring_buffer *buffer)
1557 return BUF_PAGE_SIZE * buffer->pages;
1561 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
1563 cpu_buffer->head_page
1564 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1565 cpu_buffer->tail_page
1566 = list_entry(cpu_buffer->pages.next, struct buffer_page, list);
1568 cpu_buffer->head = cpu_buffer->tail = 0;
1569 cpu_buffer->overrun = 0;
1570 cpu_buffer->entries = 0;
1574 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
1575 * @buffer: The ring buffer to reset a per cpu buffer of
1576 * @cpu: The CPU buffer to be reset
1578 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
1580 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
1581 unsigned long flags;
1583 if (!cpu_isset(cpu, buffer->cpumask))
1586 raw_local_irq_save(flags);
1587 spin_lock(&cpu_buffer->lock);
1589 rb_reset_cpu(cpu_buffer);
1591 spin_unlock(&cpu_buffer->lock);
1592 raw_local_irq_restore(flags);
1596 * ring_buffer_reset - reset a ring buffer
1597 * @buffer: The ring buffer to reset all cpu buffers
1599 void ring_buffer_reset(struct ring_buffer *buffer)
1601 unsigned long flags;
1604 ring_buffer_lock(buffer, &flags);
1606 for_each_buffer_cpu(buffer, cpu)
1607 rb_reset_cpu(buffer->buffers[cpu]);
1609 ring_buffer_unlock(buffer, flags);
1613 * rind_buffer_empty - is the ring buffer empty?
1614 * @buffer: The ring buffer to test
1616 int ring_buffer_empty(struct ring_buffer *buffer)
1618 struct ring_buffer_per_cpu *cpu_buffer;
1621 /* yes this is racy, but if you don't like the race, lock the buffer */
1622 for_each_buffer_cpu(buffer, cpu) {
1623 cpu_buffer = buffer->buffers[cpu];
1624 if (!rb_per_cpu_empty(cpu_buffer))
1631 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
1632 * @buffer: The ring buffer
1633 * @cpu: The CPU buffer to test
1635 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
1637 struct ring_buffer_per_cpu *cpu_buffer;
1639 if (!cpu_isset(cpu, buffer->cpumask))
1642 cpu_buffer = buffer->buffers[cpu];
1643 return rb_per_cpu_empty(cpu_buffer);
1647 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
1648 * @buffer_a: One buffer to swap with
1649 * @buffer_b: The other buffer to swap with
1651 * This function is useful for tracers that want to take a "snapshot"
1652 * of a CPU buffer and has another back up buffer lying around.
1653 * it is expected that the tracer handles the cpu buffer not being
1654 * used at the moment.
1656 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
1657 struct ring_buffer *buffer_b, int cpu)
1659 struct ring_buffer_per_cpu *cpu_buffer_a;
1660 struct ring_buffer_per_cpu *cpu_buffer_b;
1662 if (!cpu_isset(cpu, buffer_a->cpumask) ||
1663 !cpu_isset(cpu, buffer_b->cpumask))
1666 /* At least make sure the two buffers are somewhat the same */
1667 if (buffer_a->size != buffer_b->size ||
1668 buffer_a->pages != buffer_b->pages)
1671 cpu_buffer_a = buffer_a->buffers[cpu];
1672 cpu_buffer_b = buffer_b->buffers[cpu];
1675 * We can't do a synchronize_sched here because this
1676 * function can be called in atomic context.
1677 * Normally this will be called from the same CPU as cpu.
1678 * If not it's up to the caller to protect this.
1680 atomic_inc(&cpu_buffer_a->record_disabled);
1681 atomic_inc(&cpu_buffer_b->record_disabled);
1683 buffer_a->buffers[cpu] = cpu_buffer_b;
1684 buffer_b->buffers[cpu] = cpu_buffer_a;
1686 cpu_buffer_b->buffer = buffer_a;
1687 cpu_buffer_a->buffer = buffer_b;
1689 atomic_dec(&cpu_buffer_a->record_disabled);
1690 atomic_dec(&cpu_buffer_b->record_disabled);