1 /*
2 * Generic ring buffer
3 *
4 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
5 */
6 #include <linux/ftrace_event.h>
7 #include <linux/ring_buffer.h>
8 #include <linux/trace_clock.h>
9 #include <linux/trace_seq.h>
10 #include <linux/spinlock.h>
11 #include <linux/irq_work.h>
12 #include <linux/debugfs.h>
13 #include <linux/uaccess.h>
14 #include <linux/hardirq.h>
15 #include <linux/kthread.h> /* for self test */
16 #include <linux/kmemcheck.h>
17 #include <linux/module.h>
18 #include <linux/percpu.h>
19 #include <linux/mutex.h>
20 #include <linux/delay.h>
21 #include <linux/slab.h>
22 #include <linux/init.h>
23 #include <linux/hash.h>
24 #include <linux/list.h>
25 #include <linux/cpu.h>
26 #include <linux/fs.h>
27
28 #include <asm/local.h>
29
30 static void update_pages_handler(struct work_struct *work);
31
/*
 * The ring buffer header is special. We must keep it up to date manually.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
36 {
37 int ret;
38
39 ret = trace_seq_puts(s, "# compressed entry header\n");
40 ret = trace_seq_puts(s, "\ttype_len : 5 bits\n");
41 ret = trace_seq_puts(s, "\ttime_delta : 27 bits\n");
42 ret = trace_seq_puts(s, "\tarray : 32 bits\n");
43 ret = trace_seq_putc(s, '\n');
44 ret = trace_seq_printf(s, "\tpadding : type == %d\n",
45 RINGBUF_TYPE_PADDING);
46 ret = trace_seq_printf(s, "\ttime_extend : type == %d\n",
47 RINGBUF_TYPE_TIME_EXTEND);
48 ret = trace_seq_printf(s, "\tdata max type_len == %d\n",
49 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
50
51 return ret;
52 }
53
/*
 * The ring buffer is made up of a list of pages. A separate list of pages is
 * allocated for each CPU. A writer may only write to a buffer that is
 * associated with the CPU it is currently executing on. A reader may read
 * from any per cpu buffer.
 *
 * The reader is special. For each per cpu buffer, the reader has its own
 * reader page. When a reader has read the entire reader page, this reader
 * page is swapped with another page in the ring buffer.
 *
 * Now, as long as the writer is off the reader page, the reader can do
 * whatever it wants with that page. The writer will never write to that page
 * again (as long as it is out of the ring buffer).
 *
 * Here's some silly ASCII art.
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *                   |   |-->|   |-->|   |
 *                   +---+   +---+   +---+
 *                     ^               |
 *                     |               |
 *                     +---------------+
 *
 *
 *   +------+
 *   |reader|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |-->|   |-->|   |
 *      |            +---+   +---+   +---+
 *      |                              |
 *      |                              |
 *      +------------------------------+
 *
 *
 *   +------+
 *   |buffer|          RING BUFFER
 *   |page  |------------------v
 *   +------+        +---+   +---+   +---+
 *      ^            |   |   |   |-->|   |
 *      |   New      +---+   +---+   +---+
 *      |  Reader------^               |
 *      |   page                       |
 *      +------------------------------+
 *
 *
 * After we make this swap, the reader can hand this page off to the splice
 * code and be done with it. It can even allocate a new page if it needs to
 * and swap that into the ring buffer.
 *
 * We will be using cmpxchg soon to make all this lockless.
 *
 */
121
/*
 * A fast way to enable or disable all ring buffers is to
 * call tracing_on or tracing_off. Turning off the ring buffers
 * prevents all ring buffers from being recorded to.
 * Turning this switch on makes it OK to write to the
 * ring buffer, if the ring buffer is enabled itself.
 *
 * There are three layers that must be on in order to write
 * to the ring buffer.
 *
 * 1) This global flag must be set.
 * 2) The ring buffer must be enabled for recording.
 * 3) The per cpu buffer must be enabled for recording.
 *
 * In case of an anomaly, this global flag has a bit set that
 * will permanently disable all ring buffers.
 * (An illustration of how these layers combine follows the flag
 *  definitions below.)
 */

/*
 * Global flag to disable all recording to ring buffers
 * This has two bits: ON, DISABLED
 *
 *   ON   DISABLED
 *  ----  --------
 *    0       0     : ring buffers are off
 *    1       0     : ring buffers are on
 *    X       1     : ring buffers are permanently disabled
 */
150
151 enum {
152 RB_BUFFERS_ON_BIT = 0,
153 RB_BUFFERS_DISABLED_BIT = 1,
154 };
155
156 enum {
157 RB_BUFFERS_ON = 1 << RB_BUFFERS_ON_BIT,
158 RB_BUFFERS_DISABLED = 1 << RB_BUFFERS_DISABLED_BIT,
159 };
160
161 static unsigned long ring_buffer_flags __read_mostly = RB_BUFFERS_ON;
162
163 /* Used for individual buffers (after the counter) */
164 #define RB_BUFFER_OFF (1 << 20)
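
/*
 * Illustration (not part of the original file): a write is only allowed
 * when all three layers described above are enabled. A hypothetical sketch
 * of the checks a writer would perform, using the flags defined here:
 *
 *	if (unlikely(!test_bit(RB_BUFFERS_ON_BIT, &ring_buffer_flags)))
 *		return NULL;	/. global switch is off ./
 *	if (atomic_read(&buffer->record_disabled))
 *		return NULL;	/. this ring buffer is off ./
 *	if (atomic_read(&cpu_buffer->record_disabled))
 *		return NULL;	/. this per cpu buffer is off ./
 *
 * The exact checks in the reserve path may differ; this only shows how the
 * three layers combine.
 */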
165
166 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
167
168 /**
169 * tracing_off_permanent - permanently disable ring buffers
170 *
171 * This function, once called, will disable all ring buffers
172 * permanently.
173 */
void tracing_off_permanent(void)
175 {
176 set_bit(RB_BUFFERS_DISABLED_BIT, &ring_buffer_flags);
177 }
178
179 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
180 #define RB_ALIGNMENT 4U
181 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
182 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
183
184 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
185 # define RB_FORCE_8BYTE_ALIGNMENT 0
186 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT
187 #else
188 # define RB_FORCE_8BYTE_ALIGNMENT 1
189 # define RB_ARCH_ALIGNMENT 8U
190 #endif
191
192 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
193
194 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
195 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
196
197 enum {
198 RB_LEN_TIME_EXTEND = 8,
199 RB_LEN_TIME_STAMP = 16,
200 };
201
202 #define skip_time_extend(event) \
203 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
204
static inline int rb_null_event(struct ring_buffer_event *event)
206 {
207 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
208 }
209
static void rb_event_set_padding(struct ring_buffer_event *event)
211 {
212 /* padding has a NULL time_delta */
213 event->type_len = RINGBUF_TYPE_PADDING;
214 event->time_delta = 0;
215 }
216
static unsigned
rb_event_data_length(struct ring_buffer_event *event)
219 {
220 unsigned length;
221
222 if (event->type_len)
223 length = event->type_len * RB_ALIGNMENT;
224 else
225 length = event->array[0];
226 return length + RB_EVNT_HDR_SIZE;
227 }
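
/*
 * Illustration (not part of the original file): how event lengths are
 * encoded. For small payloads the length is stored directly in type_len
 * as a multiple of RB_ALIGNMENT, e.g. a 12 byte payload is stored as
 * type_len = 3 (3 * 4 = 12) and the payload starts at array[0]. Payloads
 * larger than RB_MAX_SMALL_DATA set type_len = 0, store the byte count in
 * array[0], and start the payload at array[1].
 */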
228
229 /*
230 * Return the length of the given event. Will return
231 * the length of the time extend if the event is a
232 * time extend.
233 */
static inline unsigned
rb_event_length(struct ring_buffer_event *event)
236 {
237 switch (event->type_len) {
238 case RINGBUF_TYPE_PADDING:
239 if (rb_null_event(event))
240 /* undefined */
241 return -1;
242 return event->array[0] + RB_EVNT_HDR_SIZE;
243
244 case RINGBUF_TYPE_TIME_EXTEND:
245 return RB_LEN_TIME_EXTEND;
246
247 case RINGBUF_TYPE_TIME_STAMP:
248 return RB_LEN_TIME_STAMP;
249
250 case RINGBUF_TYPE_DATA:
251 return rb_event_data_length(event);
252 default:
253 BUG();
254 }
255 /* not hit */
256 return 0;
257 }
258
259 /*
260 * Return total length of time extend and data,
261 * or just the event length for all other events.
262 */
static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
265 {
266 unsigned len = 0;
267
268 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
269 /* time extends include the data event after it */
270 len = RB_LEN_TIME_EXTEND;
271 event = skip_time_extend(event);
272 }
273 return len + rb_event_length(event);
274 }
275
/**
 * ring_buffer_event_length - return the length of the event
 * @event: the event to get the length of
 *
 * Returns the size of the data payload of a data event.
 * If the event is something other than a data event, it
 * returns the size of the event itself, with the exception
 * of a TIME_EXTEND, where it returns the size of the
 * data payload of the data event that follows it.
 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
287 {
288 unsigned length;
289
290 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
291 event = skip_time_extend(event);
292
293 length = rb_event_length(event);
294 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
295 return length;
296 length -= RB_EVNT_HDR_SIZE;
297 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
298 length -= sizeof(event->array[0]);
299 return length;
300 }
301 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
302
303 /* inline for ring buffer fast paths */
static void *
rb_event_data(struct ring_buffer_event *event)
306 {
307 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
308 event = skip_time_extend(event);
309 BUG_ON(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
310 /* If length is in len field, then array[0] has the data */
311 if (event->type_len)
312 return (void *)&event->array[0];
313 /* Otherwise length is in array[0] and array[1] has the data */
314 return (void *)&event->array[1];
315 }
316
317 /**
318 * ring_buffer_event_data - return the data of the event
319 * @event: the event to get the data from
320 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
322 {
323 return rb_event_data(event);
324 }
325 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
326
327 #define for_each_buffer_cpu(buffer, cpu) \
328 for_each_cpu(cpu, buffer->cpumask)
329
330 #define TS_SHIFT 27
331 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
332 #define TS_DELTA_TEST (~TS_MASK)
333
334 /* Flag when events were overwritten */
335 #define RB_MISSED_EVENTS (1 << 31)
336 /* Missed count stored at end */
337 #define RB_MISSED_STORED (1 << 30)
338
339 #define RB_MISSED_FLAGS (RB_MISSED_EVENTS|RB_MISSED_STORED)
340
341 struct buffer_data_page {
342 u64 time_stamp; /* page time stamp */
343 local_t commit; /* write committed index */
344 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
345 };
346
347 /*
348 * Note, the buffer_page list must be first. The buffer pages
349 * are allocated in cache lines, which means that each buffer
350 * page will be at the beginning of a cache line, and thus
351 * the least significant bits will be zero. We use this to
352 * add flags in the list struct pointers, to make the ring buffer
353 * lockless.
354 */
355 struct buffer_page {
356 struct list_head list; /* list of buffer pages */
357 local_t write; /* index for next write */
358 unsigned read; /* index for next read */
359 local_t entries; /* entries on this page */
360 unsigned long real_end; /* real end of data */
361 struct buffer_data_page *page; /* Actual data page */
362 };
363
/*
 * The buffer page counters, write and entries, must be reset
 * atomically when crossing page boundaries. To synchronize this
 * update, two counters are packed into the same word. One is
 * the actual counter for the write position or entry count on the page.
 *
 * The other is a counter of updaters. Before an update happens
 * the updater partition of the word is incremented. This allows
 * the updater to update the counter atomically.
 *
 * The counter is 20 bits, and the updater state is 12.
 * (See the worked example below the RB_WRITE_* definitions.)
 */
376 #define RB_WRITE_MASK 0xfffff
377 #define RB_WRITE_INTCNT (1 << 20)
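
/*
 * Illustration (not part of the original file): with the masks above, the
 * low 20 bits of bpage->write hold the write index and the upper bits count
 * nested updaters. For example, if write == 0x00300040, rb_page_write()
 * returns 0x40 (the index), while the 0x003 in the upper bits means three
 * updates have started on this page. Adding RB_WRITE_INTCNT with
 * local_add_return() bumps the updater count without disturbing the index.
 */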
378
static void rb_init_page(struct buffer_data_page *bpage)
380 {
381 local_set(&bpage->commit, 0);
382 }
383
384 /**
385 * ring_buffer_page_len - the size of data on the page.
386 * @page: The page to read
387 *
388 * Returns the amount of data on the page, including buffer page header.
389 */
size_t ring_buffer_page_len(void *page)
391 {
392 struct buffer_data_page *bpage = page;
393
394 return (local_read(&bpage->commit) & ~RB_MISSED_FLAGS)
395 + BUF_PAGE_HDR_SIZE;
396 }
397
398 /*
399 * Also stolen from mm/slob.c. Thanks to Mathieu Desnoyers for pointing
400 * this issue out.
401 */
static void free_buffer_page(struct buffer_page *bpage)
403 {
404 free_page((unsigned long)bpage->page);
405 kfree(bpage);
406 }
407
408 /*
409 * We need to fit the time_stamp delta into 27 bits.
410 */
static inline int test_time_stamp(u64 delta)
412 {
413 if (delta & TS_DELTA_TEST)
414 return 1;
415 return 0;
416 }
417
418 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
419
420 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
421 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
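
/*
 * Worked example (not part of the original file): on a 64-bit system with
 * 4096 byte pages, BUF_PAGE_HDR_SIZE is 16 (u64 time_stamp plus local_t
 * commit), so BUF_PAGE_SIZE is 4080 and BUF_MAX_DATA_SIZE is 4072 bytes of
 * payload per page.
 */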
422
int ring_buffer_print_page_header(struct trace_seq *s)
424 {
425 struct buffer_data_page field;
426 int ret;
427
428 ret = trace_seq_printf(s, "\tfield: u64 timestamp;\t"
429 "offset:0;\tsize:%u;\tsigned:%u;\n",
430 (unsigned int)sizeof(field.time_stamp),
431 (unsigned int)is_signed_type(u64));
432
433 ret = trace_seq_printf(s, "\tfield: local_t commit;\t"
434 "offset:%u;\tsize:%u;\tsigned:%u;\n",
435 (unsigned int)offsetof(typeof(field), commit),
436 (unsigned int)sizeof(field.commit),
437 (unsigned int)is_signed_type(long));
438
439 ret = trace_seq_printf(s, "\tfield: int overwrite;\t"
440 "offset:%u;\tsize:%u;\tsigned:%u;\n",
441 (unsigned int)offsetof(typeof(field), commit),
442 1,
443 (unsigned int)is_signed_type(long));
444
445 ret = trace_seq_printf(s, "\tfield: char data;\t"
446 "offset:%u;\tsize:%u;\tsigned:%u;\n",
447 (unsigned int)offsetof(typeof(field), data),
448 (unsigned int)BUF_PAGE_SIZE,
449 (unsigned int)is_signed_type(char));
450
451 return ret;
452 }
453
454 struct rb_irq_work {
455 struct irq_work work;
456 wait_queue_head_t waiters;
457 wait_queue_head_t full_waiters;
458 bool waiters_pending;
459 bool full_waiters_pending;
460 bool wakeup_full;
461 };
462
463 /*
464 * head_page == tail_page && head == tail then buffer is empty.
465 */
466 struct ring_buffer_per_cpu {
467 int cpu;
468 atomic_t record_disabled;
469 struct ring_buffer *buffer;
470 raw_spinlock_t reader_lock; /* serialize readers */
471 arch_spinlock_t lock;
472 struct lock_class_key lock_key;
473 unsigned long nr_pages;
474 unsigned int current_context;
475 struct list_head *pages;
476 struct buffer_page *head_page; /* read from head */
477 struct buffer_page *tail_page; /* write to tail */
478 struct buffer_page *commit_page; /* committed pages */
479 struct buffer_page *reader_page;
480 unsigned long lost_events;
481 unsigned long last_overrun;
482 local_t entries_bytes;
483 local_t entries;
484 local_t overrun;
485 local_t commit_overrun;
486 local_t dropped_events;
487 local_t committing;
488 local_t commits;
489 unsigned long read;
490 unsigned long read_bytes;
491 u64 write_stamp;
492 u64 read_stamp;
493 /* ring buffer pages to update, > 0 to add, < 0 to remove */
494 long nr_pages_to_update;
495 struct list_head new_pages; /* new pages to add */
496 struct work_struct update_pages_work;
497 struct completion update_done;
498
499 struct rb_irq_work irq_work;
500 };
501
502 struct ring_buffer {
503 unsigned flags;
504 int cpus;
505 atomic_t record_disabled;
506 atomic_t resize_disabled;
507 cpumask_var_t cpumask;
508
509 struct lock_class_key *reader_lock_key;
510
511 struct mutex mutex;
512
513 struct ring_buffer_per_cpu **buffers;
514
515 #ifdef CONFIG_HOTPLUG_CPU
516 struct notifier_block cpu_notify;
517 #endif
518 u64 (*clock)(void);
519
520 struct rb_irq_work irq_work;
521 };
522
523 struct ring_buffer_iter {
524 struct ring_buffer_per_cpu *cpu_buffer;
525 unsigned long head;
526 struct buffer_page *head_page;
527 struct buffer_page *cache_reader_page;
528 unsigned long cache_read;
529 u64 read_stamp;
530 };
531
/*
 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
 *
 * Called from irq_work context to wake up any task that is blocked on the
 * ring buffer waiters queue.
 */
static void rb_wake_up_waiters(struct irq_work *work)
539 {
540 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
541
542 wake_up_all(&rbwork->waiters);
543 if (rbwork->wakeup_full) {
544 rbwork->wakeup_full = false;
545 wake_up_all(&rbwork->full_waiters);
546 }
547 }
548
549 /**
550 * ring_buffer_wait - wait for input to the ring buffer
551 * @buffer: buffer to wait on
552 * @cpu: the cpu buffer to wait on
553 * @full: wait until a full page is available, if @cpu != RING_BUFFER_ALL_CPUS
554 *
555 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
556 * as data is added to any of the @buffer's cpu buffers. Otherwise
557 * it will wait for data to be added to a specific cpu buffer.
558 */
int ring_buffer_wait(struct ring_buffer *buffer, int cpu, bool full)
560 {
561 struct ring_buffer_per_cpu *uninitialized_var(cpu_buffer);
562 DEFINE_WAIT(wait);
563 struct rb_irq_work *work;
564 int ret = 0;
565
566 /*
567 * Depending on what the caller is waiting for, either any
568 * data in any cpu buffer, or a specific buffer, put the
569 * caller on the appropriate wait queue.
570 */
571 if (cpu == RING_BUFFER_ALL_CPUS) {
572 work = &buffer->irq_work;
573 /* Full only makes sense on per cpu reads */
574 full = false;
575 } else {
576 if (!cpumask_test_cpu(cpu, buffer->cpumask))
577 return -ENODEV;
578 cpu_buffer = buffer->buffers[cpu];
579 work = &cpu_buffer->irq_work;
580 }
581
582
583 while (true) {
584 if (full)
585 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
586 else
587 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
588
/*
 * The events can happen in critical sections where
 * checking a work queue can cause deadlocks.
 * After adding a task to the queue, this flag is set
 * only to notify events to try to wake up the queue
 * using irq_work.
 *
 * We don't clear it even if the buffer is no longer
 * empty. The flag only causes the next event to run
 * irq_work to do the work queue wake up. The worst
 * that can happen if we race with !trace_empty() is that
 * an event will cause an irq_work to try to wake up
 * an empty queue.
 *
 * There's no reason to protect this flag either, as
 * the work queue and irq_work logic will do the necessary
 * synchronization for the wake ups. The only thing
 * that is necessary is that the wake up happens after
 * a task has been queued. Spurious wake ups are OK.
 */
609 if (full)
610 work->full_waiters_pending = true;
611 else
612 work->waiters_pending = true;
613
614 if (signal_pending(current)) {
615 ret = -EINTR;
616 break;
617 }
618
619 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
620 break;
621
622 if (cpu != RING_BUFFER_ALL_CPUS &&
623 !ring_buffer_empty_cpu(buffer, cpu)) {
624 unsigned long flags;
625 bool pagebusy;
626
627 if (!full)
628 break;
629
630 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
631 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
632 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
633
634 if (!pagebusy)
635 break;
636 }
637
638 schedule();
639 }
640
641 if (full)
642 finish_wait(&work->full_waiters, &wait);
643 else
644 finish_wait(&work->waiters, &wait);
645
646 return ret;
647 }
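
/*
 * Usage sketch (not part of the original file; process() is a placeholder):
 * a blocking consumer typically loops waiting for data and then drains the
 * cpu buffer with ring_buffer_consume() from <linux/ring_buffer.h>, e.g.:
 *
 *	while (!kthread_should_stop()) {
 *		if (ring_buffer_wait(buffer, cpu, false) < 0)
 *			break;
 *		while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)))
 *			process(ring_buffer_event_data(event));
 *	}
 */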
648
649 /**
650 * ring_buffer_poll_wait - poll on buffer input
651 * @buffer: buffer to wait on
652 * @cpu: the cpu buffer to wait on
653 * @filp: the file descriptor
654 * @poll_table: The poll descriptor
655 *
656 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
657 * as data is added to any of the @buffer's cpu buffers. Otherwise
658 * it will wait for data to be added to a specific cpu buffer.
659 *
660 * Returns POLLIN | POLLRDNORM if data exists in the buffers,
661 * zero otherwise.
662 */
int ring_buffer_poll_wait(struct ring_buffer *buffer, int cpu,
			  struct file *filp, poll_table *poll_table)
665 {
666 struct ring_buffer_per_cpu *cpu_buffer;
667 struct rb_irq_work *work;
668
669 if (cpu == RING_BUFFER_ALL_CPUS)
670 work = &buffer->irq_work;
671 else {
672 if (!cpumask_test_cpu(cpu, buffer->cpumask))
673 return -EINVAL;
674
675 cpu_buffer = buffer->buffers[cpu];
676 work = &cpu_buffer->irq_work;
677 }
678
679 poll_wait(filp, &work->waiters, poll_table);
680 work->waiters_pending = true;
681 /*
682 * There's a tight race between setting the waiters_pending and
683 * checking if the ring buffer is empty. Once the waiters_pending bit
684 * is set, the next event will wake the task up, but we can get stuck
685 * if there's only a single event in.
686 *
687 * FIXME: Ideally, we need a memory barrier on the writer side as well,
688 * but adding a memory barrier to all events will cause too much of a
689 * performance hit in the fast path. We only need a memory barrier when
690 * the buffer goes from empty to having content. But as this race is
691 * extremely small, and it's not a problem if another event comes in, we
692 * will fix it later.
693 */
694 smp_mb();
695
696 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
697 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
698 return POLLIN | POLLRDNORM;
699 return 0;
700 }
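
/*
 * Usage sketch (not part of the original file; my_trace_poll is a
 * hypothetical caller): this is meant to be called from a file's poll
 * handler, forwarding the poll_table it was given, e.g.:
 *
 *	static unsigned int my_trace_poll(struct file *filp, poll_table *pt)
 *	{
 *		return ring_buffer_poll_wait(buffer, cpu, filp, pt);
 *	}
 *
 * where buffer and cpu are whatever the file's private data resolves to.
 */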
701
702 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
703 #define RB_WARN_ON(b, cond) \
704 ({ \
705 int _____ret = unlikely(cond); \
706 if (_____ret) { \
707 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
708 struct ring_buffer_per_cpu *__b = \
709 (void *)b; \
710 atomic_inc(&__b->buffer->record_disabled); \
711 } else \
712 atomic_inc(&b->record_disabled); \
713 WARN_ON(1); \
714 } \
715 _____ret; \
716 })
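
/*
 * Example (taken from uses later in this file): RB_WARN_ON() evaluates the
 * condition, and if it is true disables recording on the owning buffer and
 * triggers a WARN_ON(1), e.g.:
 *
 *	if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
 *		return NULL;
 */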
717
718 /* Up this if you want to test the TIME_EXTENTS and normalization */
719 #define DEBUG_SHIFT 0
720
static inline u64 rb_time_stamp(struct ring_buffer *buffer)
722 {
723 /* shift to debug/test normalization and TIME_EXTENTS */
724 return buffer->clock() << DEBUG_SHIFT;
725 }
726
u64 ring_buffer_time_stamp(struct ring_buffer *buffer, int cpu)
728 {
729 u64 time;
730
731 preempt_disable_notrace();
732 time = rb_time_stamp(buffer);
733 preempt_enable_no_resched_notrace();
734
735 return time;
736 }
737 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
738
void ring_buffer_normalize_time_stamp(struct ring_buffer *buffer,
				      int cpu, u64 *ts)
741 {
742 /* Just stupid testing the normalize function and deltas */
743 *ts >>= DEBUG_SHIFT;
744 }
745 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
746
/*
 * Making the ring buffer lockless makes things tricky.
 * Writes only happen on the CPU that they are on, so they
 * only need to worry about interrupts; reads, however, can
 * happen on any CPU.
 *
 * The reader page is always off the ring buffer, but when the
 * reader finishes with a page, it needs to swap its page with
 * a new one from the buffer. The reader needs to take from
 * the head (writes go to the tail). But if a writer is in overwrite
 * mode and wraps, it must push the head page forward.
 *
 * Here lies the problem.
 *
 * The reader must be careful to replace only the head page, and
 * not another one. As described at the top of the file in the
 * ASCII art, the reader sets its old page to point to the next
 * page after head. It then sets the page after head to point to
 * the old reader page. But if the writer moves the head page
 * during this operation, the reader could end up with the tail.
 *
 * We use cmpxchg to help prevent this race. We also do something
 * special with the page before head. We set the LSB to 1.
 *
 * When the writer must push the page forward, it will clear the
 * bit that points to the head page, move the head, and then set
 * the bit that points to the new head page.
 *
 * We also don't want an interrupt coming in and moving the head
 * page on another writer. Thus we use the second LSB to catch
 * that too. Thus:
 *
 * head->list->prev->next        bit 1          bit 0
 *                              -------        -------
 * Normal page                     0              0
 * Points to head page             0              1
 * New head page                   1              0
 *
 * Note we can not trust the prev pointer of the head page, because:
 *
 * +----+       +-----+        +-----+
 * |    |------>|  T  |---X--->|  N  |
 * |    |<------|     |        |     |
 * +----+       +-----+        +-----+
 *   ^                           ^ |
 *   |          +-----+          | |
 *   +----------|  R  |----------+ |
 *              |     |<-----------+
 *              +-----+
 *
 * Key:  ---X-->  HEAD flag set in pointer
 *         T      Tail page
 *         R      Reader page
 *         N      Next page
 *
 * (see __rb_reserve_next() to see where this happens)
 *
 * What the above shows is that the reader just swapped out
 * the reader page with a page in the buffer, but before it
 * could make the new header point back to the new page added
 * it was preempted by a writer. The writer moved forward onto
 * the new page added by the reader and is about to move forward
 * again.
 *
 * You can see, it is legitimate for the previous pointer of
 * the head (or any page) not to point back to itself. But only
 * temporarily.
 * (An illustration of how the flag bits are manipulated follows the
 *  RB_PAGE_* definitions below.)
 */
815
816 #define RB_PAGE_NORMAL 0UL
817 #define RB_PAGE_HEAD 1UL
818 #define RB_PAGE_UPDATE 2UL
819
820
821 #define RB_FLAG_MASK 3UL
822
823 /* PAGE_MOVED is not part of the mask */
824 #define RB_PAGE_MOVED 4UL
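
/*
 * Illustration (not part of the original file): the flags live in the low
 * bits of the ->next pointers, so a tagged pointer looks like
 *
 *	next = (struct list_head *)((unsigned long)&head->list | RB_PAGE_HEAD);
 *
 * and the real pointer is recovered by masking, as rb_list_head() below
 * does with (val & ~RB_FLAG_MASK). This works because buffer pages are
 * cache line aligned, so the low bits of their addresses are always zero.
 */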
825
/*
 * rb_list_head - strip the flag bits from a list pointer
 */
static struct list_head *rb_list_head(struct list_head *list)
830 {
831 unsigned long val = (unsigned long)list;
832
833 return (struct list_head *)(val & ~RB_FLAG_MASK);
834 }
835
836 /*
837 * rb_is_head_page - test if the given page is the head page
838 *
839 * Because the reader may move the head_page pointer, we can
840 * not trust what the head page is (it may be pointing to
841 * the reader page). But if the next page is a header page,
842 * its flags will be non zero.
843 */
static inline int
rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
		struct buffer_page *page, struct list_head *list)
847 {
848 unsigned long val;
849
850 val = (unsigned long)list->next;
851
852 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
853 return RB_PAGE_MOVED;
854
855 return val & RB_FLAG_MASK;
856 }
857
/*
 * rb_is_reader_page
 *
 * The unique thing about the reader page is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
 */
static int rb_is_reader_page(struct buffer_page *page)
866 {
867 struct list_head *list = page->list.prev;
868
869 return rb_list_head(list->next) != &page->list;
870 }
871
872 /*
873 * rb_set_list_to_head - set a list_head to be pointing to head.
874 */
static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
				struct list_head *list)
877 {
878 unsigned long *ptr;
879
880 ptr = (unsigned long *)&list->next;
881 *ptr |= RB_PAGE_HEAD;
882 *ptr &= ~RB_PAGE_UPDATE;
883 }
884
885 /*
886 * rb_head_page_activate - sets up head page
887 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
889 {
890 struct buffer_page *head;
891
892 head = cpu_buffer->head_page;
893 if (!head)
894 return;
895
896 /*
897 * Set the previous list pointer to have the HEAD flag.
898 */
899 rb_set_list_to_head(cpu_buffer, head->list.prev);
900 }
901
static void rb_list_head_clear(struct list_head *list)
903 {
904 unsigned long *ptr = (unsigned long *)&list->next;
905
906 *ptr &= ~RB_FLAG_MASK;
907 }
908
/*
 * rb_head_page_deactivate - clears head page ptr (for free list)
 */
static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
914 {
915 struct list_head *hd;
916
917 /* Go through the whole list and clear any pointers found. */
918 rb_list_head_clear(cpu_buffer->pages);
919
920 list_for_each(hd, cpu_buffer->pages)
921 rb_list_head_clear(hd);
922 }
923
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
			    struct buffer_page *head,
			    struct buffer_page *prev,
			    int old_flag, int new_flag)
928 {
929 struct list_head *list;
930 unsigned long val = (unsigned long)&head->list;
931 unsigned long ret;
932
933 list = &prev->list;
934
935 val &= ~RB_FLAG_MASK;
936
937 ret = cmpxchg((unsigned long *)&list->next,
938 val | old_flag, val | new_flag);
939
940 /* check if the reader took the page */
941 if ((ret & ~RB_FLAG_MASK) != val)
942 return RB_PAGE_MOVED;
943
944 return ret & RB_FLAG_MASK;
945 }
946
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
951 {
952 return rb_head_page_set(cpu_buffer, head, prev,
953 old_flag, RB_PAGE_UPDATE);
954 }
955
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
				 struct buffer_page *head,
				 struct buffer_page *prev,
				 int old_flag)
960 {
961 return rb_head_page_set(cpu_buffer, head, prev,
962 old_flag, RB_PAGE_HEAD);
963 }
964
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
				   struct buffer_page *head,
				   struct buffer_page *prev,
				   int old_flag)
969 {
970 return rb_head_page_set(cpu_buffer, head, prev,
971 old_flag, RB_PAGE_NORMAL);
972 }
973
static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page **bpage)
976 {
977 struct list_head *p = rb_list_head((*bpage)->list.next);
978
979 *bpage = list_entry(p, struct buffer_page, list);
980 }
981
static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
984 {
985 struct buffer_page *head;
986 struct buffer_page *page;
987 struct list_head *list;
988 int i;
989
990 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
991 return NULL;
992
993 /* sanity check */
994 list = cpu_buffer->pages;
995 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
996 return NULL;
997
998 page = head = cpu_buffer->head_page;
999 /*
1000 * It is possible that the writer moves the header behind
1001 * where we started, and we miss in one loop.
1002 * A second loop should grab the header, but we'll do
1003 * three loops just because I'm paranoid.
1004 */
1005 for (i = 0; i < 3; i++) {
1006 do {
1007 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
1008 cpu_buffer->head_page = page;
1009 return page;
1010 }
1011 rb_inc_page(cpu_buffer, &page);
1012 } while (page != head);
1013 }
1014
1015 RB_WARN_ON(cpu_buffer, 1);
1016
1017 return NULL;
1018 }
1019
static int rb_head_page_replace(struct buffer_page *old,
				struct buffer_page *new)
1022 {
1023 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1024 unsigned long val;
1025 unsigned long ret;
1026
1027 val = *ptr & ~RB_FLAG_MASK;
1028 val |= RB_PAGE_HEAD;
1029
1030 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1031
1032 return ret == val;
1033 }
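
/*
 * Note (not part of the original file): the cmpxchg above only succeeds if
 * old->list.prev->next still points at the old page with the HEAD flag set,
 * i.e. the writer has not pushed the head page forward in the meantime.
 * This is what lets the reader swap its page in (the "New Reader page" step
 * in the ASCII art at the top of this file) without taking a lock.
 */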
1034
/*
 * rb_tail_page_update - move the tail page forward
 *
 * Returns 1 if it moved the tail page, 0 if someone else already did.
 */
static int rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
			       struct buffer_page *tail_page,
			       struct buffer_page *next_page)
1043 {
1044 struct buffer_page *old_tail;
1045 unsigned long old_entries;
1046 unsigned long old_write;
1047 int ret = 0;
1048
1049 /*
1050 * The tail page now needs to be moved forward.
1051 *
1052 * We need to reset the tail page, but without messing
1053 * with possible erasing of data brought in by interrupts
1054 * that have moved the tail page and are currently on it.
1055 *
1056 * We add a counter to the write field to denote this.
1057 */
1058 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1059 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1060
1061 /*
1062 * Just make sure we have seen our old_write and synchronize
1063 * with any interrupts that come in.
1064 */
1065 barrier();
1066
1067 /*
1068 * If the tail page is still the same as what we think
1069 * it is, then it is up to us to update the tail
1070 * pointer.
1071 */
1072 if (tail_page == cpu_buffer->tail_page) {
1073 /* Zero the write counter */
1074 unsigned long val = old_write & ~RB_WRITE_MASK;
1075 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1076
1077 /*
1078 * This will only succeed if an interrupt did
1079 * not come in and change it. In which case, we
1080 * do not want to modify it.
1081 *
1082 * We add (void) to let the compiler know that we do not care
1083 * about the return value of these functions. We use the
1084 * cmpxchg to only update if an interrupt did not already
1085 * do it for us. If the cmpxchg fails, we don't care.
1086 */
1087 (void)local_cmpxchg(&next_page->write, old_write, val);
1088 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1089
/*
 * No need to worry about races with clearing out the commit:
 * it can only increment when a commit takes place, and that
 * only happens in the outermost nested commit.
 */
1095 local_set(&next_page->page->commit, 0);
1096
1097 old_tail = cmpxchg(&cpu_buffer->tail_page,
1098 tail_page, next_page);
1099
1100 if (old_tail == tail_page)
1101 ret = 1;
1102 }
1103
1104 return ret;
1105 }
1106
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
			  struct buffer_page *bpage)
1109 {
1110 unsigned long val = (unsigned long)bpage;
1111
1112 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1113 return 1;
1114
1115 return 0;
1116 }
1117
1118 /**
1119 * rb_check_list - make sure a pointer to a list has the last bits zero
1120 */
static int rb_check_list(struct ring_buffer_per_cpu *cpu_buffer,
			 struct list_head *list)
1123 {
1124 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev) != list->prev))
1125 return 1;
1126 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->next) != list->next))
1127 return 1;
1128 return 0;
1129 }
1130
1131 /**
1132 * rb_check_pages - integrity check of buffer pages
1133 * @cpu_buffer: CPU buffer with pages to test
1134 *
1135 * As a safety measure we check to make sure the data pages have not
1136 * been corrupted.
1137 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1139 {
1140 struct list_head *head = cpu_buffer->pages;
1141 struct buffer_page *bpage, *tmp;
1142
1143 /* Reset the head page if it exists */
1144 if (cpu_buffer->head_page)
1145 rb_set_head_page(cpu_buffer);
1146
1147 rb_head_page_deactivate(cpu_buffer);
1148
1149 if (RB_WARN_ON(cpu_buffer, head->next->prev != head))
1150 return -1;
1151 if (RB_WARN_ON(cpu_buffer, head->prev->next != head))
1152 return -1;
1153
1154 if (rb_check_list(cpu_buffer, head))
1155 return -1;
1156
1157 list_for_each_entry_safe(bpage, tmp, head, list) {
1158 if (RB_WARN_ON(cpu_buffer,
1159 bpage->list.next->prev != &bpage->list))
1160 return -1;
1161 if (RB_WARN_ON(cpu_buffer,
1162 bpage->list.prev->next != &bpage->list))
1163 return -1;
1164 if (rb_check_list(cpu_buffer, &bpage->list))
1165 return -1;
1166 }
1167
1168 rb_head_page_activate(cpu_buffer);
1169
1170 return 0;
1171 }
1172
static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
1174 {
1175 struct buffer_page *bpage, *tmp;
1176 long i;
1177
1178 for (i = 0; i < nr_pages; i++) {
1179 struct page *page;
/*
 * The __GFP_NORETRY flag makes sure that the allocation fails
 * gracefully without invoking the OOM killer, so the system is
 * not destabilized.
 */
1185 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1186 GFP_KERNEL | __GFP_NORETRY,
1187 cpu_to_node(cpu));
1188 if (!bpage)
1189 goto free_pages;
1190
1191 list_add(&bpage->list, pages);
1192
1193 page = alloc_pages_node(cpu_to_node(cpu),
1194 GFP_KERNEL | __GFP_NORETRY, 0);
1195 if (!page)
1196 goto free_pages;
1197 bpage->page = page_address(page);
1198 rb_init_page(bpage->page);
1199 }
1200
1201 return 0;
1202
1203 free_pages:
1204 list_for_each_entry_safe(bpage, tmp, pages, list) {
1205 list_del_init(&bpage->list);
1206 free_buffer_page(bpage);
1207 }
1208
1209 return -ENOMEM;
1210 }
1211
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
			     unsigned long nr_pages)
1214 {
1215 LIST_HEAD(pages);
1216
1217 WARN_ON(!nr_pages);
1218
1219 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1220 return -ENOMEM;
1221
1222 /*
1223 * The ring buffer page list is a circular list that does not
1224 * start and end with a list head. All page list items point to
1225 * other pages.
1226 */
1227 cpu_buffer->pages = pages.next;
1228 list_del(&pages);
1229
1230 cpu_buffer->nr_pages = nr_pages;
1231
1232 rb_check_pages(cpu_buffer);
1233
1234 return 0;
1235 }
1236
static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct ring_buffer *buffer, long nr_pages, int cpu)
1239 {
1240 struct ring_buffer_per_cpu *cpu_buffer;
1241 struct buffer_page *bpage;
1242 struct page *page;
1243 int ret;
1244
1245 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1246 GFP_KERNEL, cpu_to_node(cpu));
1247 if (!cpu_buffer)
1248 return NULL;
1249
1250 cpu_buffer->cpu = cpu;
1251 cpu_buffer->buffer = buffer;
1252 raw_spin_lock_init(&cpu_buffer->reader_lock);
1253 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1254 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1255 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1256 init_completion(&cpu_buffer->update_done);
1257 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1258 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1259 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1260
1261 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1262 GFP_KERNEL, cpu_to_node(cpu));
1263 if (!bpage)
1264 goto fail_free_buffer;
1265
1266 rb_check_bpage(cpu_buffer, bpage);
1267
1268 cpu_buffer->reader_page = bpage;
1269 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1270 if (!page)
1271 goto fail_free_reader;
1272 bpage->page = page_address(page);
1273 rb_init_page(bpage->page);
1274
1275 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1276 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1277
1278 ret = rb_allocate_pages(cpu_buffer, nr_pages);
1279 if (ret < 0)
1280 goto fail_free_reader;
1281
1282 cpu_buffer->head_page
1283 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1284 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1285
1286 rb_head_page_activate(cpu_buffer);
1287
1288 return cpu_buffer;
1289
1290 fail_free_reader:
1291 free_buffer_page(cpu_buffer->reader_page);
1292
1293 fail_free_buffer:
1294 kfree(cpu_buffer);
1295 return NULL;
1296 }
1297
static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1299 {
1300 struct list_head *head = cpu_buffer->pages;
1301 struct buffer_page *bpage, *tmp;
1302
1303 free_buffer_page(cpu_buffer->reader_page);
1304
1305 rb_head_page_deactivate(cpu_buffer);
1306
1307 if (head) {
1308 list_for_each_entry_safe(bpage, tmp, head, list) {
1309 list_del_init(&bpage->list);
1310 free_buffer_page(bpage);
1311 }
1312 bpage = list_entry(head, struct buffer_page, list);
1313 free_buffer_page(bpage);
1314 }
1315
1316 kfree(cpu_buffer);
1317 }
1318
1319 #ifdef CONFIG_HOTPLUG_CPU
1320 static int rb_cpu_notify(struct notifier_block *self,
1321 unsigned long action, void *hcpu);
1322 #endif
1323
1324 /**
1325 * __ring_buffer_alloc - allocate a new ring_buffer
1326 * @size: the size in bytes per cpu that is needed.
1327 * @flags: attributes to set for the ring buffer.
1328 *
1329 * Currently the only flag that is available is the RB_FL_OVERWRITE
1330 * flag. This flag means that the buffer will overwrite old data
1331 * when the buffer wraps. If this flag is not set, the buffer will
1332 * drop data when the tail hits the head.
1333 */
struct ring_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
					struct lock_class_key *key)
1336 {
1337 struct ring_buffer *buffer;
1338 long nr_pages;
1339 int bsize;
1340 int cpu;
1341
1342 /* keep it in its own cache line */
1343 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1344 GFP_KERNEL);
1345 if (!buffer)
1346 return NULL;
1347
1348 if (!alloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1349 goto fail_free_buffer;
1350
1351 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1352 buffer->flags = flags;
1353 buffer->clock = trace_clock_local;
1354 buffer->reader_lock_key = key;
1355
1356 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1357 init_waitqueue_head(&buffer->irq_work.waiters);
1358
1359 /* need at least two pages */
1360 if (nr_pages < 2)
1361 nr_pages = 2;
1362
/*
 * Without CPU hotplug, if the ring buffer is allocated from an
 * early initcall, it will not be notified of secondary cpus.
 * In that case, we need to allocate for all possible cpus.
 */
1368 #ifdef CONFIG_HOTPLUG_CPU
1369 cpu_notifier_register_begin();
1370 cpumask_copy(buffer->cpumask, cpu_online_mask);
1371 #else
1372 cpumask_copy(buffer->cpumask, cpu_possible_mask);
1373 #endif
1374 buffer->cpus = nr_cpu_ids;
1375
1376 bsize = sizeof(void *) * nr_cpu_ids;
1377 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1378 GFP_KERNEL);
1379 if (!buffer->buffers)
1380 goto fail_free_cpumask;
1381
1382 for_each_buffer_cpu(buffer, cpu) {
1383 buffer->buffers[cpu] =
1384 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1385 if (!buffer->buffers[cpu])
1386 goto fail_free_buffers;
1387 }
1388
1389 #ifdef CONFIG_HOTPLUG_CPU
1390 buffer->cpu_notify.notifier_call = rb_cpu_notify;
1391 buffer->cpu_notify.priority = 0;
1392 __register_cpu_notifier(&buffer->cpu_notify);
1393 cpu_notifier_register_done();
1394 #endif
1395
1396 mutex_init(&buffer->mutex);
1397
1398 return buffer;
1399
1400 fail_free_buffers:
1401 for_each_buffer_cpu(buffer, cpu) {
1402 if (buffer->buffers[cpu])
1403 rb_free_cpu_buffer(buffer->buffers[cpu]);
1404 }
1405 kfree(buffer->buffers);
1406
1407 fail_free_cpumask:
1408 free_cpumask_var(buffer->cpumask);
1409 #ifdef CONFIG_HOTPLUG_CPU
1410 cpu_notifier_register_done();
1411 #endif
1412
1413 fail_free_buffer:
1414 kfree(buffer);
1415 return NULL;
1416 }
1417 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
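
/*
 * Usage sketch (not part of the original file): callers normally go through
 * the ring_buffer_alloc() wrapper in <linux/ring_buffer.h>, which supplies
 * the lock_class_key, e.g.:
 *
 *	struct ring_buffer *rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *	if (!rb)
 *		return -ENOMEM;
 *	...
 *	ring_buffer_free(rb);
 *
 * The size is in bytes per cpu and is rounded up here to whole pages
 * (at least two).
 */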
1418
1419 /**
1420 * ring_buffer_free - free a ring buffer.
1421 * @buffer: the buffer to free.
1422 */
void
ring_buffer_free(struct ring_buffer *buffer)
1425 {
1426 int cpu;
1427
1428 #ifdef CONFIG_HOTPLUG_CPU
1429 cpu_notifier_register_begin();
1430 __unregister_cpu_notifier(&buffer->cpu_notify);
1431 #endif
1432
1433 for_each_buffer_cpu(buffer, cpu)
1434 rb_free_cpu_buffer(buffer->buffers[cpu]);
1435
1436 #ifdef CONFIG_HOTPLUG_CPU
1437 cpu_notifier_register_done();
1438 #endif
1439
1440 kfree(buffer->buffers);
1441 free_cpumask_var(buffer->cpumask);
1442
1443 kfree(buffer);
1444 }
1445 EXPORT_SYMBOL_GPL(ring_buffer_free);
1446
void ring_buffer_set_clock(struct ring_buffer *buffer,
			   u64 (*clock)(void))
1449 {
1450 buffer->clock = clock;
1451 }
1452
1453 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1454
static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1456 {
1457 return local_read(&bpage->entries) & RB_WRITE_MASK;
1458 }
1459
static inline unsigned long rb_page_write(struct buffer_page *bpage)
1461 {
1462 return local_read(&bpage->write) & RB_WRITE_MASK;
1463 }
1464
static int
rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1467 {
1468 struct list_head *tail_page, *to_remove, *next_page;
1469 struct buffer_page *to_remove_page, *tmp_iter_page;
1470 struct buffer_page *last_page, *first_page;
1471 unsigned long nr_removed;
1472 unsigned long head_bit;
1473 int page_entries;
1474
1475 head_bit = 0;
1476
1477 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1478 atomic_inc(&cpu_buffer->record_disabled);
1479 /*
1480 * We don't race with the readers since we have acquired the reader
1481 * lock. We also don't race with writers after disabling recording.
1482 * This makes it easy to figure out the first and the last page to be
1483 * removed from the list. We unlink all the pages in between including
1484 * the first and last pages. This is done in a busy loop so that we
1485 * lose the least number of traces.
1486 * The pages are freed after we restart recording and unlock readers.
1487 */
1488 tail_page = &cpu_buffer->tail_page->list;
1489
1490 /*
1491 * tail page might be on reader page, we remove the next page
1492 * from the ring buffer
1493 */
1494 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1495 tail_page = rb_list_head(tail_page->next);
1496 to_remove = tail_page;
1497
1498 /* start of pages to remove */
1499 first_page = list_entry(rb_list_head(to_remove->next),
1500 struct buffer_page, list);
1501
1502 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1503 to_remove = rb_list_head(to_remove)->next;
1504 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1505 }
1506
1507 next_page = rb_list_head(to_remove)->next;
1508
1509 /*
1510 * Now we remove all pages between tail_page and next_page.
1511 * Make sure that we have head_bit value preserved for the
1512 * next page
1513 */
1514 tail_page->next = (struct list_head *)((unsigned long)next_page |
1515 head_bit);
1516 next_page = rb_list_head(next_page);
1517 next_page->prev = tail_page;
1518
1519 /* make sure pages points to a valid page in the ring buffer */
1520 cpu_buffer->pages = next_page;
1521
1522 /* update head page */
1523 if (head_bit)
1524 cpu_buffer->head_page = list_entry(next_page,
1525 struct buffer_page, list);
1526
1527 /*
1528 * change read pointer to make sure any read iterators reset
1529 * themselves
1530 */
1531 cpu_buffer->read = 0;
1532
1533 /* pages are removed, resume tracing and then free the pages */
1534 atomic_dec(&cpu_buffer->record_disabled);
1535 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1536
1537 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1538
1539 /* last buffer page to remove */
1540 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1541 list);
1542 tmp_iter_page = first_page;
1543
1544 do {
1545 to_remove_page = tmp_iter_page;
1546 rb_inc_page(cpu_buffer, &tmp_iter_page);
1547
1548 /* update the counters */
1549 page_entries = rb_page_entries(to_remove_page);
1550 if (page_entries) {
1551 /*
1552 * If something was added to this page, it was full
1553 * since it is not the tail page. So we deduct the
1554 * bytes consumed in ring buffer from here.
1555 * Increment overrun to account for the lost events.
1556 */
1557 local_add(page_entries, &cpu_buffer->overrun);
1558 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
1559 }
1560
1561 /*
1562 * We have already removed references to this list item, just
1563 * free up the buffer_page and its page
1564 */
1565 free_buffer_page(to_remove_page);
1566 nr_removed--;
1567
1568 } while (to_remove_page != last_page);
1569
1570 RB_WARN_ON(cpu_buffer, nr_removed);
1571
1572 return nr_removed == 0;
1573 }
1574
static int
rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1577 {
1578 struct list_head *pages = &cpu_buffer->new_pages;
1579 int retries, success;
1580
1581 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1582 /*
1583 * We are holding the reader lock, so the reader page won't be swapped
1584 * in the ring buffer. Now we are racing with the writer trying to
1585 * move head page and the tail page.
1586 * We are going to adapt the reader page update process where:
1587 * 1. We first splice the start and end of list of new pages between
1588 * the head page and its previous page.
1589 * 2. We cmpxchg the prev_page->next to point from head page to the
1590 * start of new pages list.
1591 * 3. Finally, we update the head->prev to the end of new list.
1592 *
1593 * We will try this process 10 times, to make sure that we don't keep
1594 * spinning.
1595 */
1596 retries = 10;
1597 success = 0;
1598 while (retries--) {
1599 struct list_head *head_page, *prev_page, *r;
1600 struct list_head *last_page, *first_page;
1601 struct list_head *head_page_with_bit;
1602
1603 head_page = &rb_set_head_page(cpu_buffer)->list;
1604 if (!head_page)
1605 break;
1606 prev_page = head_page->prev;
1607
1608 first_page = pages->next;
1609 last_page = pages->prev;
1610
1611 head_page_with_bit = (struct list_head *)
1612 ((unsigned long)head_page | RB_PAGE_HEAD);
1613
1614 last_page->next = head_page_with_bit;
1615 first_page->prev = prev_page;
1616
1617 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1618
1619 if (r == head_page_with_bit) {
/*
 * yay, we replaced the page pointer to our new list,
 * now we just have to update the head page's prev
 * pointer to point to the end of the list
 */
1625 head_page->prev = last_page;
1626 success = 1;
1627 break;
1628 }
1629 }
1630
1631 if (success)
1632 INIT_LIST_HEAD(pages);
1633 /*
1634 * If we weren't successful in adding in new pages, warn and stop
1635 * tracing
1636 */
1637 RB_WARN_ON(cpu_buffer, !success);
1638 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1639
1640 /* free pages if they weren't inserted */
1641 if (!success) {
1642 struct buffer_page *bpage, *tmp;
1643 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1644 list) {
1645 list_del_init(&bpage->list);
1646 free_buffer_page(bpage);
1647 }
1648 }
1649 return success;
1650 }
1651
static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
1653 {
1654 int success;
1655
1656 if (cpu_buffer->nr_pages_to_update > 0)
1657 success = rb_insert_pages(cpu_buffer);
1658 else
1659 success = rb_remove_pages(cpu_buffer,
1660 -cpu_buffer->nr_pages_to_update);
1661
1662 if (success)
1663 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
1664 }
1665
static void update_pages_handler(struct work_struct *work)
1667 {
1668 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
1669 struct ring_buffer_per_cpu, update_pages_work);
1670 rb_update_pages(cpu_buffer);
1671 complete(&cpu_buffer->update_done);
1672 }
1673
1674 /**
1675 * ring_buffer_resize - resize the ring buffer
1676 * @buffer: the buffer to resize.
1677 * @size: the new size.
1678 * @cpu_id: the cpu buffer to resize
1679 *
1680 * Minimum size is 2 * BUF_PAGE_SIZE.
1681 *
1682 * Returns 0 on success and < 0 on failure.
1683 */
int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
		       int cpu_id)
1686 {
1687 struct ring_buffer_per_cpu *cpu_buffer;
1688 unsigned long nr_pages;
1689 int cpu, err = 0;
1690
1691 /*
1692 * Always succeed at resizing a non-existent buffer:
1693 */
1694 if (!buffer)
1695 return size;
1696
1697 /* Make sure the requested buffer exists */
1698 if (cpu_id != RING_BUFFER_ALL_CPUS &&
1699 !cpumask_test_cpu(cpu_id, buffer->cpumask))
1700 return size;
1701
1702 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1703
1704 /* we need a minimum of two pages */
1705 if (nr_pages < 2)
1706 nr_pages = 2;
1707
1708 size = nr_pages * BUF_PAGE_SIZE;
1709
1710 /*
1711 * Don't succeed if resizing is disabled, as a reader might be
1712 * manipulating the ring buffer and is expecting a sane state while
1713 * this is true.
1714 */
1715 if (atomic_read(&buffer->resize_disabled))
1716 return -EBUSY;
1717
1718 /* prevent another thread from changing buffer sizes */
1719 mutex_lock(&buffer->mutex);
1720
1721 if (cpu_id == RING_BUFFER_ALL_CPUS) {
1722 /* calculate the pages to update */
1723 for_each_buffer_cpu(buffer, cpu) {
1724 cpu_buffer = buffer->buffers[cpu];
1725
1726 cpu_buffer->nr_pages_to_update = nr_pages -
1727 cpu_buffer->nr_pages;
1728 /*
1729 * nothing more to do for removing pages or no update
1730 */
1731 if (cpu_buffer->nr_pages_to_update <= 0)
1732 continue;
1733 /*
1734 * to add pages, make sure all new pages can be
1735 * allocated without receiving ENOMEM
1736 */
1737 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1738 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1739 &cpu_buffer->new_pages, cpu)) {
1740 /* not enough memory for new pages */
1741 err = -ENOMEM;
1742 goto out_err;
1743 }
1744 }
1745
1746 get_online_cpus();
1747 /*
1748 * Fire off all the required work handlers
1749 * We can't schedule on offline CPUs, but it's not necessary
1750 * since we can change their buffer sizes without any race.
1751 */
1752 for_each_buffer_cpu(buffer, cpu) {
1753 cpu_buffer = buffer->buffers[cpu];
1754 if (!cpu_buffer->nr_pages_to_update)
1755 continue;
1756
1757 /* Can't run something on an offline CPU. */
1758 if (!cpu_online(cpu)) {
1759 rb_update_pages(cpu_buffer);
1760 cpu_buffer->nr_pages_to_update = 0;
1761 } else {
1762 schedule_work_on(cpu,
1763 &cpu_buffer->update_pages_work);
1764 }
1765 }
1766
1767 /* wait for all the updates to complete */
1768 for_each_buffer_cpu(buffer, cpu) {
1769 cpu_buffer = buffer->buffers[cpu];
1770 if (!cpu_buffer->nr_pages_to_update)
1771 continue;
1772
1773 if (cpu_online(cpu))
1774 wait_for_completion(&cpu_buffer->update_done);
1775 cpu_buffer->nr_pages_to_update = 0;
1776 }
1777
1778 put_online_cpus();
1779 } else {
/* Make sure this CPU has been initialized */
1781 if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
1782 goto out;
1783
1784 cpu_buffer = buffer->buffers[cpu_id];
1785
1786 if (nr_pages == cpu_buffer->nr_pages)
1787 goto out;
1788
1789 cpu_buffer->nr_pages_to_update = nr_pages -
1790 cpu_buffer->nr_pages;
1791
1792 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1793 if (cpu_buffer->nr_pages_to_update > 0 &&
1794 __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
1795 &cpu_buffer->new_pages, cpu_id)) {
1796 err = -ENOMEM;
1797 goto out_err;
1798 }
1799
1800 get_online_cpus();
1801
1802 /* Can't run something on an offline CPU. */
1803 if (!cpu_online(cpu_id))
1804 rb_update_pages(cpu_buffer);
1805 else {
1806 schedule_work_on(cpu_id,
1807 &cpu_buffer->update_pages_work);
1808 wait_for_completion(&cpu_buffer->update_done);
1809 }
1810
1811 cpu_buffer->nr_pages_to_update = 0;
1812 put_online_cpus();
1813 }
1814
1815 out:
1816 /*
1817 * The ring buffer resize can happen with the ring buffer
1818 * enabled, so that the update disturbs the tracing as little
1819 * as possible. But if the buffer is disabled, we do not need
1820 * to worry about that, and we can take the time to verify
1821 * that the buffer is not corrupt.
1822 */
1823 if (atomic_read(&buffer->record_disabled)) {
1824 atomic_inc(&buffer->record_disabled);
1825 /*
1826 * Even though the buffer was disabled, we must make sure
1827 * that it is truly disabled before calling rb_check_pages.
1828 * There could have been a race between checking
1829 * record_disabled and incrementing it.
1830 */
1831 synchronize_sched();
1832 for_each_buffer_cpu(buffer, cpu) {
1833 cpu_buffer = buffer->buffers[cpu];
1834 rb_check_pages(cpu_buffer);
1835 }
1836 atomic_dec(&buffer->record_disabled);
1837 }
1838
1839 mutex_unlock(&buffer->mutex);
1840 return size;
1841
1842 out_err:
1843 for_each_buffer_cpu(buffer, cpu) {
1844 struct buffer_page *bpage, *tmp;
1845
1846 cpu_buffer = buffer->buffers[cpu];
1847 cpu_buffer->nr_pages_to_update = 0;
1848
1849 if (list_empty(&cpu_buffer->new_pages))
1850 continue;
1851
1852 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1853 list) {
1854 list_del_init(&bpage->list);
1855 free_buffer_page(bpage);
1856 }
1857 }
1858 mutex_unlock(&buffer->mutex);
1859 return err;
1860 }
1861 EXPORT_SYMBOL_GPL(ring_buffer_resize);
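/*
 * Usage sketch (illustrative only, not part of this file): a caller
 * holding a valid ring buffer could grow it on every CPU as below.
 * The helper name and the 1 MB figure are made up for the example.
 *
 *	int example_grow_buffer(struct ring_buffer *buffer)
 *	{
 *		int ret;
 *
 *		ret = ring_buffer_resize(buffer, 1024 * 1024,
 *					 RING_BUFFER_ALL_CPUS);
 *		if (ret < 0)
 *			return ret;
 *		return 0;
 *	}
 *
 * The requested size is in bytes; it is rounded up to whole buffer
 * pages (with a minimum of two), and the adjusted size is returned
 * on success.
 */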
1862
1863 void ring_buffer_change_overwrite(struct ring_buffer *buffer, int val)
1864 {
1865 mutex_lock(&buffer->mutex);
1866 if (val)
1867 buffer->flags |= RB_FL_OVERWRITE;
1868 else
1869 buffer->flags &= ~RB_FL_OVERWRITE;
1870 mutex_unlock(&buffer->mutex);
1871 }
1872 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
1873
1874 static inline void *
1875 __rb_data_page_index(struct buffer_data_page *bpage, unsigned index)
1876 {
1877 return bpage->data + index;
1878 }
1879
1880 static inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
1881 {
1882 return bpage->page->data + index;
1883 }
1884
1885 static inline struct ring_buffer_event *
1886 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
1887 {
1888 return __rb_page_index(cpu_buffer->reader_page,
1889 cpu_buffer->reader_page->read);
1890 }
1891
1892 static inline struct ring_buffer_event *
1893 rb_iter_head_event(struct ring_buffer_iter *iter)
1894 {
1895 return __rb_page_index(iter->head_page, iter->head);
1896 }
1897
1898 static inline unsigned rb_page_commit(struct buffer_page *bpage)
1899 {
1900 return local_read(&bpage->page->commit);
1901 }
1902
1903 /* Size is determined by what has been committed */
1904 static inline unsigned rb_page_size(struct buffer_page *bpage)
1905 {
1906 return rb_page_commit(bpage);
1907 }
1908
1909 static inline unsigned
1910 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
1911 {
1912 return rb_page_commit(cpu_buffer->commit_page);
1913 }
1914
1915 static inline unsigned
1916 rb_event_index(struct ring_buffer_event *event)
1917 {
1918 unsigned long addr = (unsigned long)event;
1919
1920 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
1921 }
1922
1923 static inline int
1924 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
1925 struct ring_buffer_event *event)
1926 {
1927 unsigned long addr = (unsigned long)event;
1928 unsigned long index;
1929
1930 index = rb_event_index(event);
1931 addr &= PAGE_MASK;
1932
1933 return cpu_buffer->commit_page->page == (void *)addr &&
1934 rb_commit_index(cpu_buffer) == index;
1935 }
1936
1937 static void
1938 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
1939 {
1940 unsigned long max_count;
1941
1942 /*
1943 * We only race with interrupts and NMIs on this CPU.
1944 * If we own the commit event, then we can commit
1945 * all others that interrupted us, since the interruptions
1946 * are in stack format (they finish before they come
1947 * back to us). This allows us to do a simple loop to
1948 * assign the commit to the tail.
1949 */
1950 again:
1951 max_count = cpu_buffer->nr_pages * 100;
1952
1953 while (cpu_buffer->commit_page != cpu_buffer->tail_page) {
1954 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
1955 return;
1956 if (RB_WARN_ON(cpu_buffer,
1957 rb_is_reader_page(cpu_buffer->tail_page)))
1958 return;
1959 local_set(&cpu_buffer->commit_page->page->commit,
1960 rb_page_write(cpu_buffer->commit_page));
1961 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
1962 cpu_buffer->write_stamp =
1963 cpu_buffer->commit_page->page->time_stamp;
1964 /* add barrier to keep gcc from optimizing too much */
1965 barrier();
1966 }
1967 while (rb_commit_index(cpu_buffer) !=
1968 rb_page_write(cpu_buffer->commit_page)) {
1969
1970 local_set(&cpu_buffer->commit_page->page->commit,
1971 rb_page_write(cpu_buffer->commit_page));
1972 RB_WARN_ON(cpu_buffer,
1973 local_read(&cpu_buffer->commit_page->page->commit) &
1974 ~RB_WRITE_MASK);
1975 barrier();
1976 }
1977
1978 /* again, keep gcc from optimizing */
1979 barrier();
1980
1981 /*
1982 * If an interrupt came in just after the first while loop
1983 * and pushed the tail page forward, we will be left with
1984 * a dangling commit that will never go forward.
1985 */
1986 if (unlikely(cpu_buffer->commit_page != cpu_buffer->tail_page))
1987 goto again;
1988 }
1989
1990 static void rb_reset_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
1991 {
1992 cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
1993 cpu_buffer->reader_page->read = 0;
1994 }
1995
1996 static void rb_inc_iter(struct ring_buffer_iter *iter)
1997 {
1998 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
1999
2000 /*
2001 * The iterator could be on the reader page (it starts there).
2002 * But the head could have moved, since the reader was
2003 * found. Check for this case and assign the iterator
2004 * to the head page instead of next.
2005 */
2006 if (iter->head_page == cpu_buffer->reader_page)
2007 iter->head_page = rb_set_head_page(cpu_buffer);
2008 else
2009 rb_inc_page(cpu_buffer, &iter->head_page);
2010
2011 iter->read_stamp = iter->head_page->page->time_stamp;
2012 iter->head = 0;
2013 }
2014
2015 /* Slow path, do not inline */
2016 static noinline struct ring_buffer_event *
2017 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta)
2018 {
2019 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2020
2021 /* Not the first event on the page? */
2022 if (rb_event_index(event)) {
2023 event->time_delta = delta & TS_MASK;
2024 event->array[0] = delta >> TS_SHIFT;
2025 } else {
2026 /* nope, just zero it */
2027 event->time_delta = 0;
2028 event->array[0] = 0;
2029 }
2030
2031 return skip_time_extend(event);
2032 }
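/*
 * Worked example of the time-extend encoding above (a sketch that
 * assumes TS_SHIFT is 27, i.e. the in-event time_delta holds 27 bits):
 *
 *	delta             = 0x12345678	(needs more than 27 bits)
 *	event->time_delta = delta & TS_MASK   = 0x2345678
 *	event->array[0]   = delta >> TS_SHIFT = 0x2
 *
 * A reader reconstructs the full delta as
 *	(array[0] << TS_SHIFT) + time_delta
 *	= (0x2 << 27) + 0x2345678 = 0x12345678
 */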
2033
2034 /**
2035 * rb_update_event - update event type and data
2036 * @cpu_buffer: the per-CPU buffer the @event is on
2037 * @event: the event to update
2038 * @length: the size of the event field in the ring buffer
2039 *
2040 * Update the type and data fields of the event. The length
2041 * is the actual size that is written to the ring buffer,
2042 * and with this, we can determine what to place into the
2043 * data field.
2044 */
2045 static void
2046 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2047 struct ring_buffer_event *event, unsigned length,
2048 int add_timestamp, u64 delta)
2049 {
2050 /* Only a commit updates the timestamp */
2051 if (unlikely(!rb_event_is_commit(cpu_buffer, event)))
2052 delta = 0;
2053
2054 /*
2055 * If we need to add a timestamp, then we
2056 * add it to the start of the reserved space.
2057 */
2058 if (unlikely(add_timestamp)) {
2059 event = rb_add_time_stamp(event, delta);
2060 length -= RB_LEN_TIME_EXTEND;
2061 delta = 0;
2062 }
2063
2064 event->time_delta = delta;
2065 length -= RB_EVNT_HDR_SIZE;
2066 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2067 event->type_len = 0;
2068 event->array[0] = length;
2069 } else
2070 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2071 }
2072
2073 /*
2074 * rb_handle_head_page - writer hit the head page
2075 *
2076 * Returns: +1 to retry page
2077 * 0 to continue
2078 * -1 on error
2079 */
2080 static int
2081 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2082 struct buffer_page *tail_page,
2083 struct buffer_page *next_page)
2084 {
2085 struct buffer_page *new_head;
2086 int entries;
2087 int type;
2088 int ret;
2089
2090 entries = rb_page_entries(next_page);
2091
2092 /*
2093 * The hard part is here. We need to move the head
2094 * forward, and protect against both readers on
2095 * other CPUs and writers coming in via interrupts.
2096 */
2097 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2098 RB_PAGE_HEAD);
2099
2100 /*
2101 * type can be one of four:
2102 * NORMAL - an interrupt already moved it for us
2103 * HEAD - we are the first to get here.
2104 * UPDATE - we are the interrupt interrupting
2105 * a current move.
2106 * MOVED - a reader on another CPU moved the next
2107 * pointer to its reader page. Give up
2108 * and try again.
2109 */
2110
2111 switch (type) {
2112 case RB_PAGE_HEAD:
2113 /*
2114 * We changed the head to UPDATE, thus
2115 * it is our responsibility to update
2116 * the counters.
2117 */
2118 local_add(entries, &cpu_buffer->overrun);
2119 local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
2120
2121 /*
2122 * The entries will be zeroed out when we move the
2123 * tail page.
2124 */
2125
2126 /* still more to do */
2127 break;
2128
2129 case RB_PAGE_UPDATE:
2130 /*
2131 * This is an interrupt that interrupted the
2132 * previous update. Still more to do.
2133 */
2134 break;
2135 case RB_PAGE_NORMAL:
2136 /*
2137 * An interrupt came in before the update
2138 * and processed this for us.
2139 * Nothing left to do.
2140 */
2141 return 1;
2142 case RB_PAGE_MOVED:
2143 /*
2144 * The reader is on another CPU and just did
2145 * a swap with our next_page.
2146 * Try again.
2147 */
2148 return 1;
2149 default:
2150 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2151 return -1;
2152 }
2153
2154 /*
2155 * Now that we are here, the old head pointer is
2156 * set to UPDATE. This will keep the reader from
2157 * swapping the head page with the reader page.
2158 * The reader (on another CPU) will spin till
2159 * we are finished.
2160 *
2161 * We just need to protect against interrupts
2162 * doing the job. We will set the next pointer
2163 * to HEAD. After that, we set the old pointer
2164 * to NORMAL, but only if it was HEAD before;
2165 * otherwise we are an interrupt, and only
2166 * want the outermost commit to reset it.
2167 */
2168 new_head = next_page;
2169 rb_inc_page(cpu_buffer, &new_head);
2170
2171 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2172 RB_PAGE_NORMAL);
2173
2174 /*
2175 * Valid returns are:
2176 * HEAD - an interrupt came in and already set it.
2177 * NORMAL - One of two things:
2178 * 1) We really set it.
2179 * 2) A bunch of interrupts came in and moved
2180 * the page forward again.
2181 */
2182 switch (ret) {
2183 case RB_PAGE_HEAD:
2184 case RB_PAGE_NORMAL:
2185 /* OK */
2186 break;
2187 default:
2188 RB_WARN_ON(cpu_buffer, 1);
2189 return -1;
2190 }
2191
2192 /*
2193 * It is possible that an interrupt came in,
2194 * set the head up, then more interrupts came in
2195 * and moved it again. When we get back here,
2196 * the page would have been set to NORMAL but we
2197 * just set it back to HEAD.
2198 *
2199 * How do you detect this? Well, if that happened
2200 * the tail page would have moved.
2201 */
2202 if (ret == RB_PAGE_NORMAL) {
2203 /*
2204 * If the tail had moved past next, then we need
2205 * to reset the pointer.
2206 */
2207 if (cpu_buffer->tail_page != tail_page &&
2208 cpu_buffer->tail_page != next_page)
2209 rb_head_page_set_normal(cpu_buffer, new_head,
2210 next_page,
2211 RB_PAGE_HEAD);
2212 }
2213
2214 /*
2215 * If this was the outermost commit (the one that
2216 * changed the original pointer from HEAD to UPDATE),
2217 * then it is up to us to reset it to NORMAL.
2218 */
2219 if (type == RB_PAGE_HEAD) {
2220 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2221 tail_page,
2222 RB_PAGE_UPDATE);
2223 if (RB_WARN_ON(cpu_buffer,
2224 ret != RB_PAGE_UPDATE))
2225 return -1;
2226 }
2227
2228 return 0;
2229 }
2230
2231 static unsigned rb_calculate_event_length(unsigned length)
2232 {
2233 struct ring_buffer_event event; /* Used only for sizeof array */
2234
2235 /* zero length can cause confusion */
2236 if (!length)
2237 length = 1;
2238
2239 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2240 length += sizeof(event.array[0]);
2241
2242 length += RB_EVNT_HDR_SIZE;
2243 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2244
2245 return length;
2246 }
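/*
 * Worked example (a sketch assuming RB_EVNT_HDR_SIZE and
 * RB_ARCH_ALIGNMENT are both 4 and 8-byte alignment is not forced,
 * the common case):
 *
 *	requested data length          = 10 bytes
 *	small event, no extra array[0] slot needed
 *	10 + RB_EVNT_HDR_SIZE          = 14
 *	ALIGN(14, RB_ARCH_ALIGNMENT)   = 16 bytes reserved
 *
 * rb_update_event() later subtracts the header again and stores
 * DIV_ROUND_UP(12, RB_ALIGNMENT) = 3 in type_len: three 4-byte
 * words of data follow the event header.
 */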
2247
2248 static inline void
2249 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2250 struct buffer_page *tail_page,
2251 unsigned long tail, unsigned long length)
2252 {
2253 struct ring_buffer_event *event;
2254
2255 /*
2256 * Only the event that crossed the page boundary
2257 * must fill the old tail_page with padding.
2258 */
2259 if (tail >= BUF_PAGE_SIZE) {
2260 /*
2261 * If the page was filled, then we still need
2262 * to update the real_end. Reset it to zero
2263 * and the reader will ignore it.
2264 */
2265 if (tail == BUF_PAGE_SIZE)
2266 tail_page->real_end = 0;
2267
2268 local_sub(length, &tail_page->write);
2269 return;
2270 }
2271
2272 event = __rb_page_index(tail_page, tail);
2273 kmemcheck_annotate_bitfield(event, bitfield);
2274
2275 /* account for padding bytes */
2276 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2277
2278 /*
2279 * Save the original length to the meta data.
2280 * This will be used by the reader to add lost event
2281 * counter.
2282 */
2283 tail_page->real_end = tail;
2284
2285 /*
2286 * If this event is bigger than the minimum size, then
2287 * we need to be careful that we don't subtract the
2288 * write counter enough to allow another writer to slip
2289 * in on this page.
2290 * We put in a discarded commit instead, to make sure
2291 * that this space is not used again.
2292 *
2293 * If we are less than the minimum size, we don't need to
2294 * worry about it.
2295 */
2296 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2297 /* No room for any events */
2298
2299 /* Mark the rest of the page with padding */
2300 rb_event_set_padding(event);
2301
2302 /* Set the write back to the previous setting */
2303 local_sub(length, &tail_page->write);
2304 return;
2305 }
2306
2307 /* Put in a discarded event */
2308 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2309 event->type_len = RINGBUF_TYPE_PADDING;
2310 /* time delta must be non zero */
2311 event->time_delta = 1;
2312
2313 /* Set write to end of buffer */
2314 length = (tail + length) - BUF_PAGE_SIZE;
2315 local_sub(length, &tail_page->write);
2316 }
2317
2318 /*
2319 * This is the slow path, force gcc not to inline it.
2320 */
2321 static noinline struct ring_buffer_event *
2322 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2323 unsigned long length, unsigned long tail,
2324 struct buffer_page *tail_page, u64 ts)
2325 {
2326 struct buffer_page *commit_page = cpu_buffer->commit_page;
2327 struct ring_buffer *buffer = cpu_buffer->buffer;
2328 struct buffer_page *next_page;
2329 int ret;
2330
2331 next_page = tail_page;
2332
2333 rb_inc_page(cpu_buffer, &next_page);
2334
2335 /*
2336 * If for some reason, we had an interrupt storm that made
2337 * it all the way around the buffer, bail, and warn
2338 * about it.
2339 */
2340 if (unlikely(next_page == commit_page)) {
2341 local_inc(&cpu_buffer->commit_overrun);
2342 goto out_reset;
2343 }
2344
2345 /*
2346 * This is where the fun begins!
2347 *
2348 * We are fighting against races between a reader that
2349 * could be on another CPU trying to swap its reader
2350 * page with the buffer head.
2351 *
2352 * We are also fighting against interrupts coming in and
2353 * moving the head or tail on us as well.
2354 *
2355 * If the next page is the head page then we have filled
2356 * the buffer, unless the commit page is still on the
2357 * reader page.
2358 */
2359 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2360
2361 /*
2362 * If the commit is not on the reader page, then
2363 * move the head page.
2364 */
2365 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2366 /*
2367 * If we are not in overwrite mode,
2368 * this is easy, just stop here.
2369 */
2370 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2371 local_inc(&cpu_buffer->dropped_events);
2372 goto out_reset;
2373 }
2374
2375 ret = rb_handle_head_page(cpu_buffer,
2376 tail_page,
2377 next_page);
2378 if (ret < 0)
2379 goto out_reset;
2380 if (ret)
2381 goto out_again;
2382 } else {
2383 /*
2384 * We need to be careful here too. The
2385 * commit page could still be on the reader
2386 * page. We could have a small buffer, and
2387 * have filled up the buffer with events
2388 * from interrupts and such, and wrapped.
2389 *
2390 * Note, if the tail page is also on the
2391 * reader_page, we let it move out.
2392 */
2393 if (unlikely((cpu_buffer->commit_page !=
2394 cpu_buffer->tail_page) &&
2395 (cpu_buffer->commit_page ==
2396 cpu_buffer->reader_page))) {
2397 local_inc(&cpu_buffer->commit_overrun);
2398 goto out_reset;
2399 }
2400 }
2401 }
2402
2403 ret = rb_tail_page_update(cpu_buffer, tail_page, next_page);
2404 if (ret) {
2405 /*
2406 * Nested commits always have zero deltas, so
2407 * just reread the time stamp
2408 */
2409 ts = rb_time_stamp(buffer);
2410 next_page->page->time_stamp = ts;
2411 }
2412
2413 out_again:
2414
2415 rb_reset_tail(cpu_buffer, tail_page, tail, length);
2416
2417 /* fail and let the caller try again */
2418 return ERR_PTR(-EAGAIN);
2419
2420 out_reset:
2421 /* reset write */
2422 rb_reset_tail(cpu_buffer, tail_page, tail, length);
2423
2424 return NULL;
2425 }
2426
2427 static struct ring_buffer_event *
2428 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
2429 unsigned long length, u64 ts,
2430 u64 delta, int add_timestamp)
2431 {
2432 struct buffer_page *tail_page;
2433 struct ring_buffer_event *event;
2434 unsigned long tail, write;
2435
2436 /*
2437 * If the time delta since the last event is too big to
2438 * hold in the time field of the event, then we append a
2439 * TIME EXTEND event ahead of the data event.
2440 */
2441 if (unlikely(add_timestamp))
2442 length += RB_LEN_TIME_EXTEND;
2443
2444 tail_page = cpu_buffer->tail_page;
2445 write = local_add_return(length, &tail_page->write);
2446
2447 /* set write to only the index of the write */
2448 write &= RB_WRITE_MASK;
2449 tail = write - length;
2450
2451 /*
2452 * If this is the first commit on the page, then it has the same
2453 * timestamp as the page itself.
2454 */
2455 if (!tail)
2456 delta = 0;
2457
2458 /* See if we shot past the end of this buffer page */
2459 if (unlikely(write > BUF_PAGE_SIZE))
2460 return rb_move_tail(cpu_buffer, length, tail,
2461 tail_page, ts);
2462
2463 /* We reserved something on the buffer */
2464
2465 event = __rb_page_index(tail_page, tail);
2466 kmemcheck_annotate_bitfield(event, bitfield);
2467 rb_update_event(cpu_buffer, event, length, add_timestamp, delta);
2468
2469 local_inc(&tail_page->entries);
2470
2471 /*
2472 * If this is the first commit on the page, then update
2473 * its timestamp.
2474 */
2475 if (!tail)
2476 tail_page->page->time_stamp = ts;
2477
2478 /* account for these added bytes */
2479 local_add(length, &cpu_buffer->entries_bytes);
2480
2481 return event;
2482 }
2483
2484 static inline int
2485 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2486 struct ring_buffer_event *event)
2487 {
2488 unsigned long new_index, old_index;
2489 struct buffer_page *bpage;
2490 unsigned long index;
2491 unsigned long addr;
2492
2493 new_index = rb_event_index(event);
2494 old_index = new_index + rb_event_ts_length(event);
2495 addr = (unsigned long)event;
2496 addr &= PAGE_MASK;
2497
2498 bpage = cpu_buffer->tail_page;
2499
2500 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2501 unsigned long write_mask =
2502 local_read(&bpage->write) & ~RB_WRITE_MASK;
2503 unsigned long event_length = rb_event_length(event);
2504 /*
2505 * This is on the tail page. It is possible that
2506 * a write could come in and move the tail page
2507 * and write to the next page. That is fine
2508 * because we just shorten what is on this page.
2509 */
2510 old_index += write_mask;
2511 new_index += write_mask;
2512 index = local_cmpxchg(&bpage->write, old_index, new_index);
2513 if (index == old_index) {
2514 /* update counters */
2515 local_sub(event_length, &cpu_buffer->entries_bytes);
2516 return 1;
2517 }
2518 }
2519
2520 /* could not discard */
2521 return 0;
2522 }
2523
2524 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2525 {
2526 local_inc(&cpu_buffer->committing);
2527 local_inc(&cpu_buffer->commits);
2528 }
2529
2530 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
2531 {
2532 unsigned long commits;
2533
2534 if (RB_WARN_ON(cpu_buffer,
2535 !local_read(&cpu_buffer->committing)))
2536 return;
2537
2538 again:
2539 commits = local_read(&cpu_buffer->commits);
2540 /* synchronize with interrupts */
2541 barrier();
2542 if (local_read(&cpu_buffer->committing) == 1)
2543 rb_set_commit_to_write(cpu_buffer);
2544
2545 local_dec(&cpu_buffer->committing);
2546
2547 /* synchronize with interrupts */
2548 barrier();
2549
2550 /*
2551 * Need to account for interrupts coming in between the
2552 * updating of the commit page and the clearing of the
2553 * committing counter.
2554 */
2555 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
2556 !local_read(&cpu_buffer->committing)) {
2557 local_inc(&cpu_buffer->committing);
2558 goto again;
2559 }
2560 }
2561
2562 static struct ring_buffer_event *
2563 rb_reserve_next_event(struct ring_buffer *buffer,
2564 struct ring_buffer_per_cpu *cpu_buffer,
2565 unsigned long length)
2566 {
2567 struct ring_buffer_event *event;
2568 u64 ts, delta;
2569 int nr_loops = 0;
2570 int add_timestamp;
2571 u64 diff;
2572
2573 rb_start_commit(cpu_buffer);
2574
2575 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
2576 /*
2577 * Because a cpu buffer can be swapped out of its ring buffer,
2578 * it is possible it was swapped before we committed
2579 * (committing stops a swap). We check for it here and
2580 * if it happened, we have to fail the write.
2581 */
2582 barrier();
2583 if (unlikely(ACCESS_ONCE(cpu_buffer->buffer) != buffer)) {
2584 local_dec(&cpu_buffer->committing);
2585 local_dec(&cpu_buffer->commits);
2586 return NULL;
2587 }
2588 #endif
2589
2590 length = rb_calculate_event_length(length);
2591 again:
2592 add_timestamp = 0;
2593 delta = 0;
2594
2595 /*
2596 * We allow for interrupts to reenter here and do a trace.
2597 * If one does, it will cause this original code to loop
2598 * back here. Even with heavy interrupts happening, this
2599 * should only happen a few times in a row. If this happens
2600 * 1000 times in a row, there must be either an interrupt
2601 * storm or we have something buggy.
2602 * Bail!
2603 */
2604 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
2605 goto out_fail;
2606
2607 ts = rb_time_stamp(cpu_buffer->buffer);
2608 diff = ts - cpu_buffer->write_stamp;
2609
2610 /* make sure this diff is calculated here */
2611 barrier();
2612
2613 /* Did the write stamp get updated already? */
2614 if (likely(ts >= cpu_buffer->write_stamp)) {
2615 delta = diff;
2616 if (unlikely(test_time_stamp(delta))) {
2617 int local_clock_stable = 1;
2618 #ifdef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2619 local_clock_stable = sched_clock_stable();
2620 #endif
2621 WARN_ONCE(delta > (1ULL << 59),
2622 KERN_WARNING "Delta way too big! %llu ts=%llu write stamp = %llu\n%s",
2623 (unsigned long long)delta,
2624 (unsigned long long)ts,
2625 (unsigned long long)cpu_buffer->write_stamp,
2626 local_clock_stable ? "" :
2627 "If you just came from a suspend/resume,\n"
2628 "please switch to the trace global clock:\n"
2629 " echo global > /sys/kernel/debug/tracing/trace_clock\n");
2630 add_timestamp = 1;
2631 }
2632 }
2633
2634 event = __rb_reserve_next(cpu_buffer, length, ts,
2635 delta, add_timestamp);
2636 if (unlikely(PTR_ERR(event) == -EAGAIN))
2637 goto again;
2638
2639 if (!event)
2640 goto out_fail;
2641
2642 return event;
2643
2644 out_fail:
2645 rb_end_commit(cpu_buffer);
2646 return NULL;
2647 }
2648
2649 #ifdef CONFIG_TRACING
2650
2651 /*
2652 * The lock and unlock are done within a preempt disable section.
2653 * The current_context per_cpu variable can only be modified
2654 * by the current task between lock and unlock. But it can
2655 * be modified more than once via an interrupt. To pass this
2656 * information from the lock to the unlock without having to
2657 * access the 'in_interrupt()' functions again (which do show
2658 * a bit of overhead in something as critical as function tracing),
2659 * we use a bitmask trick.
2660 *
2661 * bit 0 = NMI context
2662 * bit 1 = IRQ context
2663 * bit 2 = SoftIRQ context
2664 * bit 3 = normal context.
2665 *
2666 * This works because this is the order of contexts that can
2667 * preempt other contexts. A SoftIRQ never preempts an IRQ
2668 * context.
2669 *
2670 * When the context is determined, the corresponding bit is
2671 * checked and set (if it was set, then a recursion of that context
2672 * happened).
2673 *
2674 * On unlock, we need to clear this bit. To do so, just subtract
2675 * 1 from the current_context and AND the result with it.
2676 *
2677 * (binary)
2678 * 101 - 1 = 100
2679 * 101 & 100 = 100 (clearing bit zero)
2680 *
2681 * 1010 - 1 = 1001
2682 * 1010 & 1001 = 1000 (clearing bit 1)
2683 *
2684 * The least significant bit can be cleared this way, and it
2685 * just so happens that it is the same bit corresponding to
2686 * the current context.
2687 */
2688
2689 static __always_inline int
2690 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
2691 {
2692 unsigned int val = cpu_buffer->current_context;
2693 int bit;
2694
2695 if (in_interrupt()) {
2696 if (in_nmi())
2697 bit = 0;
2698 else if (in_irq())
2699 bit = 1;
2700 else
2701 bit = 2;
2702 } else
2703 bit = 3;
2704
2705 if (unlikely(val & (1 << bit)))
2706 return 1;
2707
2708 val |= (1 << bit);
2709 cpu_buffer->current_context = val;
2710
2711 return 0;
2712 }
2713
2714 static __always_inline void
2715 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
2716 {
2717 cpu_buffer->current_context &= cpu_buffer->current_context - 1;
2718 }
2719
2720 #else
2721
2722 #define trace_recursive_lock(cpu_buffer) (0)
2723 #define trace_recursive_unlock(cpu_buffer) do { } while (0)
2724
2725 #endif
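/*
 * Illustrative sketch (not part of this file) of the bit trick used
 * by trace_recursive_unlock(): with the normal context bit already
 * set and an IRQ nesting on top, current_context goes from 1000b to
 * 1010b on the nested lock, and the nested unlock restores 1000b:
 *
 *	ctx = 0xa;		1010b: normal + IRQ
 *	ctx &= ctx - 1;		1010b & 1001b == 1000b
 *
 * "x &= x - 1" always clears the lowest set bit, which is exactly
 * the bit of the most recently entered (innermost) context.
 */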
2726
2727 /**
2728 * ring_buffer_lock_reserve - reserve a part of the buffer
2729 * @buffer: the ring buffer to reserve from
2730 * @length: the length of the data to reserve (excluding event header)
2731 *
2732 * Returns a reserved event on the ring buffer to copy directly to.
2733 * The user of this interface will need to get the body to write into
2734 * and can use the ring_buffer_event_data() interface.
2735 *
2736 * The length is the length of the data needed, not the event length
2737 * which also includes the event header.
2738 *
2739 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
2740 * If NULL is returned, then nothing has been allocated or locked.
2741 */
2742 struct ring_buffer_event *
2743 ring_buffer_lock_reserve(struct ring_buffer *buffer, unsigned long length)
2744 {
2745 struct ring_buffer_per_cpu *cpu_buffer;
2746 struct ring_buffer_event *event;
2747 int cpu;
2748
2749 if (ring_buffer_flags != RB_BUFFERS_ON)
2750 return NULL;
2751
2752 /* If we are tracing schedule, we don't want to recurse */
2753 preempt_disable_notrace();
2754
2755 if (unlikely(atomic_read(&buffer->record_disabled)))
2756 goto out;
2757
2758 cpu = raw_smp_processor_id();
2759
2760 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
2761 goto out;
2762
2763 cpu_buffer = buffer->buffers[cpu];
2764
2765 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
2766 goto out;
2767
2768 if (unlikely(length > BUF_MAX_DATA_SIZE))
2769 goto out;
2770
2771 if (unlikely(trace_recursive_lock(cpu_buffer)))
2772 goto out;
2773
2774 event = rb_reserve_next_event(buffer, cpu_buffer, length);
2775 if (!event)
2776 goto out_unlock;
2777
2778 return event;
2779
2780 out_unlock:
2781 trace_recursive_unlock(cpu_buffer);
2782 out:
2783 preempt_enable_notrace();
2784 return NULL;
2785 }
2786 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
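/*
 * Usage sketch (illustrative only, not part of this file) of the
 * reserve/commit pairing described above. The payload struct and
 * helper name are made up for the example.
 *
 *	struct my_entry {
 *		unsigned long ip;
 *	};
 *
 *	int example_trace_ip(struct ring_buffer *buffer, unsigned long ip)
 *	{
 *		struct ring_buffer_event *event;
 *		struct my_entry *entry;
 *
 *		event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *		if (!event)
 *			return -EBUSY;	nothing reserved, nothing to commit
 *
 *		entry = ring_buffer_event_data(event);
 *		entry->ip = ip;
 *
 *		return ring_buffer_unlock_commit(buffer, event);
 *	}
 */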
2787
2788 static void
2789 rb_update_write_stamp(struct ring_buffer_per_cpu *cpu_buffer,
2790 struct ring_buffer_event *event)
2791 {
2792 u64 delta;
2793
2794 /*
2795 * The event first in the commit queue updates the
2796 * time stamp.
2797 */
2798 if (rb_event_is_commit(cpu_buffer, event)) {
2799 /*
2800 * A commit event that is first on a page
2801 * updates the write timestamp with the page stamp
2802 */
2803 if (!rb_event_index(event))
2804 cpu_buffer->write_stamp =
2805 cpu_buffer->commit_page->page->time_stamp;
2806 else if (event->type_len == RINGBUF_TYPE_TIME_EXTEND) {
2807 delta = event->array[0];
2808 delta <<= TS_SHIFT;
2809 delta += event->time_delta;
2810 cpu_buffer->write_stamp += delta;
2811 } else
2812 cpu_buffer->write_stamp += event->time_delta;
2813 }
2814 }
2815
2816 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
2817 struct ring_buffer_event *event)
2818 {
2819 local_inc(&cpu_buffer->entries);
2820 rb_update_write_stamp(cpu_buffer, event);
2821 rb_end_commit(cpu_buffer);
2822 }
2823
2824 static __always_inline void
2825 rb_wakeups(struct ring_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
2826 {
2827 bool pagebusy;
2828
2829 if (buffer->irq_work.waiters_pending) {
2830 buffer->irq_work.waiters_pending = false;
2831 /* irq_work_queue() supplies its own memory barriers */
2832 irq_work_queue(&buffer->irq_work.work);
2833 }
2834
2835 if (cpu_buffer->irq_work.waiters_pending) {
2836 cpu_buffer->irq_work.waiters_pending = false;
2837 /* irq_work_queue() supplies its own memory barriers */
2838 irq_work_queue(&cpu_buffer->irq_work.work);
2839 }
2840
2841 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
2842
2843 if (!pagebusy && cpu_buffer->irq_work.full_waiters_pending) {
2844 cpu_buffer->irq_work.wakeup_full = true;
2845 cpu_buffer->irq_work.full_waiters_pending = false;
2846 /* irq_work_queue() supplies its own memory barriers */
2847 irq_work_queue(&cpu_buffer->irq_work.work);
2848 }
2849 }
2850
2851 /**
2852 * ring_buffer_unlock_commit - commit a reserved event
2853 * @buffer: The buffer to commit to
2854 * @event: The event pointer to commit.
2855 *
2856 * This commits the data to the ring buffer, and releases any locks held.
2857 *
2858 * Must be paired with ring_buffer_lock_reserve.
2859 */
2860 int ring_buffer_unlock_commit(struct ring_buffer *buffer,
2861 struct ring_buffer_event *event)
2862 {
2863 struct ring_buffer_per_cpu *cpu_buffer;
2864 int cpu = raw_smp_processor_id();
2865
2866 cpu_buffer = buffer->buffers[cpu];
2867
2868 rb_commit(cpu_buffer, event);
2869
2870 rb_wakeups(buffer, cpu_buffer);
2871
2872 trace_recursive_unlock(cpu_buffer);
2873
2874 preempt_enable_notrace();
2875
2876 return 0;
2877 }
2878 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
2879
2880 static inline void rb_event_discard(struct ring_buffer_event *event)
2881 {
2882 if (event->type_len == RINGBUF_TYPE_TIME_EXTEND)
2883 event = skip_time_extend(event);
2884
2885 /* array[0] holds the actual length for the discarded event */
2886 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
2887 event->type_len = RINGBUF_TYPE_PADDING;
2888 /* time delta must be non zero */
2889 if (!event->time_delta)
2890 event->time_delta = 1;
2891 }
2892
2893 /*
2894 * Decrement the entries to the page that an event is on.
2895 * The event does not even need to exist, only the pointer
2896 * to the page it is on. This may only be called before the commit
2897 * takes place.
2898 */
2899 static inline void
2900 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
2901 struct ring_buffer_event *event)
2902 {
2903 unsigned long addr = (unsigned long)event;
2904 struct buffer_page *bpage = cpu_buffer->commit_page;
2905 struct buffer_page *start;
2906
2907 addr &= PAGE_MASK;
2908
2909 /* Do the likely case first */
2910 if (likely(bpage->page == (void *)addr)) {
2911 local_dec(&bpage->entries);
2912 return;
2913 }
2914
2915 /*
2916 * Because the commit page may be on the reader page we
2917 * start with the next page and check the end loop there.
2918 */
2919 rb_inc_page(cpu_buffer, &bpage);
2920 start = bpage;
2921 do {
2922 if (bpage->page == (void *)addr) {
2923 local_dec(&bpage->entries);
2924 return;
2925 }
2926 rb_inc_page(cpu_buffer, &bpage);
2927 } while (bpage != start);
2928
2929 /* commit not part of this buffer?? */
2930 RB_WARN_ON(cpu_buffer, 1);
2931 }
2932
2933 /**
2934 * ring_buffer_commit_discard - discard an event that has not been committed
2935 * @buffer: the ring buffer
2936 * @event: non committed event to discard
2937 *
2938 * Sometimes an event that is in the ring buffer needs to be ignored.
2939 * This function lets the user discard an event in the ring buffer
2940 * and then that event will not be read later.
2941 *
2942 * This function only works if it is called before the item has been
2943 * committed. It will try to free the event from the ring buffer
2944 * if another event has not been added behind it.
2945 *
2946 * If another event has been added behind it, it will set the event
2947 * up as discarded, and perform the commit.
2948 *
2949 * If this function is called, do not call ring_buffer_unlock_commit on
2950 * the event.
2951 */
2952 void ring_buffer_discard_commit(struct ring_buffer *buffer,
2953 struct ring_buffer_event *event)
2954 {
2955 struct ring_buffer_per_cpu *cpu_buffer;
2956 int cpu;
2957
2958 /* The event is discarded regardless */
2959 rb_event_discard(event);
2960
2961 cpu = smp_processor_id();
2962 cpu_buffer = buffer->buffers[cpu];
2963
2964 /*
2965 * This must only be called if the event has not been
2966 * committed yet. Thus we can assume that preemption
2967 * is still disabled.
2968 */
2969 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
2970
2971 rb_decrement_entry(cpu_buffer, event);
2972 if (rb_try_to_discard(cpu_buffer, event))
2973 goto out;
2974
2975 /*
2976 * The commit is still visible by the reader, so we
2977 * must still update the timestamp.
2978 */
2979 rb_update_write_stamp(cpu_buffer, event);
2980 out:
2981 rb_end_commit(cpu_buffer);
2982
2983 trace_recursive_unlock(cpu_buffer);
2984
2985 preempt_enable_notrace();
2986
2987 }
2988 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
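/*
 * Usage sketch (illustrative only, not part of this file): discarding
 * a reserved event that turned out to be unwanted. The filter helper
 * is made up; note that ring_buffer_unlock_commit() must not be
 * called once the event has been discarded.
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	if (!example_fill_entry(entry)) {
 *		ring_buffer_discard_commit(buffer, event);
 *		return;
 *	}
 *	ring_buffer_unlock_commit(buffer, event);
 */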
2989
2990 /**
2991 * ring_buffer_write - write data to the buffer without reserving
2992 * @buffer: The ring buffer to write to.
2993 * @length: The length of the data being written (excluding the event header)
2994 * @data: The data to write to the buffer.
2995 *
2996 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
2997 * one function. If you already have the data to write to the buffer, it
2998 * may be easier to simply call this function.
2999 *
3000 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3001 * and not the length of the event which would hold the header.
3002 */
3003 int ring_buffer_write(struct ring_buffer *buffer,
3004 unsigned long length,
3005 void *data)
3006 {
3007 struct ring_buffer_per_cpu *cpu_buffer;
3008 struct ring_buffer_event *event;
3009 void *body;
3010 int ret = -EBUSY;
3011 int cpu;
3012
3013 if (ring_buffer_flags != RB_BUFFERS_ON)
3014 return -EBUSY;
3015
3016 preempt_disable_notrace();
3017
3018 if (atomic_read(&buffer->record_disabled))
3019 goto out;
3020
3021 cpu = raw_smp_processor_id();
3022
3023 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3024 goto out;
3025
3026 cpu_buffer = buffer->buffers[cpu];
3027
3028 if (atomic_read(&cpu_buffer->record_disabled))
3029 goto out;
3030
3031 if (length > BUF_MAX_DATA_SIZE)
3032 goto out;
3033
3034 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3035 if (!event)
3036 goto out;
3037
3038 body = rb_event_data(event);
3039
3040 memcpy(body, data, length);
3041
3042 rb_commit(cpu_buffer, event);
3043
3044 rb_wakeups(buffer, cpu_buffer);
3045
3046 ret = 0;
3047 out:
3048 preempt_enable_notrace();
3049
3050 return ret;
3051 }
3052 EXPORT_SYMBOL_GPL(ring_buffer_write);
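/*
 * Usage sketch (illustrative only, not part of this file): when the
 * data already exists in a local structure, a single call replaces
 * the reserve/commit pair. The payload struct is made up.
 *
 *	struct my_entry entry = { .ip = ip };
 *
 *	if (ring_buffer_write(buffer, sizeof(entry), &entry))
 *		return;		-EBUSY: recording disabled or the
 *				event could not be reserved
 */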
3053
3054 static int rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3055 {
3056 struct buffer_page *reader = cpu_buffer->reader_page;
3057 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3058 struct buffer_page *commit = cpu_buffer->commit_page;
3059
3060 /* In case of error, head will be NULL */
3061 if (unlikely(!head))
3062 return 1;
3063
3064 return reader->read == rb_page_commit(reader) &&
3065 (commit == reader ||
3066 (commit == head &&
3067 head->read == rb_page_commit(commit)));
3068 }
3069
3070 /**
3071 * ring_buffer_record_disable - stop all writes into the buffer
3072 * @buffer: The ring buffer to stop writes to.
3073 *
3074 * This prevents all writes to the buffer. Any attempt to write
3075 * to the buffer after this will fail and return NULL.
3076 *
3077 * The caller should call synchronize_sched() after this.
3078 */
3079 void ring_buffer_record_disable(struct ring_buffer *buffer)
3080 {
3081 atomic_inc(&buffer->record_disabled);
3082 }
3083 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3084
3085 /**
3086 * ring_buffer_record_enable - enable writes to the buffer
3087 * @buffer: The ring buffer to enable writes
3088 *
3089 * Note, multiple disables will need the same number of enables
3090 * to truly enable the writing (much like preempt_disable).
3091 */
3092 void ring_buffer_record_enable(struct ring_buffer *buffer)
3093 {
3094 atomic_dec(&buffer->record_disabled);
3095 }
3096 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3097
3098 /**
3099 * ring_buffer_record_off - stop all writes into the buffer
3100 * @buffer: The ring buffer to stop writes to.
3101 *
3102 * This prevents all writes to the buffer. Any attempt to write
3103 * to the buffer after this will fail and return NULL.
3104 *
3105 * This is different from ring_buffer_record_disable() as
3106 * it works like an on/off switch, whereas the disable() version
3107 * must be paired with an enable().
3108 */
3109 void ring_buffer_record_off(struct ring_buffer *buffer)
3110 {
3111 unsigned int rd;
3112 unsigned int new_rd;
3113
3114 do {
3115 rd = atomic_read(&buffer->record_disabled);
3116 new_rd = rd | RB_BUFFER_OFF;
3117 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3118 }
3119 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3120
3121 /**
3122 * ring_buffer_record_on - restart writes into the buffer
3123 * @buffer: The ring buffer to start writes to.
3124 *
3125 * This enables all writes to the buffer that was disabled by
3126 * ring_buffer_record_off().
3127 *
3128 * This is different from ring_buffer_record_enable() as
3129 * it works like an on/off switch, whereas the enable() version
3130 * must be paired with a disable().
3131 */
3132 void ring_buffer_record_on(struct ring_buffer *buffer)
3133 {
3134 unsigned int rd;
3135 unsigned int new_rd;
3136
3137 do {
3138 rd = atomic_read(&buffer->record_disabled);
3139 new_rd = rd & ~RB_BUFFER_OFF;
3140 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3141 }
3142 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
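/*
 * Illustrative sketch (not part of this file) of the two disabling
 * schemes side by side:
 *
 *	ring_buffer_record_disable(buffer);	nests: one enable is
 *	ring_buffer_record_disable(buffer);	needed per disable
 *	ring_buffer_record_enable(buffer);
 *	ring_buffer_record_enable(buffer);	writes allowed again
 *
 *	ring_buffer_record_off(buffer);		plain switch: a single
 *	ring_buffer_record_on(buffer);		on re-enables writes
 *
 * Both schemes share the record_disabled word, so the RB_BUFFER_OFF
 * bit set by record_off() keeps ring_buffer_record_is_on() returning
 * false even while the nesting count in the low bits is zero.
 */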
3143
3144 /**
3145 * ring_buffer_record_is_on - return true if the ring buffer can write
3146 * @buffer: The ring buffer to see if write is enabled
3147 *
3148 * Returns true if the ring buffer is in a state that it accepts writes.
3149 */
3150 int ring_buffer_record_is_on(struct ring_buffer *buffer)
3151 {
3152 return !atomic_read(&buffer->record_disabled);
3153 }
3154
3155 /**
3156 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3157 * @buffer: The ring buffer to stop writes to.
3158 * @cpu: The CPU buffer to stop
3159 *
3160 * This prevents all writes to the buffer. Any attempt to write
3161 * to the buffer after this will fail and return NULL.
3162 *
3163 * The caller should call synchronize_sched() after this.
3164 */
3165 void ring_buffer_record_disable_cpu(struct ring_buffer *buffer, int cpu)
3166 {
3167 struct ring_buffer_per_cpu *cpu_buffer;
3168
3169 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3170 return;
3171
3172 cpu_buffer = buffer->buffers[cpu];
3173 atomic_inc(&cpu_buffer->record_disabled);
3174 }
3175 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3176
3177 /**
3178 * ring_buffer_record_enable_cpu - enable writes to the buffer
3179 * @buffer: The ring buffer to enable writes
3180 * @cpu: The CPU to enable.
3181 *
3182 * Note, multiple disables will need the same number of enables
3183 * to truly enable the writing (much like preempt_disable).
3184 */
3185 void ring_buffer_record_enable_cpu(struct ring_buffer *buffer, int cpu)
3186 {
3187 struct ring_buffer_per_cpu *cpu_buffer;
3188
3189 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3190 return;
3191
3192 cpu_buffer = buffer->buffers[cpu];
3193 atomic_dec(&cpu_buffer->record_disabled);
3194 }
3195 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3196
3197 /*
3198 * The total entries in the ring buffer is the running counter
3199 * of entries entered into the ring buffer, minus the sum of
3200 * the entries read from the ring buffer and the number of
3201 * entries that were overwritten.
3202 */
3203 static inline unsigned long
3204 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3205 {
3206 return local_read(&cpu_buffer->entries) -
3207 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
3208 }
3209
3210 /**
3211 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3212 * @buffer: The ring buffer
3213 * @cpu: The per CPU buffer to read from.
3214 */
3215 u64 ring_buffer_oldest_event_ts(struct ring_buffer *buffer, int cpu)
3216 {
3217 unsigned long flags;
3218 struct ring_buffer_per_cpu *cpu_buffer;
3219 struct buffer_page *bpage;
3220 u64 ret = 0;
3221
3222 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3223 return 0;
3224
3225 cpu_buffer = buffer->buffers[cpu];
3226 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3227 /*
3228 * if the tail is on reader_page, oldest time stamp is on the reader
3229 * page
3230 */
3231 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3232 bpage = cpu_buffer->reader_page;
3233 else
3234 bpage = rb_set_head_page(cpu_buffer);
3235 if (bpage)
3236 ret = bpage->page->time_stamp;
3237 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3238
3239 return ret;
3240 }
3241 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3242
3243 /**
3244 * ring_buffer_bytes_cpu - get the number of bytes consumed in a cpu buffer
3245 * @buffer: The ring buffer
3246 * @cpu: The per CPU buffer to read from.
3247 */
3248 unsigned long ring_buffer_bytes_cpu(struct ring_buffer *buffer, int cpu)
3249 {
3250 struct ring_buffer_per_cpu *cpu_buffer;
3251 unsigned long ret;
3252
3253 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3254 return 0;
3255
3256 cpu_buffer = buffer->buffers[cpu];
3257 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3258
3259 return ret;
3260 }
3261 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3262
3263 /**
3264 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3265 * @buffer: The ring buffer
3266 * @cpu: The per CPU buffer to get the entries from.
3267 */
3268 unsigned long ring_buffer_entries_cpu(struct ring_buffer *buffer, int cpu)
3269 {
3270 struct ring_buffer_per_cpu *cpu_buffer;
3271
3272 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3273 return 0;
3274
3275 cpu_buffer = buffer->buffers[cpu];
3276
3277 return rb_num_of_entries(cpu_buffer);
3278 }
3279 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3280
3281 /**
3282 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3283 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3284 * @buffer: The ring buffer
3285 * @cpu: The per CPU buffer to get the number of overruns from
3286 */
3287 unsigned long ring_buffer_overrun_cpu(struct ring_buffer *buffer, int cpu)
3288 {
3289 struct ring_buffer_per_cpu *cpu_buffer;
3290 unsigned long ret;
3291
3292 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3293 return 0;
3294
3295 cpu_buffer = buffer->buffers[cpu];
3296 ret = local_read(&cpu_buffer->overrun);
3297
3298 return ret;
3299 }
3300 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
3301
3302 /**
3303 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
3304 * commits failing due to the buffer wrapping around while there are uncommitted
3305 * events, such as during an interrupt storm.
3306 * @buffer: The ring buffer
3307 * @cpu: The per CPU buffer to get the number of overruns from
3308 */
3309 unsigned long
3310 ring_buffer_commit_overrun_cpu(struct ring_buffer *buffer, int cpu)
3311 {
3312 struct ring_buffer_per_cpu *cpu_buffer;
3313 unsigned long ret;
3314
3315 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3316 return 0;
3317
3318 cpu_buffer = buffer->buffers[cpu];
3319 ret = local_read(&cpu_buffer->commit_overrun);
3320
3321 return ret;
3322 }
3323 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
3324
3325 /**
3326 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
3327 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
3328 * @buffer: The ring buffer
3329 * @cpu: The per CPU buffer to get the number of overruns from
3330 */
3331 unsigned long
3332 ring_buffer_dropped_events_cpu(struct ring_buffer *buffer, int cpu)
3333 {
3334 struct ring_buffer_per_cpu *cpu_buffer;
3335 unsigned long ret;
3336
3337 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3338 return 0;
3339
3340 cpu_buffer = buffer->buffers[cpu];
3341 ret = local_read(&cpu_buffer->dropped_events);
3342
3343 return ret;
3344 }
3345 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
3346
3347 /**
3348 * ring_buffer_read_events_cpu - get the number of events successfully read
3349 * @buffer: The ring buffer
3350 * @cpu: The per CPU buffer to get the number of events read
3351 */
3352 unsigned long
3353 ring_buffer_read_events_cpu(struct ring_buffer *buffer, int cpu)
3354 {
3355 struct ring_buffer_per_cpu *cpu_buffer;
3356
3357 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3358 return 0;
3359
3360 cpu_buffer = buffer->buffers[cpu];
3361 return cpu_buffer->read;
3362 }
3363 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
3364
3365 /**
3366 * ring_buffer_entries - get the number of entries in a buffer
3367 * @buffer: The ring buffer
3368 *
3369 * Returns the total number of entries in the ring buffer
3370 * (all CPU entries)
3371 */
3372 unsigned long ring_buffer_entries(struct ring_buffer *buffer)
3373 {
3374 struct ring_buffer_per_cpu *cpu_buffer;
3375 unsigned long entries = 0;
3376 int cpu;
3377
3378 /* if you care about this being correct, lock the buffer */
3379 for_each_buffer_cpu(buffer, cpu) {
3380 cpu_buffer = buffer->buffers[cpu];
3381 entries += rb_num_of_entries(cpu_buffer);
3382 }
3383
3384 return entries;
3385 }
3386 EXPORT_SYMBOL_GPL(ring_buffer_entries);
3387
3388 /**
3389 * ring_buffer_overruns - get the number of overruns in buffer
3390 * @buffer: The ring buffer
3391 *
3392 * Returns the total number of overruns in the ring buffer
3393 * (all CPU entries)
3394 */
3395 unsigned long ring_buffer_overruns(struct ring_buffer *buffer)
3396 {
3397 struct ring_buffer_per_cpu *cpu_buffer;
3398 unsigned long overruns = 0;
3399 int cpu;
3400
3401 /* if you care about this being correct, lock the buffer */
3402 for_each_buffer_cpu(buffer, cpu) {
3403 cpu_buffer = buffer->buffers[cpu];
3404 overruns += local_read(&cpu_buffer->overrun);
3405 }
3406
3407 return overruns;
3408 }
3409 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
3410
3411 static void rb_iter_reset(struct ring_buffer_iter *iter)
3412 {
3413 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3414
3415 /* Iterator usage is expected to have record disabled */
3416 iter->head_page = cpu_buffer->reader_page;
3417 iter->head = cpu_buffer->reader_page->read;
3418
3419 iter->cache_reader_page = iter->head_page;
3420 iter->cache_read = cpu_buffer->read;
3421
3422 if (iter->head)
3423 iter->read_stamp = cpu_buffer->read_stamp;
3424 else
3425 iter->read_stamp = iter->head_page->page->time_stamp;
3426 }
3427
3428 /**
3429 * ring_buffer_iter_reset - reset an iterator
3430 * @iter: The iterator to reset
3431 *
3432 * Resets the iterator, so that it will start from the beginning
3433 * again.
3434 */
3435 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
3436 {
3437 struct ring_buffer_per_cpu *cpu_buffer;
3438 unsigned long flags;
3439
3440 if (!iter)
3441 return;
3442
3443 cpu_buffer = iter->cpu_buffer;
3444
3445 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3446 rb_iter_reset(iter);
3447 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3448 }
3449 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
3450
3451 /**
3452 * ring_buffer_iter_empty - check if an iterator has no more to read
3453 * @iter: The iterator to check
3454 */
3455 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
3456 {
3457 struct ring_buffer_per_cpu *cpu_buffer;
3458 struct buffer_page *reader;
3459 struct buffer_page *head_page;
3460 struct buffer_page *commit_page;
3461 unsigned commit;
3462
3463 cpu_buffer = iter->cpu_buffer;
3464
3465 /* Remember, trace recording is off when iterator is in use */
3466 reader = cpu_buffer->reader_page;
3467 head_page = cpu_buffer->head_page;
3468 commit_page = cpu_buffer->commit_page;
3469 commit = rb_page_commit(commit_page);
3470
3471 return ((iter->head_page == commit_page && iter->head == commit) ||
3472 (iter->head_page == reader && commit_page == head_page &&
3473 head_page->read == commit &&
3474 iter->head == rb_page_commit(cpu_buffer->reader_page)));
3475 }
3476 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
3477
3478 static void
3479 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
3480 struct ring_buffer_event *event)
3481 {
3482 u64 delta;
3483
3484 switch (event->type_len) {
3485 case RINGBUF_TYPE_PADDING:
3486 return;
3487
3488 case RINGBUF_TYPE_TIME_EXTEND:
3489 delta = event->array[0];
3490 delta <<= TS_SHIFT;
3491 delta += event->time_delta;
3492 cpu_buffer->read_stamp += delta;
3493 return;
3494
3495 case RINGBUF_TYPE_TIME_STAMP:
3496 /* FIXME: not implemented */
3497 return;
3498
3499 case RINGBUF_TYPE_DATA:
3500 cpu_buffer->read_stamp += event->time_delta;
3501 return;
3502
3503 default:
3504 BUG();
3505 }
3506 return;
3507 }
3508
3509 static void
3510 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
3511 struct ring_buffer_event *event)
3512 {
3513 u64 delta;
3514
3515 switch (event->type_len) {
3516 case RINGBUF_TYPE_PADDING:
3517 return;
3518
3519 case RINGBUF_TYPE_TIME_EXTEND:
3520 delta = event->array[0];
3521 delta <<= TS_SHIFT;
3522 delta += event->time_delta;
3523 iter->read_stamp += delta;
3524 return;
3525
3526 case RINGBUF_TYPE_TIME_STAMP:
3527 /* FIXME: not implemented */
3528 return;
3529
3530 case RINGBUF_TYPE_DATA:
3531 iter->read_stamp += event->time_delta;
3532 return;
3533
3534 default:
3535 BUG();
3536 }
3537 return;
3538 }
3539
3540 static struct buffer_page *
3541 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
3542 {
3543 struct buffer_page *reader = NULL;
3544 unsigned long overwrite;
3545 unsigned long flags;
3546 int nr_loops = 0;
3547 int ret;
3548
3549 local_irq_save(flags);
3550 arch_spin_lock(&cpu_buffer->lock);
3551
3552 again:
3553 /*
3554 * This should normally only loop twice. But because the
3555 * start of the reader inserts an empty page, it causes
3556 * a case where we will loop three times. There should be no
3557 * reason to loop four times (that I know of).
3558 */
3559 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
3560 reader = NULL;
3561 goto out;
3562 }
3563
3564 reader = cpu_buffer->reader_page;
3565
3566 /* If there's more to read, return this page */
3567 if (cpu_buffer->reader_page->read < rb_page_size(reader))
3568 goto out;
3569
3570 /* Never should we have an index greater than the size */
3571 if (RB_WARN_ON(cpu_buffer,
3572 cpu_buffer->reader_page->read > rb_page_size(reader)))
3573 goto out;
3574
3575 /* check if we caught up to the tail */
3576 reader = NULL;
3577 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
3578 goto out;
3579
3580 /* Don't bother swapping if the ring buffer is empty */
3581 if (rb_num_of_entries(cpu_buffer) == 0)
3582 goto out;
3583
3584 /*
3585 * Reset the reader page to size zero.
3586 */
3587 local_set(&cpu_buffer->reader_page->write, 0);
3588 local_set(&cpu_buffer->reader_page->entries, 0);
3589 local_set(&cpu_buffer->reader_page->page->commit, 0);
3590 cpu_buffer->reader_page->real_end = 0;
3591
3592 spin:
3593 /*
3594 * Splice the empty reader page into the list around the head.
3595 */
3596 reader = rb_set_head_page(cpu_buffer);
3597 if (!reader)
3598 goto out;
3599 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
3600 cpu_buffer->reader_page->list.prev = reader->list.prev;
3601
3602 /*
3603 * cpu_buffer->pages just needs to point to the buffer, it
3604 * has no specific buffer page to point to. Let's move it out
3605 * of our way so we don't accidentally swap it.
3606 */
3607 cpu_buffer->pages = reader->list.prev;
3608
3609 /* The reader page will be pointing to the new head */
3610 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
3611
3612 /*
3613 * We want to make sure we read the overruns after we set up our
3614 * pointers to the next object. The writer side does a
3615 * cmpxchg to cross pages which acts as the mb on the writer
3616 * side. Note, the reader will constantly fail the swap
3617 * while the writer is updating the pointers, so this
3618 * guarantees that the overwrite recorded here is the one we
3619 * want to compare with the last_overrun.
3620 */
3621 smp_mb();
3622 overwrite = local_read(&(cpu_buffer->overrun));
3623
3624 /*
3625 * Here's the tricky part.
3626 *
3627 * We need to move the pointer past the header page.
3628 * But we can only do that if a writer is not currently
3629 * moving it. The page before the header page has the
3630 * flag bit '1' set if it is pointing to the page we want,
3631 * but if the writer is in the process of moving it
3632 * then it will be '2' or already moved '0'.
3633 */
3634
3635 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
3636
3637 /*
3638 * If we did not convert it, then we must try again.
3639 */
3640 if (!ret)
3641 goto spin;
3642
3643 /*
3644 * Yeah! We succeeded in replacing the page.
3645 *
3646 * Now make the new head point back to the reader page.
3647 */
3648 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
3649 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
3650
3651 /* Finally update the reader page to the new head */
3652 cpu_buffer->reader_page = reader;
3653 rb_reset_reader_page(cpu_buffer);
3654
3655 if (overwrite != cpu_buffer->last_overrun) {
3656 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
3657 cpu_buffer->last_overrun = overwrite;
3658 }
3659
3660 goto again;
3661
3662 out:
3663 arch_spin_unlock(&cpu_buffer->lock);
3664 local_irq_restore(flags);
3665
3666 return reader;
3667 }
3668
3669 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
3670 {
3671 struct ring_buffer_event *event;
3672 struct buffer_page *reader;
3673 unsigned length;
3674
3675 reader = rb_get_reader_page(cpu_buffer);
3676
3677 /* This function should not be called when buffer is empty */
3678 if (RB_WARN_ON(cpu_buffer, !reader))
3679 return;
3680
3681 event = rb_reader_event(cpu_buffer);
3682
3683 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
3684 cpu_buffer->read++;
3685
3686 rb_update_read_stamp(cpu_buffer, event);
3687
3688 length = rb_event_length(event);
3689 cpu_buffer->reader_page->read += length;
3690 }
3691
3692 static void rb_advance_iter(struct ring_buffer_iter *iter)
3693 {
3694 struct ring_buffer_per_cpu *cpu_buffer;
3695 struct ring_buffer_event *event;
3696 unsigned length;
3697
3698 cpu_buffer = iter->cpu_buffer;
3699
3700 /*
3701 * Check if we are at the end of the buffer.
3702 */
3703 if (iter->head >= rb_page_size(iter->head_page)) {
3704 /* discarded commits can make the page empty */
3705 if (iter->head_page == cpu_buffer->commit_page)
3706 return;
3707 rb_inc_iter(iter);
3708 return;
3709 }
3710
3711 event = rb_iter_head_event(iter);
3712
3713 length = rb_event_length(event);
3714
3715 /*
3716 * This should not be called to advance the header if we are
3717 * at the tail of the buffer.
3718 */
3719 if (RB_WARN_ON(cpu_buffer,
3720 (iter->head_page == cpu_buffer->commit_page) &&
3721 (iter->head + length > rb_commit_index(cpu_buffer))))
3722 return;
3723
3724 rb_update_iter_read_stamp(iter, event);
3725
3726 iter->head += length;
3727
3728 /* check for end of page padding */
3729 if ((iter->head >= rb_page_size(iter->head_page)) &&
3730 (iter->head_page != cpu_buffer->commit_page))
3731 rb_inc_iter(iter);
3732 }
3733
3734 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
3735 {
3736 return cpu_buffer->lost_events;
3737 }
3738
3739 static struct ring_buffer_event *
3740 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
3741 unsigned long *lost_events)
3742 {
3743 struct ring_buffer_event *event;
3744 struct buffer_page *reader;
3745 int nr_loops = 0;
3746
3747 again:
3748 /*
3749 * We repeat when a time extend is encountered.
3750 * Since the time extend is always attached to a data event,
3751 * we should never loop more than once.
3752 * (We never hit the following condition more than twice).
3753 */
3754 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
3755 return NULL;
3756
3757 reader = rb_get_reader_page(cpu_buffer);
3758 if (!reader)
3759 return NULL;
3760
3761 event = rb_reader_event(cpu_buffer);
3762
3763 switch (event->type_len) {
3764 case RINGBUF_TYPE_PADDING:
3765 if (rb_null_event(event))
3766 RB_WARN_ON(cpu_buffer, 1);
3767 /*
3768 * Because the writer could be discarding every
3769 * event it creates (which would probably be bad)
3770 * if we were to go back to "again" then we may never
3771 * catch up, and will trigger the warn on, or lock
3772 * the box. Return the padding, and we will release
3773 * the current locks, and try again.
3774 */
3775 return event;
3776
3777 case RINGBUF_TYPE_TIME_EXTEND:
3778 /* Internal data, OK to advance */
3779 rb_advance_reader(cpu_buffer);
3780 goto again;
3781
3782 case RINGBUF_TYPE_TIME_STAMP:
3783 /* FIXME: not implemented */
3784 rb_advance_reader(cpu_buffer);
3785 goto again;
3786
3787 case RINGBUF_TYPE_DATA:
3788 if (ts) {
3789 *ts = cpu_buffer->read_stamp + event->time_delta;
3790 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
3791 cpu_buffer->cpu, ts);
3792 }
3793 if (lost_events)
3794 *lost_events = rb_lost_events(cpu_buffer);
3795 return event;
3796
3797 default:
3798 BUG();
3799 }
3800
3801 return NULL;
3802 }
3803 EXPORT_SYMBOL_GPL(ring_buffer_peek);
3804
3805 static struct ring_buffer_event *
3806 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3807 {
3808 struct ring_buffer *buffer;
3809 struct ring_buffer_per_cpu *cpu_buffer;
3810 struct ring_buffer_event *event;
3811 int nr_loops = 0;
3812
3813 cpu_buffer = iter->cpu_buffer;
3814 buffer = cpu_buffer->buffer;
3815
3816 /*
3817 * Check if someone performed a consuming read to
3818 * the buffer. A consuming read invalidates the iterator
3819 * and we need to reset the iterator in this case.
3820 */
3821 if (unlikely(iter->cache_read != cpu_buffer->read ||
3822 iter->cache_reader_page != cpu_buffer->reader_page))
3823 rb_iter_reset(iter);
3824
3825 again:
3826 if (ring_buffer_iter_empty(iter))
3827 return NULL;
3828
3829 /*
3830 * We repeat when a time extend is encountered or we hit
3831 * the end of the page. Since the time extend is always attached
3832 * to a data event, we should never loop more than three times.
3833 * Once for going to next page, once on time extend, and
3834 * finally once to get the event.
3835 * (We never hit the following condition more than thrice).
3836 */
3837 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3))
3838 return NULL;
3839
3840 if (rb_per_cpu_empty(cpu_buffer))
3841 return NULL;
3842
3843 if (iter->head >= rb_page_size(iter->head_page)) {
3844 rb_inc_iter(iter);
3845 goto again;
3846 }
3847
3848 event = rb_iter_head_event(iter);
3849
3850 switch (event->type_len) {
3851 case RINGBUF_TYPE_PADDING:
3852 if (rb_null_event(event)) {
3853 rb_inc_iter(iter);
3854 goto again;
3855 }
3856 rb_advance_iter(iter);
3857 return event;
3858
3859 case RINGBUF_TYPE_TIME_EXTEND:
3860 /* Internal data, OK to advance */
3861 rb_advance_iter(iter);
3862 goto again;
3863
3864 case RINGBUF_TYPE_TIME_STAMP:
3865 /* FIXME: not implemented */
3866 rb_advance_iter(iter);
3867 goto again;
3868
3869 case RINGBUF_TYPE_DATA:
3870 if (ts) {
3871 *ts = iter->read_stamp + event->time_delta;
3872 ring_buffer_normalize_time_stamp(buffer,
3873 cpu_buffer->cpu, ts);
3874 }
3875 return event;
3876
3877 default:
3878 BUG();
3879 }
3880
3881 return NULL;
3882 }
3883 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
3884
3885 static inline int rb_ok_to_lock(void)
3886 {
3887 /*
3888 * If an NMI die dumps out the content of the ring buffer
3889 * do not grab locks. We also permanently disable the ring
3890 * buffer too. A one time deal is all you get from reading
3891 * the ring buffer from an NMI.
3892 */
3893 if (likely(!in_nmi()))
3894 return 1;
3895
3896 tracing_off_permanent();
3897 return 0;
3898 }
3899
3900 /**
3901 * ring_buffer_peek - peek at the next event to be read
3902 * @buffer: The ring buffer to read
3903 * @cpu: The cpu to peek at
3904 * @ts: The timestamp counter of this event.
3905 * @lost_events: a variable to store if events were lost (may be NULL)
3906 *
3907 * This will return the event that will be read next, but does
3908 * not consume the data.
3909 */
3910 struct ring_buffer_event *
3911 ring_buffer_peek(struct ring_buffer *buffer, int cpu, u64 *ts,
3912 unsigned long *lost_events)
3913 {
3914 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
3915 struct ring_buffer_event *event;
3916 unsigned long flags;
3917 int dolock;
3918
3919 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3920 return NULL;
3921
3922 dolock = rb_ok_to_lock();
3923 again:
3924 local_irq_save(flags);
3925 if (dolock)
3926 raw_spin_lock(&cpu_buffer->reader_lock);
3927 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
3928 if (event && event->type_len == RINGBUF_TYPE_PADDING)
3929 rb_advance_reader(cpu_buffer);
3930 if (dolock)
3931 raw_spin_unlock(&cpu_buffer->reader_lock);
3932 local_irq_restore(flags);
3933
3934 if (event && event->type_len == RINGBUF_TYPE_PADDING)
3935 goto again;
3936
3937 return event;
3938 }
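
/*
 * Usage sketch (illustrative only, not part of the original file): peek at
 * the next event on a CPU without consuming it.  "buf", "cpu" and inspect()
 * are assumed to be supplied by the caller.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	event = ring_buffer_peek(buf, cpu, &ts, &lost);
 *	if (event) {
 *		void *payload = ring_buffer_event_data(event);
 *
 *		inspect(payload, ts, lost);
 *	}
 *
 * The event stays in the buffer; a later ring_buffer_consume() (or another
 * peek) will see the same record.
 */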
3939
3940 /**
3941 * ring_buffer_iter_peek - peek at the next event to be read
3942 * @iter: The ring buffer iterator
3943 * @ts: The timestamp counter of this event.
3944 *
3945 * This will return the event that will be read next, but does
3946 * not increment the iterator.
3947 */
3948 struct ring_buffer_event *
3949 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
3950 {
3951 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
3952 struct ring_buffer_event *event;
3953 unsigned long flags;
3954
3955 again:
3956 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3957 event = rb_iter_peek(iter, ts);
3958 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3959
3960 if (event && event->type_len == RINGBUF_TYPE_PADDING)
3961 goto again;
3962
3963 return event;
3964 }
3965
3966 /**
3967 * ring_buffer_consume - return an event and consume it
3968 * @buffer: The ring buffer to get the next event from
3969 * @cpu: the cpu to read the buffer from
3970 * @ts: a variable to store the timestamp (may be NULL)
3971 * @lost_events: a variable to store if events were lost (may be NULL)
3972 *
3973 * Returns the next event in the ring buffer, and that event is consumed.
3974 * Meaning that sequential reads will keep returning a different event,
3975 * and eventually empty the ring buffer if the producer is slower.
3976 */
3977 struct ring_buffer_event *
3978 ring_buffer_consume(struct ring_buffer *buffer, int cpu, u64 *ts,
3979 unsigned long *lost_events)
3980 {
3981 struct ring_buffer_per_cpu *cpu_buffer;
3982 struct ring_buffer_event *event = NULL;
3983 unsigned long flags;
3984 int dolock;
3985
3986 dolock = rb_ok_to_lock();
3987
3988 again:
3989 /* might be called in atomic */
3990 preempt_disable();
3991
3992 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3993 goto out;
3994
3995 cpu_buffer = buffer->buffers[cpu];
3996 local_irq_save(flags);
3997 if (dolock)
3998 raw_spin_lock(&cpu_buffer->reader_lock);
3999
4000 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4001 if (event) {
4002 cpu_buffer->lost_events = 0;
4003 rb_advance_reader(cpu_buffer);
4004 }
4005
4006 if (dolock)
4007 raw_spin_unlock(&cpu_buffer->reader_lock);
4008 local_irq_restore(flags);
4009
4010 out:
4011 preempt_enable();
4012
4013 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4014 goto again;
4015
4016 return event;
4017 }
4018 EXPORT_SYMBOL_GPL(ring_buffer_consume);
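
/*
 * Usage sketch (illustrative only, not part of the original file): drain
 * everything currently readable on one CPU.  "buf", "cpu" and
 * handle_record() are assumed to come from the caller; the payload layout
 * is whatever the writer stored via ring_buffer_lock_reserve() and
 * ring_buffer_unlock_commit().
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buf, cpu, &ts, &lost)))
 *		handle_record(ring_buffer_event_data(event), ts, lost);
 *
 * Each successful call consumes the returned event, so the loop ends once
 * the reader has caught up with the writer.
 */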
4019
4020 /**
4021 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4022 * @buffer: The ring buffer to read from
4023 * @cpu: The cpu buffer to iterate over
4024 *
4025 * This performs the initial preparations necessary to iterate
4026 * through the buffer. Memory is allocated, buffer recording
4027 * is disabled, and the iterator pointer is returned to the caller.
4028 *
4029 * Disabling buffer recording prevents the reading from being
4030 * corrupted. This is not a consuming read, so a producer is not
4031 * expected.
4032 *
4033 * After a sequence of ring_buffer_read_prepare calls, the user is
4034 * expected to make at least one call to ring_buffer_read_prepare_sync.
4035 * Afterwards, ring_buffer_read_start is invoked to get things going
4036 * for real.
4037 *
4038 * This overall must be paired with ring_buffer_read_finish.
4039 */
4040 struct ring_buffer_iter *
4041 ring_buffer_read_prepare(struct ring_buffer *buffer, int cpu)
4042 {
4043 struct ring_buffer_per_cpu *cpu_buffer;
4044 struct ring_buffer_iter *iter;
4045
4046 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4047 return NULL;
4048
4049 iter = kmalloc(sizeof(*iter), GFP_KERNEL);
4050 if (!iter)
4051 return NULL;
4052
4053 cpu_buffer = buffer->buffers[cpu];
4054
4055 iter->cpu_buffer = cpu_buffer;
4056
4057 atomic_inc(&buffer->resize_disabled);
4058 atomic_inc(&cpu_buffer->record_disabled);
4059
4060 return iter;
4061 }
4062 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4063
4064 /**
4065 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4066 *
4067 * All previously invoked ring_buffer_read_prepare calls to prepare
4068 * iterators will be synchronized. Afterwards, ring_buffer_read_start
4069 * calls on those iterators are allowed.
4070 */
4071 void
4072 ring_buffer_read_prepare_sync(void)
4073 {
4074 synchronize_sched();
4075 }
4076 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4077
4078 /**
4079 * ring_buffer_read_start - start a non consuming read of the buffer
4080 * @iter: The iterator returned by ring_buffer_read_prepare
4081 *
4082 * This finalizes the startup of an iteration through the buffer.
4083 * The iterator comes from a call to ring_buffer_read_prepare and
4084 * an intervening ring_buffer_read_prepare_sync must have been
4085 * performed.
4086 *
4087 * Must be paired with ring_buffer_read_finish.
4088 */
4089 void
4090 ring_buffer_read_start(struct ring_buffer_iter *iter)
4091 {
4092 struct ring_buffer_per_cpu *cpu_buffer;
4093 unsigned long flags;
4094
4095 if (!iter)
4096 return;
4097
4098 cpu_buffer = iter->cpu_buffer;
4099
4100 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4101 arch_spin_lock(&cpu_buffer->lock);
4102 rb_iter_reset(iter);
4103 arch_spin_unlock(&cpu_buffer->lock);
4104 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4105 }
4106 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4107
4108 /**
4109 * ring_buffer_read_finish - finish reading the iterator of the buffer
4110 * @iter: The iterator retrieved by ring_buffer_read_prepare
4111 *
4112 * This re-enables the recording to the buffer, and frees the
4113 * iterator.
4114 */
4115 void
4116 ring_buffer_read_finish(struct ring_buffer_iter *iter)
4117 {
4118 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4119 unsigned long flags;
4120
4121 /*
4122 * Ring buffer is disabled from recording, here's a good place
4123 * to check the integrity of the ring buffer.
4124 * Must prevent readers from trying to read, as the check
4125 * clears the HEAD page and readers require it.
4126 */
4127 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4128 rb_check_pages(cpu_buffer);
4129 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4130
4131 atomic_dec(&cpu_buffer->record_disabled);
4132 atomic_dec(&cpu_buffer->buffer->resize_disabled);
4133 kfree(iter);
4134 }
4135 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
4136
4137 /**
4138 * ring_buffer_read - read the next item in the ring buffer by the iterator
4139 * @iter: The ring buffer iterator
4140 * @ts: The time stamp of the event read.
4141 *
4142 * This reads the next event in the ring buffer and increments the iterator.
4143 */
4144 struct ring_buffer_event *
4145 ring_buffer_read(struct ring_buffer_iter *iter, u64 *ts)
4146 {
4147 struct ring_buffer_event *event;
4148 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4149 unsigned long flags;
4150
4151 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4152 again:
4153 event = rb_iter_peek(iter, ts);
4154 if (!event)
4155 goto out;
4156
4157 if (event->type_len == RINGBUF_TYPE_PADDING)
4158 goto again;
4159
4160 rb_advance_iter(iter);
4161 out:
4162 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4163
4164 return event;
4165 }
4166 EXPORT_SYMBOL_GPL(ring_buffer_read);
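
/*
 * Usage sketch (illustrative only, not part of the original file): the
 * full non-consuming read sequence using the prepare/sync/start/read/finish
 * API above, for one CPU.  "buf", "cpu" and inspect_event() are assumed to
 * be provided by the caller.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buf, cpu);
 *	if (!iter)
 *		return;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *
 *	while ((event = ring_buffer_read(iter, &ts)))
 *		inspect_event(ring_buffer_event_data(event), ts);
 *
 *	ring_buffer_read_finish(iter);
 *
 * Recording on that CPU buffer (and resizing of the whole buffer) stays
 * disabled from ring_buffer_read_prepare() until ring_buffer_read_finish().
 */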
4167
4168 /**
4169 * ring_buffer_size - return the size of the ring buffer (in bytes)
4170 * @buffer: The ring buffer.
4171 */
4172 unsigned long ring_buffer_size(struct ring_buffer *buffer, int cpu)
4173 {
4174 /*
4175 * Earlier, this method returned
4176 * BUF_PAGE_SIZE * buffer->nr_pages
4177 * Since the nr_pages field is now removed, we have converted this to
4178 * return the per cpu buffer value.
4179 */
4180 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4181 return 0;
4182
4183 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
4184 }
4185 EXPORT_SYMBOL_GPL(ring_buffer_size);
4186
4187 static void
4188 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
4189 {
4190 rb_head_page_deactivate(cpu_buffer);
4191
4192 cpu_buffer->head_page
4193 = list_entry(cpu_buffer->pages, struct buffer_page, list);
4194 local_set(&cpu_buffer->head_page->write, 0);
4195 local_set(&cpu_buffer->head_page->entries, 0);
4196 local_set(&cpu_buffer->head_page->page->commit, 0);
4197
4198 cpu_buffer->head_page->read = 0;
4199
4200 cpu_buffer->tail_page = cpu_buffer->head_page;
4201 cpu_buffer->commit_page = cpu_buffer->head_page;
4202
4203 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
4204 INIT_LIST_HEAD(&cpu_buffer->new_pages);
4205 local_set(&cpu_buffer->reader_page->write, 0);
4206 local_set(&cpu_buffer->reader_page->entries, 0);
4207 local_set(&cpu_buffer->reader_page->page->commit, 0);
4208 cpu_buffer->reader_page->read = 0;
4209
4210 local_set(&cpu_buffer->entries_bytes, 0);
4211 local_set(&cpu_buffer->overrun, 0);
4212 local_set(&cpu_buffer->commit_overrun, 0);
4213 local_set(&cpu_buffer->dropped_events, 0);
4214 local_set(&cpu_buffer->entries, 0);
4215 local_set(&cpu_buffer->committing, 0);
4216 local_set(&cpu_buffer->commits, 0);
4217 cpu_buffer->read = 0;
4218 cpu_buffer->read_bytes = 0;
4219
4220 cpu_buffer->write_stamp = 0;
4221 cpu_buffer->read_stamp = 0;
4222
4223 cpu_buffer->lost_events = 0;
4224 cpu_buffer->last_overrun = 0;
4225
4226 rb_head_page_activate(cpu_buffer);
4227 }
4228
4229 /**
4230 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
4231 * @buffer: The ring buffer to reset a per cpu buffer of
4232 * @cpu: The CPU buffer to be reset
4233 */
4234 void ring_buffer_reset_cpu(struct ring_buffer *buffer, int cpu)
4235 {
4236 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4237 unsigned long flags;
4238
4239 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4240 return;
4241
4242 atomic_inc(&buffer->resize_disabled);
4243 atomic_inc(&cpu_buffer->record_disabled);
4244
4245 /* Make sure all commits have finished */
4246 synchronize_sched();
4247
4248 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4249
4250 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
4251 goto out;
4252
4253 arch_spin_lock(&cpu_buffer->lock);
4254
4255 rb_reset_cpu(cpu_buffer);
4256
4257 arch_spin_unlock(&cpu_buffer->lock);
4258
4259 out:
4260 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4261
4262 atomic_dec(&cpu_buffer->record_disabled);
4263 atomic_dec(&buffer->resize_disabled);
4264 }
4265 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
4266
4267 /**
4268 * ring_buffer_reset - reset a ring buffer
4269 * @buffer: The ring buffer to reset all cpu buffers
4270 */
4271 void ring_buffer_reset(struct ring_buffer *buffer)
4272 {
4273 int cpu;
4274
4275 for_each_buffer_cpu(buffer, cpu)
4276 ring_buffer_reset_cpu(buffer, cpu);
4277 }
4278 EXPORT_SYMBOL_GPL(ring_buffer_reset);
4279
4280 /**
4281 * ring_buffer_empty - is the ring buffer empty?
4282 * @buffer: The ring buffer to test
4283 */
4284 int ring_buffer_empty(struct ring_buffer *buffer)
4285 {
4286 struct ring_buffer_per_cpu *cpu_buffer;
4287 unsigned long flags;
4288 int dolock;
4289 int cpu;
4290 int ret;
4291
4292 dolock = rb_ok_to_lock();
4293
4294 /* yes this is racy, but if you don't like the race, lock the buffer */
4295 for_each_buffer_cpu(buffer, cpu) {
4296 cpu_buffer = buffer->buffers[cpu];
4297 local_irq_save(flags);
4298 if (dolock)
4299 raw_spin_lock(&cpu_buffer->reader_lock);
4300 ret = rb_per_cpu_empty(cpu_buffer);
4301 if (dolock)
4302 raw_spin_unlock(&cpu_buffer->reader_lock);
4303 local_irq_restore(flags);
4304
4305 if (!ret)
4306 return 0;
4307 }
4308
4309 return 1;
4310 }
4311 EXPORT_SYMBOL_GPL(ring_buffer_empty);
4312
4313 /**
4314 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
4315 * @buffer: The ring buffer
4316 * @cpu: The CPU buffer to test
4317 */
4318 int ring_buffer_empty_cpu(struct ring_buffer *buffer, int cpu)
4319 {
4320 struct ring_buffer_per_cpu *cpu_buffer;
4321 unsigned long flags;
4322 int dolock;
4323 int ret;
4324
4325 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4326 return 1;
4327
4328 dolock = rb_ok_to_lock();
4329
4330 cpu_buffer = buffer->buffers[cpu];
4331 local_irq_save(flags);
4332 if (dolock)
4333 raw_spin_lock(&cpu_buffer->reader_lock);
4334 ret = rb_per_cpu_empty(cpu_buffer);
4335 if (dolock)
4336 raw_spin_unlock(&cpu_buffer->reader_lock);
4337 local_irq_restore(flags);
4338
4339 return ret;
4340 }
4341 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
4342
4343 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
4344 /**
4345 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
4346 * @buffer_a: One buffer to swap with
4347 * @buffer_b: The other buffer to swap with
4348 *
4349 * This function is useful for tracers that want to take a "snapshot"
4350 * of a CPU buffer and has another backup buffer lying around.
4351 * It is expected that the tracer handles the cpu buffer not being
4352 * used at the moment.
4353 */
4354 int ring_buffer_swap_cpu(struct ring_buffer *buffer_a,
4355 struct ring_buffer *buffer_b, int cpu)
4356 {
4357 struct ring_buffer_per_cpu *cpu_buffer_a;
4358 struct ring_buffer_per_cpu *cpu_buffer_b;
4359 int ret = -EINVAL;
4360
4361 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
4362 !cpumask_test_cpu(cpu, buffer_b->cpumask))
4363 goto out;
4364
4365 cpu_buffer_a = buffer_a->buffers[cpu];
4366 cpu_buffer_b = buffer_b->buffers[cpu];
4367
4368 /* At least make sure the two buffers are somewhat the same */
4369 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
4370 goto out;
4371
4372 ret = -EAGAIN;
4373
4374 if (ring_buffer_flags != RB_BUFFERS_ON)
4375 goto out;
4376
4377 if (atomic_read(&buffer_a->record_disabled))
4378 goto out;
4379
4380 if (atomic_read(&buffer_b->record_disabled))
4381 goto out;
4382
4383 if (atomic_read(&cpu_buffer_a->record_disabled))
4384 goto out;
4385
4386 if (atomic_read(&cpu_buffer_b->record_disabled))
4387 goto out;
4388
4389 /*
4390 * We can't do a synchronize_sched here because this
4391 * function can be called in atomic context.
4392 * Normally this will be called from the same CPU as cpu.
4393 * If not it's up to the caller to protect this.
4394 */
4395 atomic_inc(&cpu_buffer_a->record_disabled);
4396 atomic_inc(&cpu_buffer_b->record_disabled);
4397
4398 ret = -EBUSY;
4399 if (local_read(&cpu_buffer_a->committing))
4400 goto out_dec;
4401 if (local_read(&cpu_buffer_b->committing))
4402 goto out_dec;
4403
4404 buffer_a->buffers[cpu] = cpu_buffer_b;
4405 buffer_b->buffers[cpu] = cpu_buffer_a;
4406
4407 cpu_buffer_b->buffer = buffer_a;
4408 cpu_buffer_a->buffer = buffer_b;
4409
4410 ret = 0;
4411
4412 out_dec:
4413 atomic_dec(&cpu_buffer_a->record_disabled);
4414 atomic_dec(&cpu_buffer_b->record_disabled);
4415 out:
4416 return ret;
4417 }
4418 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
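
/*
 * Usage sketch (illustrative only, not part of the original file): a
 * tracer taking a per-CPU "snapshot" by swapping the live buffer with a
 * spare one, as described above.  "live", "snap" and "cpu" are assumed to
 * be provided by the caller, with both buffers allocated at the same size.
 *
 *	int err;
 *
 *	err = ring_buffer_swap_cpu(live, snap, cpu);
 *	if (err)
 *		return err;
 *
 * On success the old data can be read from "snap" while writers keep
 * recording into the page set now owned by "live".  -EBUSY means the swap
 * raced with a commit in progress and may be retried; -EAGAIN means
 * recording is currently disabled on one of the buffers.
 */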
4419 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
4420
4421 /**
4422 * ring_buffer_alloc_read_page - allocate a page to read from buffer
4423 * @buffer: the buffer to allocate for.
4424 * @cpu: the cpu buffer to allocate.
4425 *
4426 * This function is used in conjunction with ring_buffer_read_page.
4427 * When reading a full page from the ring buffer, these functions
4428 * can be used to speed up the process. The calling function should
4429 * allocate a few pages first with this function. Then when it
4430 * needs to get pages from the ring buffer, it passes the result
4431 * of this function into ring_buffer_read_page, which will swap
4432 * the page that was allocated, with the read page of the buffer.
4433 *
4434 * Returns:
4435 * The page allocated, or NULL on error.
4436 */
4437 void *ring_buffer_alloc_read_page(struct ring_buffer *buffer, int cpu)
4438 {
4439 struct buffer_data_page *bpage;
4440 struct page *page;
4441
4442 page = alloc_pages_node(cpu_to_node(cpu),
4443 GFP_KERNEL | __GFP_NORETRY, 0);
4444 if (!page)
4445 return NULL;
4446
4447 bpage = page_address(page);
4448
4449 rb_init_page(bpage);
4450
4451 return bpage;
4452 }
4453 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
4454
4455 /**
4456 * ring_buffer_free_read_page - free an allocated read page
4457 * @buffer: the buffer the page was allocated for
4458 * @data: the page to free
4459 *
4460 * Free a page allocated from ring_buffer_alloc_read_page.
4461 */
4462 void ring_buffer_free_read_page(struct ring_buffer *buffer, void *data)
4463 {
4464 free_page((unsigned long)data);
4465 }
4466 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
4467
4468 /**
4469 * ring_buffer_read_page - extract a page from the ring buffer
4470 * @buffer: buffer to extract from
4471 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
4472 * @len: amount to extract
4473 * @cpu: the cpu of the buffer to extract
4474 * @full: should the extraction only happen when the page is full.
4475 *
4476 * This function will pull out a page from the ring buffer and consume it.
4477 * @data_page must be the address of the variable that was returned
4478 * from ring_buffer_alloc_read_page. This is because the page might be used
4479 * to swap with a page in the ring buffer.
4480 *
4481 * for example:
4482 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
4483 * if (!rpage)
4484 * return error;
4485 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
4486 * if (ret >= 0)
4487 * process_page(rpage, ret);
4488 *
4489 * When @full is set, the function will not return the data unless
4490 * the writer is off the reader page.
4491 *
4492 * Note: it is up to the calling functions to handle sleeps and wakeups.
4493 * The ring buffer can be used anywhere in the kernel and can not
4494 * blindly call wake_up. The layer that uses the ring buffer must be
4495 * responsible for that.
4496 *
4497 * Returns:
4498 * >=0 if data has been transferred, returns the offset of consumed data.
4499 * <0 if no data has been transferred.
4500 */
4501 int ring_buffer_read_page(struct ring_buffer *buffer,
4502 void **data_page, size_t len, int cpu, int full)
4503 {
4504 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4505 struct ring_buffer_event *event;
4506 struct buffer_data_page *bpage;
4507 struct buffer_page *reader;
4508 unsigned long missed_events;
4509 unsigned long flags;
4510 unsigned int commit;
4511 unsigned int read;
4512 u64 save_timestamp;
4513 int ret = -1;
4514
4515 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4516 goto out;
4517
4518 /*
4519 * If len is not big enough to hold the page header, then
4520 * we can not copy anything.
4521 */
4522 if (len <= BUF_PAGE_HDR_SIZE)
4523 goto out;
4524
4525 len -= BUF_PAGE_HDR_SIZE;
4526
4527 if (!data_page)
4528 goto out;
4529
4530 bpage = *data_page;
4531 if (!bpage)
4532 goto out;
4533
4534 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4535
4536 reader = rb_get_reader_page(cpu_buffer);
4537 if (!reader)
4538 goto out_unlock;
4539
4540 event = rb_reader_event(cpu_buffer);
4541
4542 read = reader->read;
4543 commit = rb_page_commit(reader);
4544
4545 /* Check if any events were dropped */
4546 missed_events = cpu_buffer->lost_events;
4547
4548 /*
4549 * If this page has been partially read or
4550 * if len is not big enough to read the rest of the page or
4551 * a writer is still on the page, then
4552 * we must copy the data from the page to the buffer.
4553 * Otherwise, we can simply swap the page with the one passed in.
4554 */
4555 if (read || (len < (commit - read)) ||
4556 cpu_buffer->reader_page == cpu_buffer->commit_page) {
4557 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
4558 unsigned int rpos = read;
4559 unsigned int pos = 0;
4560 unsigned int size;
4561
4562 if (full)
4563 goto out_unlock;
4564
4565 if (len > (commit - read))
4566 len = (commit - read);
4567
4568 /* Always keep the time extend and data together */
4569 size = rb_event_ts_length(event);
4570
4571 if (len < size)
4572 goto out_unlock;
4573
4574 /* save the current timestamp, since the user will need it */
4575 save_timestamp = cpu_buffer->read_stamp;
4576
4577 /* Need to copy one event at a time */
4578 do {
4579 /* We need the size of one event, because
4580 * rb_advance_reader only advances by one event,
4581 * whereas rb_event_ts_length may include the size of
4582 * one or two events.
4583 * We have already ensured there's enough space if this
4584 * is a time extend. */
4585 size = rb_event_length(event);
4586 memcpy(bpage->data + pos, rpage->data + rpos, size);
4587
4588 len -= size;
4589
4590 rb_advance_reader(cpu_buffer);
4591 rpos = reader->read;
4592 pos += size;
4593
4594 if (rpos >= commit)
4595 break;
4596
4597 event = rb_reader_event(cpu_buffer);
4598 /* Always keep the time extend and data together */
4599 size = rb_event_ts_length(event);
4600 } while (len >= size);
4601
4602 /* update bpage */
4603 local_set(&bpage->commit, pos);
4604 bpage->time_stamp = save_timestamp;
4605
4606 /* we copied everything to the beginning */
4607 read = 0;
4608 } else {
4609 /* update the entry counter */
4610 cpu_buffer->read += rb_page_entries(reader);
4611 cpu_buffer->read_bytes += BUF_PAGE_SIZE;
4612
4613 /* swap the pages */
4614 rb_init_page(bpage);
4615 bpage = reader->page;
4616 reader->page = *data_page;
4617 local_set(&reader->write, 0);
4618 local_set(&reader->entries, 0);
4619 reader->read = 0;
4620 *data_page = bpage;
4621
4622 /*
4623 * Use the real_end for the data size,
4624 * This gives us a chance to store the lost events
4625 * on the page.
4626 */
4627 if (reader->real_end)
4628 local_set(&bpage->commit, reader->real_end);
4629 }
4630 ret = read;
4631
4632 cpu_buffer->lost_events = 0;
4633
4634 commit = local_read(&bpage->commit);
4635 /*
4636 * Set a flag in the commit field if we lost events
4637 */
4638 if (missed_events) {
4639 /* If there is room at the end of the page to save the
4640 * missed events, then record it there.
4641 */
4642 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
4643 memcpy(&bpage->data[commit], &missed_events,
4644 sizeof(missed_events));
4645 local_add(RB_MISSED_STORED, &bpage->commit);
4646 commit += sizeof(missed_events);
4647 }
4648 local_add(RB_MISSED_EVENTS, &bpage->commit);
4649 }
4650
4651 /*
4652 * This page may be off to user land. Zero it out here.
4653 */
4654 if (commit < BUF_PAGE_SIZE)
4655 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
4656
4657 out_unlock:
4658 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4659
4660 out:
4661 return ret;
4662 }
4663 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
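
/*
 * Usage sketch (illustrative only, not part of the original file): the
 * full allocate/read/free cycle for pulling whole pages out of a CPU
 * buffer.  "buf", "cpu" and process_page() are assumed to be provided by
 * the caller; PAGE_SIZE is passed as the length so both the page header
 * and the data fit.
 *
 *	void *rpage;
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buf, cpu);
 *	if (!rpage)
 *		return -ENOMEM;
 *
 *	ret = ring_buffer_read_page(buf, &rpage, PAGE_SIZE, cpu, 0);
 *	if (ret >= 0)
 *		process_page(rpage, ret);
 *
 *	ring_buffer_free_read_page(buf, rpage);
 *
 * A negative return means no data was available; otherwise the return
 * value is the offset within the returned page at which the unread data
 * begins (non-zero when the page had been partially consumed before).
 */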
4664
4665 #ifdef CONFIG_HOTPLUG_CPU
4666 static int rb_cpu_notify(struct notifier_block *self,
4667 unsigned long action, void *hcpu)
4668 {
4669 struct ring_buffer *buffer =
4670 container_of(self, struct ring_buffer, cpu_notify);
4671 long cpu = (long)hcpu;
4672 long nr_pages_same;
4673 int cpu_i;
4674 unsigned long nr_pages;
4675
4676 switch (action) {
4677 case CPU_UP_PREPARE:
4678 case CPU_UP_PREPARE_FROZEN:
4679 if (cpumask_test_cpu(cpu, buffer->cpumask))
4680 return NOTIFY_OK;
4681
4682 nr_pages = 0;
4683 nr_pages_same = 1;
4684 /* check if all cpu sizes are same */
4685 for_each_buffer_cpu(buffer, cpu_i) {
4686 /* fill in the size from first enabled cpu */
4687 if (nr_pages == 0)
4688 nr_pages = buffer->buffers[cpu_i]->nr_pages;
4689 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
4690 nr_pages_same = 0;
4691 break;
4692 }
4693 }
4694 /* allocate minimum pages, user can later expand it */
4695 if (!nr_pages_same)
4696 nr_pages = 2;
4697 buffer->buffers[cpu] =
4698 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
4699 if (!buffer->buffers[cpu]) {
4700 WARN(1, "failed to allocate ring buffer on CPU %ld\n",
4701 cpu);
4702 return NOTIFY_OK;
4703 }
4704 smp_wmb();
4705 cpumask_set_cpu(cpu, buffer->cpumask);
4706 break;
4707 case CPU_DOWN_PREPARE:
4708 case CPU_DOWN_PREPARE_FROZEN:
4709 /*
4710 * Do nothing.
4711 * If we were to free the buffer, then the user would
4712 * lose any trace that was in the buffer.
4713 */
4714 break;
4715 default:
4716 break;
4717 }
4718 return NOTIFY_OK;
4719 }
4720 #endif
4721
4722 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
4723 /*
4724 * This is a basic integrity check of the ring buffer.
4725 * Late in the boot cycle this test will run when configured in.
4726 * It will kick off a thread per CPU that will go into a loop
4727 * writing to the per cpu ring buffer various sizes of data.
4728 * Some of the data will be large items, some small.
4729 *
4730 * Another thread is created that goes into a spin, sending out
4731 * IPIs to the other CPUs to also write into the ring buffer.
4732 * This is to test the nesting ability of the buffer.
4733 *
4734 * Basic stats are recorded and reported. If something in the
4735 * ring buffer should happen that's not expected, a big warning
4736 * is displayed and all ring buffers are disabled.
4737 */
4738 static struct task_struct *rb_threads[NR_CPUS] __initdata;
4739
4740 struct rb_test_data {
4741 struct ring_buffer *buffer;
4742 unsigned long events;
4743 unsigned long bytes_written;
4744 unsigned long bytes_alloc;
4745 unsigned long bytes_dropped;
4746 unsigned long events_nested;
4747 unsigned long bytes_written_nested;
4748 unsigned long bytes_alloc_nested;
4749 unsigned long bytes_dropped_nested;
4750 int min_size_nested;
4751 int max_size_nested;
4752 int max_size;
4753 int min_size;
4754 int cpu;
4755 int cnt;
4756 };
4757
4758 static struct rb_test_data rb_data[NR_CPUS] __initdata;
4759
4760 /* 1 meg per cpu */
4761 #define RB_TEST_BUFFER_SIZE 1048576
4762
4763 static char rb_string[] __initdata =
4764 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
4765 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
4766 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
4767
4768 static bool rb_test_started __initdata;
4769
4770 struct rb_item {
4771 int size;
4772 char str[];
4773 };
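
/*
 * Illustrative note (added, not from the original source): each test
 * record written by rb_write_something() below is an rb_item, i.e. an
 * integer size followed by "size" bytes copied from rb_string.  The
 * consumer side in test_ringbuffer() reads the records back and checks
 * that those bytes still match rb_string.
 */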
4774
4775 static __init int rb_write_something(struct rb_test_data *data, bool nested)
4776 {
4777 struct ring_buffer_event *event;
4778 struct rb_item *item;
4779 bool started;
4780 int event_len;
4781 int size;
4782 int len;
4783 int cnt;
4784
4785 	/* Have nested writes different than what is written */
4786 cnt = data->cnt + (nested ? 27 : 0);
4787
4788 /* Multiply cnt by ~e, to make some unique increment */
4789 	size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
4790
4791 len = size + sizeof(struct rb_item);
4792
4793 started = rb_test_started;
4794 /* read rb_test_started before checking buffer enabled */
4795 smp_rmb();
4796
4797 event = ring_buffer_lock_reserve(data->buffer, len);
4798 if (!event) {
4799 /* Ignore dropped events before test starts. */
4800 if (started) {
4801 			if (nested)
4802 				data->bytes_dropped_nested += len;
4803 			else
4804 				data->bytes_dropped += len;
4805 }
4806 return len;
4807 }
4808
4809 event_len = ring_buffer_event_length(event);
4810
4811 if (RB_WARN_ON(data->buffer, event_len < len))
4812 goto out;
4813
4814 item = ring_buffer_event_data(event);
4815 item->size = size;
4816 memcpy(item->str, rb_string, size);
4817
4818 if (nested) {
4819 data->bytes_alloc_nested += event_len;
4820 data->bytes_written_nested += len;
4821 data->events_nested++;
4822 if (!data->min_size_nested || len < data->min_size_nested)
4823 data->min_size_nested = len;
4824 if (len > data->max_size_nested)
4825 data->max_size_nested = len;
4826 } else {
4827 data->bytes_alloc += event_len;
4828 data->bytes_written += len;
4829 data->events++;
4830 		if (!data->min_size || len < data->min_size)
4831 			data->min_size = len;
4832 if (len > data->max_size)
4833 data->max_size = len;
4834 }
4835
4836 out:
4837 ring_buffer_unlock_commit(data->buffer, event);
4838
4839 return 0;
4840 }
4841
4842 static __init int rb_test(void *arg)
4843 {
4844 struct rb_test_data *data = arg;
4845
4846 while (!kthread_should_stop()) {
4847 rb_write_something(data, false);
4848 data->cnt++;
4849
4850 set_current_state(TASK_INTERRUPTIBLE);
4851 /* Now sleep between a min of 100-300us and a max of 1ms */
4852 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
4853 }
4854
4855 return 0;
4856 }
4857
4858 static __init void rb_ipi(void *ignore)
4859 {
4860 struct rb_test_data *data;
4861 int cpu = smp_processor_id();
4862
4863 data = &rb_data[cpu];
4864 rb_write_something(data, true);
4865 }
4866
4867 static __init int rb_hammer_test(void *arg)
4868 {
4869 while (!kthread_should_stop()) {
4870
4871 /* Send an IPI to all cpus to write data! */
4872 smp_call_function(rb_ipi, NULL, 1);
4873 /* No sleep, but for non preempt, let others run */
4874 schedule();
4875 }
4876
4877 return 0;
4878 }
4879
4880 static __init int test_ringbuffer(void)
4881 {
4882 struct task_struct *rb_hammer;
4883 struct ring_buffer *buffer;
4884 int cpu;
4885 int ret = 0;
4886
4887 pr_info("Running ring buffer tests...\n");
4888
4889 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
4890 if (WARN_ON(!buffer))
4891 return 0;
4892
4893 /* Disable buffer so that threads can't write to it yet */
4894 ring_buffer_record_off(buffer);
4895
4896 for_each_online_cpu(cpu) {
4897 rb_data[cpu].buffer = buffer;
4898 rb_data[cpu].cpu = cpu;
4899 rb_data[cpu].cnt = cpu;
4900 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
4901 "rbtester/%d", cpu);
4902 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
4903 pr_cont("FAILED\n");
4904 ret = PTR_ERR(rb_threads[cpu]);
4905 goto out_free;
4906 }
4907
4908 kthread_bind(rb_threads[cpu], cpu);
4909 wake_up_process(rb_threads[cpu]);
4910 }
4911
4912 /* Now create the rb hammer! */
4913 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
4914 if (WARN_ON(IS_ERR(rb_hammer))) {
4915 pr_cont("FAILED\n");
4916 ret = PTR_ERR(rb_hammer);
4917 goto out_free;
4918 }
4919
4920 ring_buffer_record_on(buffer);
4921 /*
4922 * Show buffer is enabled before setting rb_test_started.
4923 * Yes there's a small race window where events could be
4924 * dropped and the thread won't catch it. But when a ring
4925 * buffer gets enabled, there will always be some kind of
4926 * delay before other CPUs see it. Thus, we don't care about
4927 * those dropped events. We care about events dropped after
4928 * the threads see that the buffer is active.
4929 */
4930 smp_wmb();
4931 rb_test_started = true;
4932
4933 set_current_state(TASK_INTERRUPTIBLE);
4934 	/* Just run for 10 seconds */
4935 schedule_timeout(10 * HZ);
4936
4937 kthread_stop(rb_hammer);
4938
4939 out_free:
4940 for_each_online_cpu(cpu) {
4941 if (!rb_threads[cpu])
4942 break;
4943 kthread_stop(rb_threads[cpu]);
4944 }
4945 if (ret) {
4946 ring_buffer_free(buffer);
4947 return ret;
4948 }
4949
4950 /* Report! */
4951 pr_info("finished\n");
4952 for_each_online_cpu(cpu) {
4953 struct ring_buffer_event *event;
4954 struct rb_test_data *data = &rb_data[cpu];
4955 struct rb_item *item;
4956 unsigned long total_events;
4957 unsigned long total_dropped;
4958 unsigned long total_written;
4959 unsigned long total_alloc;
4960 unsigned long total_read = 0;
4961 unsigned long total_size = 0;
4962 unsigned long total_len = 0;
4963 unsigned long total_lost = 0;
4964 unsigned long lost;
4965 int big_event_size;
4966 int small_event_size;
4967
4968 ret = -1;
4969
4970 total_events = data->events + data->events_nested;
4971 total_written = data->bytes_written + data->bytes_written_nested;
4972 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
4973 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
4974
4975 big_event_size = data->max_size + data->max_size_nested;
4976 small_event_size = data->min_size + data->min_size_nested;
4977
4978 pr_info("CPU %d:\n", cpu);
4979 pr_info(" events: %ld\n", total_events);
4980 pr_info(" dropped bytes: %ld\n", total_dropped);
4981 pr_info(" alloced bytes: %ld\n", total_alloc);
4982 pr_info(" written bytes: %ld\n", total_written);
4983 pr_info(" biggest event: %d\n", big_event_size);
4984 pr_info(" smallest event: %d\n", small_event_size);
4985
4986 if (RB_WARN_ON(buffer, total_dropped))
4987 break;
4988
4989 ret = 0;
4990
4991 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
4992 total_lost += lost;
4993 item = ring_buffer_event_data(event);
4994 total_len += ring_buffer_event_length(event);
4995 total_size += item->size + sizeof(struct rb_item);
4996 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
4997 pr_info("FAILED!\n");
4998 pr_info("buffer had: %.*s\n", item->size, item->str);
4999 pr_info("expected: %.*s\n", item->size, rb_string);
5000 RB_WARN_ON(buffer, 1);
5001 ret = -1;
5002 break;
5003 }
5004 total_read++;
5005 }
5006 if (ret)
5007 break;
5008
5009 ret = -1;
5010
5011 pr_info(" read events: %ld\n", total_read);
5012 pr_info(" lost events: %ld\n", total_lost);
5013 pr_info(" total events: %ld\n", total_lost + total_read);
5014 pr_info(" recorded len bytes: %ld\n", total_len);
5015 pr_info(" recorded size bytes: %ld\n", total_size);
5016 if (total_lost)
5017 pr_info(" With dropped events, record len and size may not match\n"
5018 " alloced and written from above\n");
5019 if (!total_lost) {
5020 if (RB_WARN_ON(buffer, total_len != total_alloc ||
5021 total_size != total_written))
5022 break;
5023 }
5024 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
5025 break;
5026
5027 ret = 0;
5028 }
5029 if (!ret)
5030 pr_info("Ring buffer PASSED!\n");
5031
5032 ring_buffer_free(buffer);
5033 return 0;
5034 }
5035
5036 late_initcall(test_ringbuffer);
5037 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
5038