1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
7 #include <linux/trace_recursion.h>
8 #include <linux/trace_events.h>
9 #include <linux/ring_buffer.h>
10 #include <linux/trace_clock.h>
11 #include <linux/sched/clock.h>
12 #include <linux/trace_seq.h>
13 #include <linux/spinlock.h>
14 #include <linux/irq_work.h>
15 #include <linux/security.h>
16 #include <linux/uaccess.h>
17 #include <linux/hardirq.h>
18 #include <linux/kthread.h> /* for self test */
19 #include <linux/module.h>
20 #include <linux/percpu.h>
21 #include <linux/mutex.h>
22 #include <linux/delay.h>
23 #include <linux/slab.h>
24 #include <linux/init.h>
25 #include <linux/hash.h>
26 #include <linux/list.h>
27 #include <linux/cpu.h>
28 #include <linux/oom.h>
29
30 #include <asm/local.h>
31
32 static void update_pages_handler(struct work_struct *work);
33
/*
 * The ring buffer header is special. We must manually keep it up to date.
 */
int ring_buffer_print_entry_header(struct trace_seq *s)
38 {
39 trace_seq_puts(s, "# compressed entry header\n");
40 trace_seq_puts(s, "\ttype_len : 5 bits\n");
41 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
42 trace_seq_puts(s, "\tarray : 32 bits\n");
43 trace_seq_putc(s, '\n');
44 trace_seq_printf(s, "\tpadding : type == %d\n",
45 RINGBUF_TYPE_PADDING);
46 trace_seq_printf(s, "\ttime_extend : type == %d\n",
47 RINGBUF_TYPE_TIME_EXTEND);
48 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
49 RINGBUF_TYPE_TIME_STAMP);
50 trace_seq_printf(s, "\tdata max type_len == %d\n",
51 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
52
53 return !trace_seq_has_overflowed(s);
54 }
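/*
 * Illustrative example of the compressed header above (numbers are made
 * up): a data event with a 12 byte payload stores type_len = 12 / 4 = 3,
 * so the whole event is the 4 byte header plus 12 bytes of data.  A
 * payload bigger than RB_MAX_SMALL_DATA (28 * 4 = 112 bytes) must use
 * type_len = 0 and put its length in array[0] instead, which costs an
 * extra 4 bytes before the data.
 */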
55
56 /*
57 * The ring buffer is made up of a list of pages. A separate list of pages is
58 * allocated for each CPU. A writer may only write to a buffer that is
59 * associated with the CPU it is currently executing on. A reader may read
60 * from any per cpu buffer.
61 *
62 * The reader is special. For each per cpu buffer, the reader has its own
63 * reader page. When a reader has read the entire reader page, this reader
64 * page is swapped with another page in the ring buffer.
65 *
66 * Now, as long as the writer is off the reader page, the reader can do what
67 * ever it wants with that page. The writer will never write to that page
68 * again (as long as it is out of the ring buffer).
69 *
70 * Here's some silly ASCII art.
71 *
72 * +------+
73 * |reader| RING BUFFER
74 * |page |
75 * +------+ +---+ +---+ +---+
76 * | |-->| |-->| |
77 * +---+ +---+ +---+
78 * ^ |
79 * | |
80 * +---------------+
81 *
82 *
83 * +------+
84 * |reader| RING BUFFER
85 * |page |------------------v
86 * +------+ +---+ +---+ +---+
87 * | |-->| |-->| |
88 * +---+ +---+ +---+
89 * ^ |
90 * | |
91 * +---------------+
92 *
93 *
94 * +------+
95 * |reader| RING BUFFER
96 * |page |------------------v
97 * +------+ +---+ +---+ +---+
98 * ^ | |-->| |-->| |
99 * | +---+ +---+ +---+
100 * | |
101 * | |
102 * +------------------------------+
103 *
104 *
105 * +------+
106 * |buffer| RING BUFFER
107 * |page |------------------v
108 * +------+ +---+ +---+ +---+
109 * ^ | | | |-->| |
110 * | New +---+ +---+ +---+
111 * | Reader------^ |
112 * | page |
113 * +------------------------------+
114 *
115 *
116 * After we make this swap, the reader can hand this page off to the splice
117 * code and be done with it. It can even allocate a new page if it needs to
118 * and swap that into the ring buffer.
119 *
120 * We will be using cmpxchg soon to make all this lockless.
121 *
122 */
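/*
 * A rough sketch of the swap shown above (illustrative only, not the
 * literal code; the real reader-page code later in this file uses
 * cmpxchg and flagged pointers to guard against a racing writer):
 *
 *	new  = cpu_buffer->reader_page;		(empty spare page)
 *	head = cpu_buffer->head_page;		(oldest page with data)
 *	new->list.next = head->list.next;	(splice the spare page in
 *	new->list.prev = head->list.prev;	 where the head page was)
 *	cmpxchg(&head->list.prev->next, head, new);
 *	cpu_buffer->reader_page = head;		(old head is now private)
 */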
123
124 /* Used for individual buffers (after the counter) */
125 #define RB_BUFFER_OFF (1 << 20)
126
127 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
128
129 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
130 #define RB_ALIGNMENT 4U
131 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
132 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
133
134 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
135 # define RB_FORCE_8BYTE_ALIGNMENT 0
136 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT
137 #else
138 # define RB_FORCE_8BYTE_ALIGNMENT 1
139 # define RB_ARCH_ALIGNMENT 8U
140 #endif
141
142 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
143
144 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
145 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
146
147 enum {
148 RB_LEN_TIME_EXTEND = 8,
149 RB_LEN_TIME_STAMP = 8,
150 };
151
152 #define skip_time_extend(event) \
153 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
154
155 #define extended_time(event) \
156 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
157
static inline int rb_null_event(struct ring_buffer_event *event)
159 {
160 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
161 }
162
static void rb_event_set_padding(struct ring_buffer_event *event)
164 {
165 /* padding has a NULL time_delta */
166 event->type_len = RINGBUF_TYPE_PADDING;
167 event->time_delta = 0;
168 }
169
170 static unsigned
rb_event_data_length(struct ring_buffer_event *event)
172 {
173 unsigned length;
174
175 if (event->type_len)
176 length = event->type_len * RB_ALIGNMENT;
177 else
178 length = event->array[0];
179 return length + RB_EVNT_HDR_SIZE;
180 }
181
182 /*
183 * Return the length of the given event. Will return
184 * the length of the time extend if the event is a
185 * time extend.
186 */
187 static inline unsigned
rb_event_length(struct ring_buffer_event *event)
189 {
190 switch (event->type_len) {
191 case RINGBUF_TYPE_PADDING:
192 if (rb_null_event(event))
193 /* undefined */
194 return -1;
195 return event->array[0] + RB_EVNT_HDR_SIZE;
196
197 case RINGBUF_TYPE_TIME_EXTEND:
198 return RB_LEN_TIME_EXTEND;
199
200 case RINGBUF_TYPE_TIME_STAMP:
201 return RB_LEN_TIME_STAMP;
202
203 case RINGBUF_TYPE_DATA:
204 return rb_event_data_length(event);
205 default:
206 WARN_ON_ONCE(1);
207 }
208 /* not hit */
209 return 0;
210 }
211
212 /*
213 * Return total length of time extend and data,
214 * or just the event length for all other events.
215 */
216 static inline unsigned
rb_event_ts_length(struct ring_buffer_event *event)
218 {
219 unsigned len = 0;
220
221 if (extended_time(event)) {
222 /* time extends include the data event after it */
223 len = RB_LEN_TIME_EXTEND;
224 event = skip_time_extend(event);
225 }
226 return len + rb_event_length(event);
227 }
228
229 /**
230 * ring_buffer_event_length - return the length of the event
231 * @event: the event to get the length of
232 *
233 * Returns the size of the data load of a data event.
234 * If the event is something other than a data event, it
235 * returns the size of the event itself. With the exception
236 * of a TIME EXTEND, where it still returns the size of the
237 * data load of the data event after it.
238 */
unsigned ring_buffer_event_length(struct ring_buffer_event *event)
240 {
241 unsigned length;
242
243 if (extended_time(event))
244 event = skip_time_extend(event);
245
246 length = rb_event_length(event);
247 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
248 return length;
249 length -= RB_EVNT_HDR_SIZE;
250 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
251 length -= sizeof(event->array[0]);
252 return length;
253 }
254 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
255
256 /* inline for ring buffer fast paths */
257 static __always_inline void *
rb_event_data(struct ring_buffer_event *event)
259 {
260 if (extended_time(event))
261 event = skip_time_extend(event);
262 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
263 /* If length is in len field, then array[0] has the data */
264 if (event->type_len)
265 return (void *)&event->array[0];
266 /* Otherwise length is in array[0] and array[1] has the data */
267 return (void *)&event->array[1];
268 }
269
270 /**
271 * ring_buffer_event_data - return the data of the event
272 * @event: the event to get the data from
273 */
void *ring_buffer_event_data(struct ring_buffer_event *event)
275 {
276 return rb_event_data(event);
277 }
278 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
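/*
 * Typical consumer-side use of the two accessors above (a hedged sketch;
 * error handling and locking are trimmed, and "buffer"/"cpu" are assumed
 * to come from the caller):
 *
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	event = ring_buffer_consume(buffer, cpu, &ts, NULL);
 *	if (event) {
 *		void *payload = ring_buffer_event_data(event);
 *		unsigned int len = ring_buffer_event_length(event);
 *
 *		consume payload[0..len) here
 *	}
 *
 * Note that len describes only the data load, never the event header.
 */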
279
280 #define for_each_buffer_cpu(buffer, cpu) \
281 for_each_cpu(cpu, buffer->cpumask)
282
283 #define for_each_online_buffer_cpu(buffer, cpu) \
284 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
285
286 #define TS_SHIFT 27
287 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
288 #define TS_DELTA_TEST (~TS_MASK)
289
static u64 rb_event_time_stamp(struct ring_buffer_event *event)
291 {
292 u64 ts;
293
294 ts = event->array[0];
295 ts <<= TS_SHIFT;
296 ts += event->time_delta;
297
298 return ts;
299 }
300
301 /* Flag when events were overwritten */
302 #define RB_MISSED_EVENTS (1 << 31)
303 /* Missed count stored at end */
304 #define RB_MISSED_STORED (1 << 30)
305
306 struct buffer_data_page {
307 u64 time_stamp; /* page time stamp */
308 local_t commit; /* write committed index */
309 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
310 };
311
312 /*
313 * Note, the buffer_page list must be first. The buffer pages
314 * are allocated in cache lines, which means that each buffer
315 * page will be at the beginning of a cache line, and thus
316 * the least significant bits will be zero. We use this to
317 * add flags in the list struct pointers, to make the ring buffer
318 * lockless.
319 */
320 struct buffer_page {
321 struct list_head list; /* list of buffer pages */
322 local_t write; /* index for next write */
323 unsigned read; /* index for next read */
324 local_t entries; /* entries on this page */
325 unsigned long real_end; /* real end of data */
326 struct buffer_data_page *page; /* Actual data page */
327 };
328
329 /*
330 * The buffer page counters, write and entries, must be reset
331 * atomically when crossing page boundaries. To synchronize this
332 * update, two counters are inserted into the number. One is
333 * the actual counter for the write position or count on the page.
334 *
335 * The other is a counter of updaters. Before an update happens
336 * the update partition of the counter is incremented. This will
337 * allow the updater to update the counter atomically.
338 *
339 * The counter is 20 bits, and the state data is 12.
340 */
341 #define RB_WRITE_MASK 0xfffff
342 #define RB_WRITE_INTCNT (1 << 20)
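/*
 * Worked example (the value is made up): if bpage->write reads
 * 0x00300040, the low 20 bits (RB_WRITE_MASK) give a write index of
 * 0x40 bytes into the page, and the bits above it record that three
 * nested updaters have claimed the field (each added RB_WRITE_INTCNT).
 * rb_page_write() and rb_page_entries() further down simply mask the
 * updater count off before using the value.
 */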
343
static void rb_init_page(struct buffer_data_page *bpage)
345 {
346 local_set(&bpage->commit, 0);
347 }
348
static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
350 {
351 return local_read(&bpage->page->commit);
352 }
353
static void free_buffer_page(struct buffer_page *bpage)
355 {
356 free_page((unsigned long)bpage->page);
357 kfree(bpage);
358 }
359
360 /*
361 * We need to fit the time_stamp delta into 27 bits.
362 */
static inline int test_time_stamp(u64 delta)
364 {
365 if (delta & TS_DELTA_TEST)
366 return 1;
367 return 0;
368 }
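/*
 * A worked number: with a nanosecond trace clock the 27 bit delta covers
 * at most 2^27 ns, roughly 134 ms.  An event arriving later than that
 * after the previous one cannot encode its delta in the header and needs
 * a time extend (or absolute time stamp) event injected in front of it.
 */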
369
370 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
371
372 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
373 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
374
int ring_buffer_print_page_header(struct trace_seq *s)
376 {
377 struct buffer_data_page field;
378
379 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
380 "offset:0;\tsize:%u;\tsigned:%u;\n",
381 (unsigned int)sizeof(field.time_stamp),
382 (unsigned int)is_signed_type(u64));
383
384 trace_seq_printf(s, "\tfield: local_t commit;\t"
385 "offset:%u;\tsize:%u;\tsigned:%u;\n",
386 (unsigned int)offsetof(typeof(field), commit),
387 (unsigned int)sizeof(field.commit),
388 (unsigned int)is_signed_type(long));
389
390 trace_seq_printf(s, "\tfield: int overwrite;\t"
391 "offset:%u;\tsize:%u;\tsigned:%u;\n",
392 (unsigned int)offsetof(typeof(field), commit),
393 1,
394 (unsigned int)is_signed_type(long));
395
396 trace_seq_printf(s, "\tfield: char data;\t"
397 "offset:%u;\tsize:%u;\tsigned:%u;\n",
398 (unsigned int)offsetof(typeof(field), data),
399 (unsigned int)BUF_PAGE_SIZE,
400 (unsigned int)is_signed_type(char));
401
402 return !trace_seq_has_overflowed(s);
403 }
404
405 struct rb_irq_work {
406 struct irq_work work;
407 wait_queue_head_t waiters;
408 wait_queue_head_t full_waiters;
409 long wait_index;
410 bool waiters_pending;
411 bool full_waiters_pending;
412 bool wakeup_full;
413 };
414
415 /*
416 * Structure to hold event state and handle nested events.
417 */
418 struct rb_event_info {
419 u64 ts;
420 u64 delta;
421 u64 before;
422 u64 after;
423 unsigned long length;
424 struct buffer_page *tail_page;
425 int add_timestamp;
426 };
427
428 /*
429 * Used for the add_timestamp
430 * NONE
431 * EXTEND - wants a time extend
432 * ABSOLUTE - the buffer requests all events to have absolute time stamps
433 * FORCE - force a full time stamp.
434 */
435 enum {
436 RB_ADD_STAMP_NONE = 0,
437 RB_ADD_STAMP_EXTEND = BIT(1),
438 RB_ADD_STAMP_ABSOLUTE = BIT(2),
439 RB_ADD_STAMP_FORCE = BIT(3)
440 };
441 /*
442 * Used for which event context the event is in.
443 * TRANSITION = 0
444 * NMI = 1
445 * IRQ = 2
446 * SOFTIRQ = 3
447 * NORMAL = 4
448 *
449 * See trace_recursive_lock() comment below for more details.
450 */
451 enum {
452 RB_CTX_TRANSITION,
453 RB_CTX_NMI,
454 RB_CTX_IRQ,
455 RB_CTX_SOFTIRQ,
456 RB_CTX_NORMAL,
457 RB_CTX_MAX
458 };
459
460 #if BITS_PER_LONG == 32
461 #define RB_TIME_32
462 #endif
463
464 /* To test on 64 bit machines */
465 //#define RB_TIME_32
466
467 #ifdef RB_TIME_32
468
469 struct rb_time_struct {
470 local_t cnt;
471 local_t top;
472 local_t bottom;
473 };
474 #else
475 #include <asm/local64.h>
476 struct rb_time_struct {
477 local64_t time;
478 };
479 #endif
480 typedef struct rb_time_struct rb_time_t;
481
482 #define MAX_NEST 5
483
484 /*
485 * head_page == tail_page && head == tail then buffer is empty.
486 */
487 struct ring_buffer_per_cpu {
488 int cpu;
489 atomic_t record_disabled;
490 atomic_t resize_disabled;
491 struct trace_buffer *buffer;
492 raw_spinlock_t reader_lock; /* serialize readers */
493 arch_spinlock_t lock;
494 struct lock_class_key lock_key;
495 struct buffer_data_page *free_page;
496 unsigned long nr_pages;
497 unsigned int current_context;
498 struct list_head *pages;
499 struct buffer_page *head_page; /* read from head */
500 struct buffer_page *tail_page; /* write to tail */
501 struct buffer_page *commit_page; /* committed pages */
502 struct buffer_page *reader_page;
503 unsigned long lost_events;
504 unsigned long last_overrun;
505 unsigned long nest;
506 local_t entries_bytes;
507 local_t entries;
508 local_t overrun;
509 local_t commit_overrun;
510 local_t dropped_events;
511 local_t committing;
512 local_t commits;
513 local_t pages_touched;
514 local_t pages_lost;
515 local_t pages_read;
516 long last_pages_touch;
517 size_t shortest_full;
518 unsigned long read;
519 unsigned long read_bytes;
520 rb_time_t write_stamp;
521 rb_time_t before_stamp;
522 u64 event_stamp[MAX_NEST];
523 u64 read_stamp;
524 /* pages removed since last reset */
525 unsigned long pages_removed;
526 /* ring buffer pages to update, > 0 to add, < 0 to remove */
527 long nr_pages_to_update;
528 struct list_head new_pages; /* new pages to add */
529 struct work_struct update_pages_work;
530 struct completion update_done;
531
532 struct rb_irq_work irq_work;
533 };
534
535 struct trace_buffer {
536 unsigned flags;
537 int cpus;
538 atomic_t record_disabled;
539 atomic_t resizing;
540 cpumask_var_t cpumask;
541
542 struct lock_class_key *reader_lock_key;
543
544 struct mutex mutex;
545
546 struct ring_buffer_per_cpu **buffers;
547
548 struct hlist_node node;
549 u64 (*clock)(void);
550
551 struct rb_irq_work irq_work;
552 bool time_stamp_abs;
553 };
554
555 struct ring_buffer_iter {
556 struct ring_buffer_per_cpu *cpu_buffer;
557 unsigned long head;
558 unsigned long next_event;
559 struct buffer_page *head_page;
560 struct buffer_page *cache_reader_page;
561 unsigned long cache_read;
562 unsigned long cache_pages_removed;
563 u64 read_stamp;
564 u64 page_stamp;
565 struct ring_buffer_event *event;
566 int missed_events;
567 };
568
569 #ifdef RB_TIME_32
570
571 /*
572 * On 32 bit machines, local64_t is very expensive. As the ring
573 * buffer doesn't need all the features of a true 64 bit atomic,
574 * on 32 bit, it uses these functions (64 still uses local64_t).
575 *
 * For the ring buffer, the 64 bit operations required for the time stamp
 * are the following:
578 *
579 * - Only need 59 bits (uses 60 to make it even).
580 * - Reads may fail if it interrupted a modification of the time stamp.
581 * It will succeed if it did not interrupt another write even if
582 * the read itself is interrupted by a write.
583 * It returns whether it was successful or not.
584 *
585 * - Writes always succeed and will overwrite other writes and writes
586 * that were done by events interrupting the current write.
587 *
588 * - A write followed by a read of the same time stamp will always succeed,
589 * but may not contain the same value.
590 *
591 * - A cmpxchg will fail if it interrupted another write or cmpxchg.
592 * Other than that, it acts like a normal cmpxchg.
593 *
594 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
595 * (bottom being the least significant 30 bits of the 60 bit time stamp).
596 *
597 * The two most significant bits of each half holds a 2 bit counter (0-3).
598 * Each update will increment this counter by one.
599 * When reading the top and bottom, if the two counter bits match then the
600 * top and bottom together make a valid 60 bit number.
601 */
602 #define RB_TIME_SHIFT 30
603 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
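/*
 * Worked example of the split described above (the value is arbitrary):
 * storing the 60 bit time stamp 0x123456789 with update counter cnt = 2:
 *
 *	bottom = 0x123456789 & RB_TIME_VAL_MASK                    = 0x23456789
 *	top    = (0x123456789 >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK = 0x4
 *
 * Each half is stored with cnt in its two upper bits, so top holds
 * 0x80000004 and bottom holds 0xa3456789.  A reader accepts the pair
 * only if both halves carry the same 2 bit counter; otherwise it knows
 * it raced with a writer and reports failure.
 */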
604
static inline int rb_time_cnt(unsigned long val)
606 {
607 return (val >> RB_TIME_SHIFT) & 3;
608 }
609
static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
611 {
612 u64 val;
613
614 val = top & RB_TIME_VAL_MASK;
615 val <<= RB_TIME_SHIFT;
616 val |= bottom & RB_TIME_VAL_MASK;
617
618 return val;
619 }
620
static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
622 {
623 unsigned long top, bottom;
624 unsigned long c;
625
626 /*
627 * If the read is interrupted by a write, then the cnt will
628 * be different. Loop until both top and bottom have been read
629 * without interruption.
630 */
631 do {
632 c = local_read(&t->cnt);
633 top = local_read(&t->top);
634 bottom = local_read(&t->bottom);
635 } while (c != local_read(&t->cnt));
636
637 *cnt = rb_time_cnt(top);
638
639 /* If top and bottom counts don't match, this interrupted a write */
640 if (*cnt != rb_time_cnt(bottom))
641 return false;
642
643 *ret = rb_time_val(top, bottom);
644 return true;
645 }
646
static bool rb_time_read(rb_time_t *t, u64 *ret)
648 {
649 unsigned long cnt;
650
651 return __rb_time_read(t, ret, &cnt);
652 }
653
static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
655 {
656 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
657 }
658
static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
660 {
661 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
662 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
663 }
664
static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
666 {
667 val = rb_time_val_cnt(val, cnt);
668 local_set(t, val);
669 }
670
static void rb_time_set(rb_time_t *t, u64 val)
672 {
673 unsigned long cnt, top, bottom;
674
675 rb_time_split(val, &top, &bottom);
676
677 /* Writes always succeed with a valid number even if it gets interrupted. */
678 do {
679 cnt = local_inc_return(&t->cnt);
680 rb_time_val_set(&t->top, top, cnt);
681 rb_time_val_set(&t->bottom, bottom, cnt);
682 } while (cnt != local_read(&t->cnt));
683 }
684
685 static inline bool
rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
687 {
688 unsigned long ret;
689
690 ret = local_cmpxchg(l, expect, set);
691 return ret == expect;
692 }
693
694 #else /* 64 bits */
695
696 /* local64_t always succeeds */
697
static inline bool rb_time_read(rb_time_t *t, u64 *ret)
699 {
700 *ret = local64_read(&t->time);
701 return true;
702 }
static void rb_time_set(rb_time_t *t, u64 val)
704 {
705 local64_set(&t->time, val);
706 }
707 #endif
708
709 /*
710 * Enable this to make sure that the event passed to
 * ring_buffer_event_time_stamp() is not committed and also
 * is on the buffer that was passed in.
713 */
714 //#define RB_VERIFY_EVENT
715 #ifdef RB_VERIFY_EVENT
716 static struct list_head *rb_list_head(struct list_head *list);
static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
718 void *event)
719 {
720 struct buffer_page *page = cpu_buffer->commit_page;
721 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
722 struct list_head *next;
723 long commit, write;
724 unsigned long addr = (unsigned long)event;
725 bool done = false;
726 int stop = 0;
727
728 /* Make sure the event exists and is not committed yet */
729 do {
730 if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
731 done = true;
732 commit = local_read(&page->page->commit);
733 write = local_read(&page->write);
734 if (addr >= (unsigned long)&page->page->data[commit] &&
735 addr < (unsigned long)&page->page->data[write])
736 return;
737
738 next = rb_list_head(page->list.next);
739 page = list_entry(next, struct buffer_page, list);
740 } while (!done);
741 WARN_ON_ONCE(1);
742 }
743 #else
static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
745 void *event)
746 {
747 }
748 #endif
749
750
751 static inline u64 rb_time_stamp(struct trace_buffer *buffer);
752
753 /**
754 * ring_buffer_event_time_stamp - return the event's current time stamp
755 * @buffer: The buffer that the event is on
756 * @event: the event to get the time stamp of
757 *
758 * Note, this must be called after @event is reserved, and before it is
759 * committed to the ring buffer. And must be called from the same
760 * context where the event was reserved (normal, softirq, irq, etc).
761 *
762 * Returns the time stamp associated with the current event.
763 * If the event has an extended time stamp, then that is used as
764 * the time stamp to return.
 * In the highly unlikely case that the event was nested more than
 * the max nesting, then the write_stamp of the buffer is returned,
 * otherwise the current time is returned; neither of the last two
 * cases should ever happen.
769 */
u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
771 struct ring_buffer_event *event)
772 {
773 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
774 unsigned int nest;
775 u64 ts;
776
777 /* If the event includes an absolute time, then just use that */
778 if (event->type_len == RINGBUF_TYPE_TIME_STAMP)
779 return rb_event_time_stamp(event);
780
781 nest = local_read(&cpu_buffer->committing);
782 verify_event(cpu_buffer, event);
783 if (WARN_ON_ONCE(!nest))
784 goto fail;
785
786 /* Read the current saved nesting level time stamp */
787 if (likely(--nest < MAX_NEST))
788 return cpu_buffer->event_stamp[nest];
789
790 /* Shouldn't happen, warn if it does */
791 WARN_ONCE(1, "nest (%d) greater than max", nest);
792
793 fail:
794 /* Can only fail on 32 bit */
795 if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
796 /* Screw it, just read the current time */
797 ts = rb_time_stamp(cpu_buffer->buffer);
798
799 return ts;
800 }
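/*
 * Sketch of the intended calling pattern (a hedged example; the commit
 * step is described in words because its exact signature differs between
 * kernel versions):
 *
 *	event = ring_buffer_lock_reserve(buffer, len);
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), payload, len);
 *		ts = ring_buffer_event_time_stamp(buffer, event);
 *		then commit the event, from the same context
 *	}
 */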
801
802 /**
803 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
804 * @buffer: The ring_buffer to get the number of pages from
805 * @cpu: The cpu of the ring_buffer to get the number of pages from
806 *
807 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
808 */
size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
810 {
811 return buffer->buffers[cpu]->nr_pages;
812 }
813
814 /**
815 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer
816 * @buffer: The ring_buffer to get the number of pages from
817 * @cpu: The cpu of the ring_buffer to get the number of pages from
818 *
819 * Returns the number of pages that have content in the ring buffer.
820 */
size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
822 {
823 size_t read;
824 size_t lost;
825 size_t cnt;
826
827 read = local_read(&buffer->buffers[cpu]->pages_read);
828 lost = local_read(&buffer->buffers[cpu]->pages_lost);
829 cnt = local_read(&buffer->buffers[cpu]->pages_touched);
830
831 if (WARN_ON_ONCE(cnt < lost))
832 return 0;
833
834 cnt -= lost;
835
836 /* The reader can read an empty page, but not more than that */
837 if (cnt < read) {
838 WARN_ON_ONCE(read > cnt + 1);
839 return 0;
840 }
841
842 return cnt - read;
843 }
844
static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
846 {
847 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
848 size_t nr_pages;
849 size_t dirty;
850
851 nr_pages = cpu_buffer->nr_pages;
852 if (!nr_pages || !full)
853 return true;
854
855 /*
856 * Add one as dirty will never equal nr_pages, as the sub-buffer
857 * that the writer is on is not counted as dirty.
858 * This is needed if "buffer_percent" is set to 100.
859 */
860 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
861
862 return (dirty * 100) >= (full * nr_pages);
863 }
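/*
 * Worked example (illustrative numbers): with nr_pages = 16 and a waiter
 * asking for full = 75 (percent), the test above requires
 * dirty * 100 >= 75 * 16, i.e. dirty >= 12.  Since the writer's current
 * sub-buffer contributes the "+ 1", the waiter is satisfied once 11
 * other pages hold unread data.
 */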
864
865 /*
866 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
867 *
868 * Schedules a delayed work to wake up any task that is blocked on the
869 * ring buffer waiters queue.
870 */
static void rb_wake_up_waiters(struct irq_work *work)
872 {
873 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
874
875 wake_up_all(&rbwork->waiters);
876 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
877 rbwork->wakeup_full = false;
878 rbwork->full_waiters_pending = false;
879 wake_up_all(&rbwork->full_waiters);
880 }
881 }
882
883 /**
884 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
 * @buffer: The ring buffer to wake waiters on
 * @cpu: The CPU buffer to wake waiters on, or RING_BUFFER_ALL_CPUS for all
 *
 * When a file that represents a ring buffer is being closed, it is
 * prudent to wake up any waiters that are waiting on it.
889 */
void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
891 {
892 struct ring_buffer_per_cpu *cpu_buffer;
893 struct rb_irq_work *rbwork;
894
895 if (!buffer)
896 return;
897
898 if (cpu == RING_BUFFER_ALL_CPUS) {
899
900 /* Wake up individual ones too. One level recursion */
901 for_each_buffer_cpu(buffer, cpu)
902 ring_buffer_wake_waiters(buffer, cpu);
903
904 rbwork = &buffer->irq_work;
905 } else {
906 if (WARN_ON_ONCE(!buffer->buffers))
907 return;
908 if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
909 return;
910
911 cpu_buffer = buffer->buffers[cpu];
912 /* The CPU buffer may not have been initialized yet */
913 if (!cpu_buffer)
914 return;
915 rbwork = &cpu_buffer->irq_work;
916 }
917
918 rbwork->wait_index++;
919 /* make sure the waiters see the new index */
920 smp_wmb();
921
922 /* This can be called in any context */
923 irq_work_queue(&rbwork->work);
924 }
925
926 /**
927 * ring_buffer_wait - wait for input to the ring buffer
928 * @buffer: buffer to wait on
929 * @cpu: the cpu buffer to wait on
930 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
931 *
932 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
933 * as data is added to any of the @buffer's cpu buffers. Otherwise
934 * it will wait for data to be added to a specific cpu buffer.
935 */
int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
937 {
938 struct ring_buffer_per_cpu *cpu_buffer;
939 DEFINE_WAIT(wait);
940 struct rb_irq_work *work;
941 long wait_index;
942 int ret = 0;
943
944 /*
945 * Depending on what the caller is waiting for, either any
946 * data in any cpu buffer, or a specific buffer, put the
947 * caller on the appropriate wait queue.
948 */
949 if (cpu == RING_BUFFER_ALL_CPUS) {
950 work = &buffer->irq_work;
951 /* Full only makes sense on per cpu reads */
952 full = 0;
953 } else {
954 if (!cpumask_test_cpu(cpu, buffer->cpumask))
955 return -ENODEV;
956 cpu_buffer = buffer->buffers[cpu];
957 work = &cpu_buffer->irq_work;
958 }
959
960 wait_index = READ_ONCE(work->wait_index);
961
962 while (true) {
963 if (full)
964 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
965 else
966 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
967
968 /*
969 * The events can happen in critical sections where
970 * checking a work queue can cause deadlocks.
971 * After adding a task to the queue, this flag is set
972 * only to notify events to try to wake up the queue
973 * using irq_work.
974 *
975 * We don't clear it even if the buffer is no longer
976 * empty. The flag only causes the next event to run
 * irq_work to do the work queue wake up. The worst
 * that can happen if we race with !trace_empty() is that
979 * an event will cause an irq_work to try to wake up
980 * an empty queue.
981 *
982 * There's no reason to protect this flag either, as
983 * the work queue and irq_work logic will do the necessary
984 * synchronization for the wake ups. The only thing
985 * that is necessary is that the wake up happens after
986 * a task has been queued. It's OK for spurious wake ups.
987 */
988 if (full)
989 work->full_waiters_pending = true;
990 else
991 work->waiters_pending = true;
992
993 if (signal_pending(current)) {
994 ret = -EINTR;
995 break;
996 }
997
998 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
999 break;
1000
1001 if (cpu != RING_BUFFER_ALL_CPUS &&
1002 !ring_buffer_empty_cpu(buffer, cpu)) {
1003 unsigned long flags;
1004 bool pagebusy;
1005 bool done;
1006
1007 if (!full)
1008 break;
1009
1010 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
1011 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
1012 done = !pagebusy && full_hit(buffer, cpu, full);
1013
1014 if (!cpu_buffer->shortest_full ||
1015 cpu_buffer->shortest_full > full)
1016 cpu_buffer->shortest_full = full;
1017 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
1018 if (done)
1019 break;
1020 }
1021
1022 schedule();
1023
1024 /* Make sure to see the new wait index */
1025 smp_rmb();
1026 if (wait_index != work->wait_index)
1027 break;
1028 }
1029
1030 if (full)
1031 finish_wait(&work->full_waiters, &wait);
1032 else
1033 finish_wait(&work->waiters, &wait);
1034
1035 return ret;
1036 }
1037
1038 /**
1039 * ring_buffer_poll_wait - poll on buffer input
1040 * @buffer: buffer to wait on
1041 * @cpu: the cpu buffer to wait on
1042 * @filp: the file descriptor
1043 * @poll_table: The poll descriptor
1044 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
1045 *
1046 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
1047 * as data is added to any of the @buffer's cpu buffers. Otherwise
1048 * it will wait for data to be added to a specific cpu buffer.
1049 *
1050 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
1051 * zero otherwise.
1052 */
__poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
1054 struct file *filp, poll_table *poll_table, int full)
1055 {
1056 struct ring_buffer_per_cpu *cpu_buffer;
1057 struct rb_irq_work *work;
1058
1059 if (cpu == RING_BUFFER_ALL_CPUS) {
1060 work = &buffer->irq_work;
1061 full = 0;
1062 } else {
1063 if (!cpumask_test_cpu(cpu, buffer->cpumask))
1064 return EPOLLERR;
1065
1066 cpu_buffer = buffer->buffers[cpu];
1067 work = &cpu_buffer->irq_work;
1068 }
1069
1070 if (full) {
1071 poll_wait(filp, &work->full_waiters, poll_table);
1072 work->full_waiters_pending = true;
1073 if (!cpu_buffer->shortest_full ||
1074 cpu_buffer->shortest_full > full)
1075 cpu_buffer->shortest_full = full;
1076 } else {
1077 poll_wait(filp, &work->waiters, poll_table);
1078 work->waiters_pending = true;
1079 }
1080
1081 /*
1082 * There's a tight race between setting the waiters_pending and
1083 * checking if the ring buffer is empty. Once the waiters_pending bit
1084 * is set, the next event will wake the task up, but we can get stuck
1085 * if there's only a single event in.
1086 *
1087 * FIXME: Ideally, we need a memory barrier on the writer side as well,
1088 * but adding a memory barrier to all events will cause too much of a
1089 * performance hit in the fast path. We only need a memory barrier when
1090 * the buffer goes from empty to having content. But as this race is
1091 * extremely small, and it's not a problem if another event comes in, we
1092 * will fix it later.
1093 */
1094 smp_mb();
1095
1096 if (full)
1097 return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
1098
1099 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1100 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
1101 return EPOLLIN | EPOLLRDNORM;
1102 return 0;
1103 }
1104
1105 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
1106 #define RB_WARN_ON(b, cond) \
1107 ({ \
1108 int _____ret = unlikely(cond); \
1109 if (_____ret) { \
1110 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1111 struct ring_buffer_per_cpu *__b = \
1112 (void *)b; \
1113 atomic_inc(&__b->buffer->record_disabled); \
1114 } else \
1115 atomic_inc(&b->record_disabled); \
1116 WARN_ON(1); \
1117 } \
1118 _____ret; \
1119 })
1120
1121 /* Up this if you want to test the TIME_EXTENTS and normalization */
1122 #define DEBUG_SHIFT 0
1123
static inline u64 rb_time_stamp(struct trace_buffer *buffer)
1125 {
1126 u64 ts;
1127
1128 /* Skip retpolines :-( */
1129 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
1130 ts = trace_clock_local();
1131 else
1132 ts = buffer->clock();
1133
1134 /* shift to debug/test normalization and TIME_EXTENTS */
1135 return ts << DEBUG_SHIFT;
1136 }
1137
u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
1139 {
1140 u64 time;
1141
1142 preempt_disable_notrace();
1143 time = rb_time_stamp(buffer);
1144 preempt_enable_notrace();
1145
1146 return time;
1147 }
1148 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1149
void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
1151 int cpu, u64 *ts)
1152 {
1153 /* Just stupid testing the normalize function and deltas */
1154 *ts >>= DEBUG_SHIFT;
1155 }
1156 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1157
1158 /*
1159 * Making the ring buffer lockless makes things tricky.
1160 * Although writes only happen on the CPU that they are on,
1161 * and they only need to worry about interrupts. Reads can
1162 * happen on any CPU.
1163 *
1164 * The reader page is always off the ring buffer, but when the
1165 * reader finishes with a page, it needs to swap its page with
1166 * a new one from the buffer. The reader needs to take from
1167 * the head (writes go to the tail). But if a writer is in overwrite
1168 * mode and wraps, it must push the head page forward.
1169 *
1170 * Here lies the problem.
1171 *
1172 * The reader must be careful to replace only the head page, and
1173 * not another one. As described at the top of the file in the
1174 * ASCII art, the reader sets its old page to point to the next
1175 * page after head. It then sets the page after head to point to
1176 * the old reader page. But if the writer moves the head page
1177 * during this operation, the reader could end up with the tail.
1178 *
1179 * We use cmpxchg to help prevent this race. We also do something
1180 * special with the page before head. We set the LSB to 1.
1181 *
1182 * When the writer must push the page forward, it will clear the
1183 * bit that points to the head page, move the head, and then set
1184 * the bit that points to the new head page.
1185 *
1186 * We also don't want an interrupt coming in and moving the head
1187 * page on another writer. Thus we use the second LSB to catch
1188 * that too. Thus:
1189 *
1190 * head->list->prev->next bit 1 bit 0
1191 * ------- -------
1192 * Normal page 0 0
1193 * Points to head page 0 1
1194 * New head page 1 0
1195 *
1196 * Note we can not trust the prev pointer of the head page, because:
1197 *
1198 * +----+ +-----+ +-----+
1199 * | |------>| T |---X--->| N |
1200 * | |<------| | | |
1201 * +----+ +-----+ +-----+
1202 * ^ ^ |
1203 * | +-----+ | |
1204 * +----------| R |----------+ |
1205 * | |<-----------+
1206 * +-----+
1207 *
1208 * Key: ---X--> HEAD flag set in pointer
1209 * T Tail page
1210 * R Reader page
1211 * N Next page
1212 *
1213 * (see __rb_reserve_next() to see where this happens)
1214 *
1215 * What the above shows is that the reader just swapped out
1216 * the reader page with a page in the buffer, but before it
1217 * could make the new header point back to the new page added
1218 * it was preempted by a writer. The writer moved forward onto
1219 * the new page added by the reader and is about to move forward
1220 * again.
1221 *
1222 * You can see, it is legitimate for the previous pointer of
1223 * the head (or any page) not to point back to itself. But only
1224 * temporarily.
1225 */
1226
1227 #define RB_PAGE_NORMAL 0UL
1228 #define RB_PAGE_HEAD 1UL
1229 #define RB_PAGE_UPDATE 2UL
1230
1231
1232 #define RB_FLAG_MASK 3UL
1233
1234 /* PAGE_MOVED is not part of the mask */
1235 #define RB_PAGE_MOVED 4UL
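/*
 * Worked example of the pointer encoding (the address is made up):
 * buffer pages are allocated cache-line aligned, so a list pointer such
 * as 0xffff888012345600 always has its two low bits clear.  Stored as
 * 0xffff888012345601 it means "the next page is the head page"
 * (RB_PAGE_HEAD); stored as 0xffff888012345602 it means "a writer is in
 * the middle of moving the head" (RB_PAGE_UPDATE).  rb_list_head() below
 * simply masks those two bits off to recover the real address.
 */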
1236
1237 /*
1238 * rb_list_head - remove any bit
1239 */
static struct list_head *rb_list_head(struct list_head *list)
1241 {
1242 unsigned long val = (unsigned long)list;
1243
1244 return (struct list_head *)(val & ~RB_FLAG_MASK);
1245 }
1246
1247 /*
1248 * rb_is_head_page - test if the given page is the head page
1249 *
1250 * Because the reader may move the head_page pointer, we can
1251 * not trust what the head page is (it may be pointing to
1252 * the reader page). But if the next page is a header page,
1253 * its flags will be non zero.
1254 */
1255 static inline int
rb_is_head_page(struct buffer_page *page, struct list_head *list)
1257 {
1258 unsigned long val;
1259
1260 val = (unsigned long)list->next;
1261
1262 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1263 return RB_PAGE_MOVED;
1264
1265 return val & RB_FLAG_MASK;
1266 }
1267
1268 /*
1269 * rb_is_reader_page
1270 *
 * The unique thing about the reader page is that, if the
 * writer is ever on it, the previous pointer never points
 * back to the reader page.
1274 */
static bool rb_is_reader_page(struct buffer_page *page)
1276 {
1277 struct list_head *list = page->list.prev;
1278
1279 return rb_list_head(list->next) != &page->list;
1280 }
1281
1282 /*
1283 * rb_set_list_to_head - set a list_head to be pointing to head.
1284 */
static void rb_set_list_to_head(struct list_head *list)
1286 {
1287 unsigned long *ptr;
1288
1289 ptr = (unsigned long *)&list->next;
1290 *ptr |= RB_PAGE_HEAD;
1291 *ptr &= ~RB_PAGE_UPDATE;
1292 }
1293
1294 /*
1295 * rb_head_page_activate - sets up head page
1296 */
static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1298 {
1299 struct buffer_page *head;
1300
1301 head = cpu_buffer->head_page;
1302 if (!head)
1303 return;
1304
1305 /*
1306 * Set the previous list pointer to have the HEAD flag.
1307 */
1308 rb_set_list_to_head(head->list.prev);
1309 }
1310
static void rb_list_head_clear(struct list_head *list)
1312 {
1313 unsigned long *ptr = (unsigned long *)&list->next;
1314
1315 *ptr &= ~RB_FLAG_MASK;
1316 }
1317
1318 /*
1319 * rb_head_page_deactivate - clears head page ptr (for free list)
1320 */
1321 static void
rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1323 {
1324 struct list_head *hd;
1325
1326 /* Go through the whole list and clear any pointers found. */
1327 rb_list_head_clear(cpu_buffer->pages);
1328
1329 list_for_each(hd, cpu_buffer->pages)
1330 rb_list_head_clear(hd);
1331 }
1332
static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1334 struct buffer_page *head,
1335 struct buffer_page *prev,
1336 int old_flag, int new_flag)
1337 {
1338 struct list_head *list;
1339 unsigned long val = (unsigned long)&head->list;
1340 unsigned long ret;
1341
1342 list = &prev->list;
1343
1344 val &= ~RB_FLAG_MASK;
1345
1346 ret = cmpxchg((unsigned long *)&list->next,
1347 val | old_flag, val | new_flag);
1348
1349 /* check if the reader took the page */
1350 if ((ret & ~RB_FLAG_MASK) != val)
1351 return RB_PAGE_MOVED;
1352
1353 return ret & RB_FLAG_MASK;
1354 }
1355
static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1357 struct buffer_page *head,
1358 struct buffer_page *prev,
1359 int old_flag)
1360 {
1361 return rb_head_page_set(cpu_buffer, head, prev,
1362 old_flag, RB_PAGE_UPDATE);
1363 }
1364
static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1366 struct buffer_page *head,
1367 struct buffer_page *prev,
1368 int old_flag)
1369 {
1370 return rb_head_page_set(cpu_buffer, head, prev,
1371 old_flag, RB_PAGE_HEAD);
1372 }
1373
static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1375 struct buffer_page *head,
1376 struct buffer_page *prev,
1377 int old_flag)
1378 {
1379 return rb_head_page_set(cpu_buffer, head, prev,
1380 old_flag, RB_PAGE_NORMAL);
1381 }
1382
static inline void rb_inc_page(struct buffer_page **bpage)
1384 {
1385 struct list_head *p = rb_list_head((*bpage)->list.next);
1386
1387 *bpage = list_entry(p, struct buffer_page, list);
1388 }
1389
1390 static struct buffer_page *
rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1392 {
1393 struct buffer_page *head;
1394 struct buffer_page *page;
1395 struct list_head *list;
1396 int i;
1397
1398 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1399 return NULL;
1400
1401 /* sanity check */
1402 list = cpu_buffer->pages;
1403 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1404 return NULL;
1405
1406 page = head = cpu_buffer->head_page;
1407 /*
 * It is possible that the writer moves the head page behind
 * where we started, and we miss it in one loop.
1410 * A second loop should grab the header, but we'll do
1411 * three loops just because I'm paranoid.
1412 */
1413 for (i = 0; i < 3; i++) {
1414 do {
1415 if (rb_is_head_page(page, page->list.prev)) {
1416 cpu_buffer->head_page = page;
1417 return page;
1418 }
1419 rb_inc_page(&page);
1420 } while (page != head);
1421 }
1422
1423 RB_WARN_ON(cpu_buffer, 1);
1424
1425 return NULL;
1426 }
1427
static int rb_head_page_replace(struct buffer_page *old,
1429 struct buffer_page *new)
1430 {
1431 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1432 unsigned long val;
1433 unsigned long ret;
1434
1435 val = *ptr & ~RB_FLAG_MASK;
1436 val |= RB_PAGE_HEAD;
1437
1438 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1439
1440 return ret == val;
1441 }
1442
1443 /*
1444 * rb_tail_page_update - move the tail page forward
1445 */
static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1447 struct buffer_page *tail_page,
1448 struct buffer_page *next_page)
1449 {
1450 unsigned long old_entries;
1451 unsigned long old_write;
1452
1453 /*
1454 * The tail page now needs to be moved forward.
1455 *
1456 * We need to reset the tail page, but without messing
1457 * with possible erasing of data brought in by interrupts
1458 * that have moved the tail page and are currently on it.
1459 *
1460 * We add a counter to the write field to denote this.
1461 */
1462 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1463 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1464
1465 local_inc(&cpu_buffer->pages_touched);
1466 /*
1467 * Just make sure we have seen our old_write and synchronize
1468 * with any interrupts that come in.
1469 */
1470 barrier();
1471
1472 /*
1473 * If the tail page is still the same as what we think
1474 * it is, then it is up to us to update the tail
1475 * pointer.
1476 */
1477 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1478 /* Zero the write counter */
1479 unsigned long val = old_write & ~RB_WRITE_MASK;
1480 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1481
1482 /*
1483 * This will only succeed if an interrupt did
1484 * not come in and change it. In which case, we
1485 * do not want to modify it.
1486 *
1487 * We add (void) to let the compiler know that we do not care
1488 * about the return value of these functions. We use the
1489 * cmpxchg to only update if an interrupt did not already
1490 * do it for us. If the cmpxchg fails, we don't care.
1491 */
1492 (void)local_cmpxchg(&next_page->write, old_write, val);
1493 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1494
1495 /*
1496 * No need to worry about races with clearing out the commit.
1497 * it only can increment when a commit takes place. But that
1498 * only happens in the outer most nested commit.
1499 */
1500 local_set(&next_page->page->commit, 0);
1501
1502 /* Again, either we update tail_page or an interrupt does */
1503 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1504 }
1505 }
1506
static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1508 struct buffer_page *bpage)
1509 {
1510 unsigned long val = (unsigned long)bpage;
1511
1512 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1513 return 1;
1514
1515 return 0;
1516 }
1517
1518 /**
1519 * rb_check_pages - integrity check of buffer pages
1520 * @cpu_buffer: CPU buffer with pages to test
1521 *
1522 * As a safety measure we check to make sure the data pages have not
1523 * been corrupted.
1524 */
static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1526 {
1527 struct list_head *head = rb_list_head(cpu_buffer->pages);
1528 struct list_head *tmp;
1529
1530 if (RB_WARN_ON(cpu_buffer,
1531 rb_list_head(rb_list_head(head->next)->prev) != head))
1532 return -1;
1533
1534 if (RB_WARN_ON(cpu_buffer,
1535 rb_list_head(rb_list_head(head->prev)->next) != head))
1536 return -1;
1537
1538 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
1539 if (RB_WARN_ON(cpu_buffer,
1540 rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
1541 return -1;
1542
1543 if (RB_WARN_ON(cpu_buffer,
1544 rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
1545 return -1;
1546 }
1547
1548 return 0;
1549 }
1550
static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1552 long nr_pages, struct list_head *pages)
1553 {
1554 struct buffer_page *bpage, *tmp;
1555 bool user_thread = current->mm != NULL;
1556 gfp_t mflags;
1557 long i;
1558
1559 /*
1560 * Check if the available memory is there first.
1561 * Note, si_mem_available() only gives us a rough estimate of available
1562 * memory. It may not be accurate. But we don't care, we just want
1563 * to prevent doing any allocation when it is obvious that it is
1564 * not going to succeed.
1565 */
1566 i = si_mem_available();
1567 if (i < nr_pages)
1568 return -ENOMEM;
1569
1570 /*
1571 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1572 * gracefully without invoking oom-killer and the system is not
1573 * destabilized.
1574 */
1575 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1576
1577 /*
 * A user thread may allocate too much when si_mem_available()
 * reports there's enough memory, even though there is not.
 * Make sure the OOM killer kills this thread. This can happen
1581 * even with RETRY_MAYFAIL because another task may be doing
1582 * an allocation after this task has taken all memory.
1583 * This is the task the OOM killer needs to take out during this
1584 * loop, even if it was triggered by an allocation somewhere else.
1585 */
1586 if (user_thread)
1587 set_current_oom_origin();
1588 for (i = 0; i < nr_pages; i++) {
1589 struct page *page;
1590
1591 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1592 mflags, cpu_to_node(cpu_buffer->cpu));
1593 if (!bpage)
1594 goto free_pages;
1595
1596 rb_check_bpage(cpu_buffer, bpage);
1597
1598 list_add(&bpage->list, pages);
1599
1600 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
1601 if (!page)
1602 goto free_pages;
1603 bpage->page = page_address(page);
1604 rb_init_page(bpage->page);
1605
1606 if (user_thread && fatal_signal_pending(current))
1607 goto free_pages;
1608 }
1609 if (user_thread)
1610 clear_current_oom_origin();
1611
1612 return 0;
1613
1614 free_pages:
1615 list_for_each_entry_safe(bpage, tmp, pages, list) {
1616 list_del_init(&bpage->list);
1617 free_buffer_page(bpage);
1618 }
1619 if (user_thread)
1620 clear_current_oom_origin();
1621
1622 return -ENOMEM;
1623 }
1624
static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1626 unsigned long nr_pages)
1627 {
1628 LIST_HEAD(pages);
1629
1630 WARN_ON(!nr_pages);
1631
1632 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
1633 return -ENOMEM;
1634
1635 /*
1636 * The ring buffer page list is a circular list that does not
1637 * start and end with a list head. All page list items point to
1638 * other pages.
1639 */
1640 cpu_buffer->pages = pages.next;
1641 list_del(&pages);
1642
1643 cpu_buffer->nr_pages = nr_pages;
1644
1645 rb_check_pages(cpu_buffer);
1646
1647 return 0;
1648 }
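/*
 * After rb_allocate_pages() the layout looks like this (sketch for
 * nr_pages = 3):
 *
 *	cpu_buffer->pages --> [A] <--> [B] <--> [C] <--> back to [A]
 *
 * that is, a purely circular list of buffer_page entries with no
 * dedicated list_head sentinel; cpu_buffer->pages simply points at one
 * of the members.  The separate reader page is not part of this ring.
 */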
1649
1650 static struct ring_buffer_per_cpu *
rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
1652 {
1653 struct ring_buffer_per_cpu *cpu_buffer;
1654 struct buffer_page *bpage;
1655 struct page *page;
1656 int ret;
1657
1658 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1659 GFP_KERNEL, cpu_to_node(cpu));
1660 if (!cpu_buffer)
1661 return NULL;
1662
1663 cpu_buffer->cpu = cpu;
1664 cpu_buffer->buffer = buffer;
1665 raw_spin_lock_init(&cpu_buffer->reader_lock);
1666 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1667 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1668 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1669 init_completion(&cpu_buffer->update_done);
1670 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1671 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1672 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1673
1674 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1675 GFP_KERNEL, cpu_to_node(cpu));
1676 if (!bpage)
1677 goto fail_free_buffer;
1678
1679 rb_check_bpage(cpu_buffer, bpage);
1680
1681 cpu_buffer->reader_page = bpage;
1682 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1683 if (!page)
1684 goto fail_free_reader;
1685 bpage->page = page_address(page);
1686 rb_init_page(bpage->page);
1687
1688 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1689 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1690
1691 ret = rb_allocate_pages(cpu_buffer, nr_pages);
1692 if (ret < 0)
1693 goto fail_free_reader;
1694
1695 cpu_buffer->head_page
1696 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1697 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1698
1699 rb_head_page_activate(cpu_buffer);
1700
1701 return cpu_buffer;
1702
1703 fail_free_reader:
1704 free_buffer_page(cpu_buffer->reader_page);
1705
1706 fail_free_buffer:
1707 kfree(cpu_buffer);
1708 return NULL;
1709 }
1710
static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1712 {
1713 struct list_head *head = cpu_buffer->pages;
1714 struct buffer_page *bpage, *tmp;
1715
1716 irq_work_sync(&cpu_buffer->irq_work.work);
1717
1718 free_buffer_page(cpu_buffer->reader_page);
1719
1720 if (head) {
1721 rb_head_page_deactivate(cpu_buffer);
1722
1723 list_for_each_entry_safe(bpage, tmp, head, list) {
1724 list_del_init(&bpage->list);
1725 free_buffer_page(bpage);
1726 }
1727 bpage = list_entry(head, struct buffer_page, list);
1728 free_buffer_page(bpage);
1729 }
1730
1731 free_page((unsigned long)cpu_buffer->free_page);
1732
1733 kfree(cpu_buffer);
1734 }
1735
1736 /**
1737 * __ring_buffer_alloc - allocate a new ring_buffer
1738 * @size: the size in bytes per cpu that is needed.
1739 * @flags: attributes to set for the ring buffer.
1740 * @key: ring buffer reader_lock_key.
1741 *
1742 * Currently the only flag that is available is the RB_FL_OVERWRITE
1743 * flag. This flag means that the buffer will overwrite old data
1744 * when the buffer wraps. If this flag is not set, the buffer will
1745 * drop data when the tail hits the head.
1746 */
struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1748 struct lock_class_key *key)
1749 {
1750 struct trace_buffer *buffer;
1751 long nr_pages;
1752 int bsize;
1753 int cpu;
1754 int ret;
1755
1756 /* keep it in its own cache line */
1757 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1758 GFP_KERNEL);
1759 if (!buffer)
1760 return NULL;
1761
1762 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1763 goto fail_free_buffer;
1764
1765 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1766 buffer->flags = flags;
1767 buffer->clock = trace_clock_local;
1768 buffer->reader_lock_key = key;
1769
1770 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1771 init_waitqueue_head(&buffer->irq_work.waiters);
1772
1773 /* need at least two pages */
1774 if (nr_pages < 2)
1775 nr_pages = 2;
1776
1777 buffer->cpus = nr_cpu_ids;
1778
1779 bsize = sizeof(void *) * nr_cpu_ids;
1780 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1781 GFP_KERNEL);
1782 if (!buffer->buffers)
1783 goto fail_free_cpumask;
1784
1785 cpu = raw_smp_processor_id();
1786 cpumask_set_cpu(cpu, buffer->cpumask);
1787 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1788 if (!buffer->buffers[cpu])
1789 goto fail_free_buffers;
1790
1791 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1792 if (ret < 0)
1793 goto fail_free_buffers;
1794
1795 mutex_init(&buffer->mutex);
1796
1797 return buffer;
1798
1799 fail_free_buffers:
1800 for_each_buffer_cpu(buffer, cpu) {
1801 if (buffer->buffers[cpu])
1802 rb_free_cpu_buffer(buffer->buffers[cpu]);
1803 }
1804 kfree(buffer->buffers);
1805
1806 fail_free_cpumask:
1807 free_cpumask_var(buffer->cpumask);
1808
1809 fail_free_buffer:
1810 kfree(buffer);
1811 return NULL;
1812 }
1813 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
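/*
 * Illustrative sketch (not part of this file): a minimal caller of the
 * allocator above. Callers normally go through the ring_buffer_alloc()
 * wrapper macro from <linux/ring_buffer.h>, which supplies the
 * lock_class_key for them. The buffer size and the 'my_rb' names below
 * are made-up examples.
 *
 *	static struct trace_buffer *my_rb;
 *
 *	static int __init my_rb_init(void)
 *	{
 *		// One megabyte per CPU, overwrite old events when full.
 *		my_rb = ring_buffer_alloc(1 << 20, RB_FL_OVERWRITE);
 *		return my_rb ? 0 : -ENOMEM;
 *	}
 *
 *	static void __exit my_rb_exit(void)
 *	{
 *		ring_buffer_free(my_rb);
 *	}
 */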
1814
1815 /**
1816 * ring_buffer_free - free a ring buffer.
1817 * @buffer: the buffer to free.
1818 */
1819 void
1820 ring_buffer_free(struct trace_buffer *buffer)
1821 {
1822 int cpu;
1823
1824 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1825
1826 irq_work_sync(&buffer->irq_work.work);
1827
1828 for_each_buffer_cpu(buffer, cpu)
1829 rb_free_cpu_buffer(buffer->buffers[cpu]);
1830
1831 kfree(buffer->buffers);
1832 free_cpumask_var(buffer->cpumask);
1833
1834 kfree(buffer);
1835 }
1836 EXPORT_SYMBOL_GPL(ring_buffer_free);
1837
1838 void ring_buffer_set_clock(struct trace_buffer *buffer,
1839 u64 (*clock)(void))
1840 {
1841 buffer->clock = clock;
1842 }
1843
1844 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
1845 {
1846 buffer->time_stamp_abs = abs;
1847 }
1848
1849 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
1850 {
1851 return buffer->time_stamp_abs;
1852 }
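/*
 * Illustrative sketch (not part of this file): switching the clock and the
 * timestamp mode of a buffer. trace_clock_global() is the cross-CPU clock
 * declared in <linux/trace_clock.h>; 'my_rb' is a made-up buffer pointer
 * assumed to come from ring_buffer_alloc() as sketched above.
 *
 *	// Use a globally ordered clock instead of the default local clock.
 *	ring_buffer_set_clock(my_rb, trace_clock_global);
 *
 *	// Record absolute timestamps rather than per-event deltas.
 *	ring_buffer_set_time_stamp_abs(my_rb, true);
 *
 *	if (ring_buffer_time_stamp_abs(my_rb))
 *		pr_info("buffer is using absolute timestamps\n");
 */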
1853
1854 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1855
1856 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1857 {
1858 return local_read(&bpage->entries) & RB_WRITE_MASK;
1859 }
1860
1861 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1862 {
1863 return local_read(&bpage->write) & RB_WRITE_MASK;
1864 }
1865
1866 static int
1867 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1868 {
1869 struct list_head *tail_page, *to_remove, *next_page;
1870 struct buffer_page *to_remove_page, *tmp_iter_page;
1871 struct buffer_page *last_page, *first_page;
1872 unsigned long nr_removed;
1873 unsigned long head_bit;
1874 int page_entries;
1875
1876 head_bit = 0;
1877
1878 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1879 atomic_inc(&cpu_buffer->record_disabled);
1880 /*
1881 * We don't race with the readers since we have acquired the reader
1882 * lock. We also don't race with writers after disabling recording.
1883 * This makes it easy to figure out the first and the last page to be
1884 * removed from the list. We unlink all the pages in between including
1885 * the first and last pages. This is done in a busy loop so that we
1886 * lose the least number of traces.
1887 * The pages are freed after we restart recording and unlock readers.
1888 */
1889 tail_page = &cpu_buffer->tail_page->list;
1890
1891 /*
1892 * The tail page might be on the reader page; in that case we
1893 * remove the next page from the ring buffer instead.
1894 */
1895 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1896 tail_page = rb_list_head(tail_page->next);
1897 to_remove = tail_page;
1898
1899 /* start of pages to remove */
1900 first_page = list_entry(rb_list_head(to_remove->next),
1901 struct buffer_page, list);
1902
1903 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1904 to_remove = rb_list_head(to_remove)->next;
1905 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1906 }
1907 /* Read iterators need to reset themselves when some pages are removed */
1908 cpu_buffer->pages_removed += nr_removed;
1909
1910 next_page = rb_list_head(to_remove)->next;
1911
1912 /*
1913 * Now we remove all pages between tail_page and next_page.
1914 * Make sure that we have head_bit value preserved for the
1915 * next page
1916 */
1917 tail_page->next = (struct list_head *)((unsigned long)next_page |
1918 head_bit);
1919 next_page = rb_list_head(next_page);
1920 next_page->prev = tail_page;
1921
1922 /* make sure pages points to a valid page in the ring buffer */
1923 cpu_buffer->pages = next_page;
1924
1925 /* update head page */
1926 if (head_bit)
1927 cpu_buffer->head_page = list_entry(next_page,
1928 struct buffer_page, list);
1929
1930 /* pages are removed, resume tracing and then free the pages */
1931 atomic_dec(&cpu_buffer->record_disabled);
1932 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1933
1934 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1935
1936 /* last buffer page to remove */
1937 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1938 list);
1939 tmp_iter_page = first_page;
1940
1941 do {
1942 cond_resched();
1943
1944 to_remove_page = tmp_iter_page;
1945 rb_inc_page(&tmp_iter_page);
1946
1947 /* update the counters */
1948 page_entries = rb_page_entries(to_remove_page);
1949 if (page_entries) {
1950 /*
1951 * If something was added to this page, it was full
1952 * since it is not the tail page. So we deduct the
1953 * bytes consumed in ring buffer from here.
1954 * Increment overrun to account for the lost events.
1955 */
1956 local_add(page_entries, &cpu_buffer->overrun);
1957 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
1958 local_inc(&cpu_buffer->pages_lost);
1959 }
1960
1961 /*
1962 * We have already removed references to this list item, just
1963 * free up the buffer_page and its page
1964 */
1965 free_buffer_page(to_remove_page);
1966 nr_removed--;
1967
1968 } while (to_remove_page != last_page);
1969
1970 RB_WARN_ON(cpu_buffer, nr_removed);
1971
1972 return nr_removed == 0;
1973 }
1974
1975 static int
1976 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1977 {
1978 struct list_head *pages = &cpu_buffer->new_pages;
1979 int retries, success;
1980
1981 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1982 /*
1983 * We are holding the reader lock, so the reader page won't be swapped
1984 * in the ring buffer. Now we are racing with the writer trying to
1985 * move head page and the tail page.
1986 * We are going to adapt the reader page update process where:
1987 * 1. We first splice the start and end of list of new pages between
1988 * the head page and its previous page.
1989 * 2. We cmpxchg the prev_page->next to point from head page to the
1990 * start of new pages list.
1991 * 3. Finally, we update the head->prev to the end of new list.
1992 *
1993 * We will try this process 10 times, to make sure that we don't keep
1994 * spinning.
1995 */
1996 retries = 10;
1997 success = 0;
1998 while (retries--) {
1999 struct list_head *head_page, *prev_page, *r;
2000 struct list_head *last_page, *first_page;
2001 struct list_head *head_page_with_bit;
2002
2003 head_page = &rb_set_head_page(cpu_buffer)->list;
2004 if (!head_page)
2005 break;
2006 prev_page = head_page->prev;
2007
2008 first_page = pages->next;
2009 last_page = pages->prev;
2010
2011 head_page_with_bit = (struct list_head *)
2012 ((unsigned long)head_page | RB_PAGE_HEAD);
2013
2014 last_page->next = head_page_with_bit;
2015 first_page->prev = prev_page;
2016
2017 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
2018
2019 if (r == head_page_with_bit) {
2020 /*
2021 * yay, we replaced the page pointer with our new list;
2022 * now we just have to update the head page's prev
2023 * pointer to point to the end of the list.
2024 */
2025 head_page->prev = last_page;
2026 success = 1;
2027 break;
2028 }
2029 }
2030
2031 if (success)
2032 INIT_LIST_HEAD(pages);
2033 /*
2034 * If we weren't successful in adding in new pages, warn and stop
2035 * tracing
2036 */
2037 RB_WARN_ON(cpu_buffer, !success);
2038 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
2039
2040 /* free pages if they weren't inserted */
2041 if (!success) {
2042 struct buffer_page *bpage, *tmp;
2043 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2044 list) {
2045 list_del_init(&bpage->list);
2046 free_buffer_page(bpage);
2047 }
2048 }
2049 return success;
2050 }
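/*
 * A user-space analogy of the splice above (a sketch, not kernel code),
 * assuming a simplified singly linked view where the HEAD flag lives in
 * the low bit of the 'next' pointer, just as RB_PAGE_HEAD does here. The
 * node type, names and the __atomic builtin are illustrative only.
 *
 *	#define FLAG_HEAD 0x1UL
 *
 *	struct node { struct node *next; };
 *
 *	// Try to splice [first..last] in front of 'head', whose predecessor
 *	// is 'prev'. Returns 0 if a writer moved the head underneath us.
 *	static int splice_before_head(struct node *prev, struct node *head,
 *				      struct node *first, struct node *last)
 *	{
 *		struct node *expected =
 *			(struct node *)((unsigned long)head | FLAG_HEAD);
 *
 *		last->next = expected;		// preserve the HEAD flag
 *		return __atomic_compare_exchange_n(&prev->next, &expected,
 *						   first, 0,
 *						   __ATOMIC_SEQ_CST,
 *						   __ATOMIC_SEQ_CST);
 *	}
 */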
2051
2052 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
2053 {
2054 int success;
2055
2056 if (cpu_buffer->nr_pages_to_update > 0)
2057 success = rb_insert_pages(cpu_buffer);
2058 else
2059 success = rb_remove_pages(cpu_buffer,
2060 -cpu_buffer->nr_pages_to_update);
2061
2062 if (success)
2063 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
2064 }
2065
2066 static void update_pages_handler(struct work_struct *work)
2067 {
2068 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2069 struct ring_buffer_per_cpu, update_pages_work);
2070 rb_update_pages(cpu_buffer);
2071 complete(&cpu_buffer->update_done);
2072 }
2073
2074 /**
2075 * ring_buffer_resize - resize the ring buffer
2076 * @buffer: the buffer to resize.
2077 * @size: the new size.
2078 * @cpu_id: the cpu buffer to resize
2079 *
2080 * Minimum size is 2 * BUF_PAGE_SIZE.
2081 *
2082 * Returns 0 on success and < 0 on failure.
2083 */
2084 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
2085 int cpu_id)
2086 {
2087 struct ring_buffer_per_cpu *cpu_buffer;
2088 unsigned long nr_pages;
2089 int cpu, err;
2090
2091 /*
2092 * Always succeed at resizing a non-existent buffer:
2093 */
2094 if (!buffer)
2095 return 0;
2096
2097 /* Make sure the requested buffer exists */
2098 if (cpu_id != RING_BUFFER_ALL_CPUS &&
2099 !cpumask_test_cpu(cpu_id, buffer->cpumask))
2100 return 0;
2101
2102 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
2103
2104 /* we need a minimum of two pages */
2105 if (nr_pages < 2)
2106 nr_pages = 2;
2107
2108 /* prevent another thread from changing buffer sizes */
2109 mutex_lock(&buffer->mutex);
2110 atomic_inc(&buffer->resizing);
2111
2112 if (cpu_id == RING_BUFFER_ALL_CPUS) {
2113 /*
2114 * Don't succeed if resizing is disabled, as a reader might be
2115 * manipulating the ring buffer and is expecting a sane state while
2116 * this is true.
2117 */
2118 for_each_buffer_cpu(buffer, cpu) {
2119 cpu_buffer = buffer->buffers[cpu];
2120 if (atomic_read(&cpu_buffer->resize_disabled)) {
2121 err = -EBUSY;
2122 goto out_err_unlock;
2123 }
2124 }
2125
2126 /* calculate the pages to update */
2127 for_each_buffer_cpu(buffer, cpu) {
2128 cpu_buffer = buffer->buffers[cpu];
2129
2130 cpu_buffer->nr_pages_to_update = nr_pages -
2131 cpu_buffer->nr_pages;
2132 /*
2133 * nothing more to do for removing pages or no update
2134 */
2135 if (cpu_buffer->nr_pages_to_update <= 0)
2136 continue;
2137 /*
2138 * to add pages, make sure all new pages can be
2139 * allocated without receiving ENOMEM
2140 */
2141 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2142 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2143 &cpu_buffer->new_pages)) {
2144 /* not enough memory for new pages */
2145 err = -ENOMEM;
2146 goto out_err;
2147 }
2148
2149 cond_resched();
2150 }
2151
2152 cpus_read_lock();
2153 /*
2154 * Fire off all the required work handlers
2155 * We can't schedule on offline CPUs, but it's not necessary
2156 * since we can change their buffer sizes without any race.
2157 */
2158 for_each_buffer_cpu(buffer, cpu) {
2159 cpu_buffer = buffer->buffers[cpu];
2160 if (!cpu_buffer->nr_pages_to_update)
2161 continue;
2162
2163 /* Can't run something on an offline CPU. */
2164 if (!cpu_online(cpu)) {
2165 rb_update_pages(cpu_buffer);
2166 cpu_buffer->nr_pages_to_update = 0;
2167 } else {
2168 schedule_work_on(cpu,
2169 &cpu_buffer->update_pages_work);
2170 }
2171 }
2172
2173 /* wait for all the updates to complete */
2174 for_each_buffer_cpu(buffer, cpu) {
2175 cpu_buffer = buffer->buffers[cpu];
2176 if (!cpu_buffer->nr_pages_to_update)
2177 continue;
2178
2179 if (cpu_online(cpu))
2180 wait_for_completion(&cpu_buffer->update_done);
2181 cpu_buffer->nr_pages_to_update = 0;
2182 }
2183
2184 cpus_read_unlock();
2185 } else {
2186 cpu_buffer = buffer->buffers[cpu_id];
2187
2188 if (nr_pages == cpu_buffer->nr_pages)
2189 goto out;
2190
2191 /*
2192 * Don't succeed if resizing is disabled, as a reader might be
2193 * manipulating the ring buffer and is expecting a sane state while
2194 * this is true.
2195 */
2196 if (atomic_read(&cpu_buffer->resize_disabled)) {
2197 err = -EBUSY;
2198 goto out_err_unlock;
2199 }
2200
2201 cpu_buffer->nr_pages_to_update = nr_pages -
2202 cpu_buffer->nr_pages;
2203
2204 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2205 if (cpu_buffer->nr_pages_to_update > 0 &&
2206 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2207 &cpu_buffer->new_pages)) {
2208 err = -ENOMEM;
2209 goto out_err;
2210 }
2211
2212 cpus_read_lock();
2213
2214 /* Can't run something on an offline CPU. */
2215 if (!cpu_online(cpu_id))
2216 rb_update_pages(cpu_buffer);
2217 else {
2218 schedule_work_on(cpu_id,
2219 &cpu_buffer->update_pages_work);
2220 wait_for_completion(&cpu_buffer->update_done);
2221 }
2222
2223 cpu_buffer->nr_pages_to_update = 0;
2224 cpus_read_unlock();
2225 }
2226
2227 out:
2228 /*
2229 * The ring buffer resize can happen with the ring buffer
2230 * enabled, so that the update disturbs the tracing as little
2231 * as possible. But if the buffer is disabled, we do not need
2232 * to worry about that, and we can take the time to verify
2233 * that the buffer is not corrupt.
2234 */
2235 if (atomic_read(&buffer->record_disabled)) {
2236 atomic_inc(&buffer->record_disabled);
2237 /*
2238 * Even though the buffer was disabled, we must make sure
2239 * that it is truly disabled before calling rb_check_pages.
2240 * There could have been a race between checking
2241 * record_disable and incrementing it.
2242 */
2243 synchronize_rcu();
2244 for_each_buffer_cpu(buffer, cpu) {
2245 cpu_buffer = buffer->buffers[cpu];
2246 rb_check_pages(cpu_buffer);
2247 }
2248 atomic_dec(&buffer->record_disabled);
2249 }
2250
2251 atomic_dec(&buffer->resizing);
2252 mutex_unlock(&buffer->mutex);
2253 return 0;
2254
2255 out_err:
2256 for_each_buffer_cpu(buffer, cpu) {
2257 struct buffer_page *bpage, *tmp;
2258
2259 cpu_buffer = buffer->buffers[cpu];
2260 cpu_buffer->nr_pages_to_update = 0;
2261
2262 if (list_empty(&cpu_buffer->new_pages))
2263 continue;
2264
2265 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2266 list) {
2267 list_del_init(&bpage->list);
2268 free_buffer_page(bpage);
2269 }
2270 }
2271 out_err_unlock:
2272 atomic_dec(&buffer->resizing);
2273 mutex_unlock(&buffer->mutex);
2274 return err;
2275 }
2276 EXPORT_SYMBOL_GPL(ring_buffer_resize);
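/*
 * Illustrative sketch of resizing (not part of this file): sizes are given
 * in bytes and rounded up to whole pages by the function above; 'my_rb' is
 * a made-up buffer pointer.
 *
 *	// Grow every per-CPU buffer to roughly 4 MB.
 *	if (ring_buffer_resize(my_rb, 4 << 20, RING_BUFFER_ALL_CPUS))
 *		pr_warn("ring buffer resize failed\n");
 *
 *	// Shrink only CPU 0 down to the two-page minimum.
 *	ring_buffer_resize(my_rb, 2 * BUF_PAGE_SIZE, 0);
 */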
2277
2278 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
2279 {
2280 mutex_lock(&buffer->mutex);
2281 if (val)
2282 buffer->flags |= RB_FL_OVERWRITE;
2283 else
2284 buffer->flags &= ~RB_FL_OVERWRITE;
2285 mutex_unlock(&buffer->mutex);
2286 }
2287 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
2288
2289 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2290 {
2291 return bpage->page->data + index;
2292 }
2293
2294 static __always_inline struct ring_buffer_event *
2295 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2296 {
2297 return __rb_page_index(cpu_buffer->reader_page,
2298 cpu_buffer->reader_page->read);
2299 }
2300
2301 static struct ring_buffer_event *
2302 rb_iter_head_event(struct ring_buffer_iter *iter)
2303 {
2304 struct ring_buffer_event *event;
2305 struct buffer_page *iter_head_page = iter->head_page;
2306 unsigned long commit;
2307 unsigned length;
2308
2309 if (iter->head != iter->next_event)
2310 return iter->event;
2311
2312 /*
2313 * When the writer goes across pages, it issues a cmpxchg which
2314 * is a mb(), which will synchronize with the rmb here.
2315 * (see rb_tail_page_update() and __rb_reserve_next())
2316 */
2317 commit = rb_page_commit(iter_head_page);
2318 smp_rmb();
2319
2320 /* An event needs to be at least 8 bytes in size */
2321 if (iter->head > commit - 8)
2322 goto reset;
2323
2324 event = __rb_page_index(iter_head_page, iter->head);
2325 length = rb_event_length(event);
2326
2327 /*
2328 * READ_ONCE() doesn't work on functions and we don't want the
2329 * compiler doing any crazy optimizations with length.
2330 */
2331 barrier();
2332
2333 if ((iter->head + length) > commit || length > BUF_PAGE_SIZE)
2334 /* Writer corrupted the read? */
2335 goto reset;
2336
2337 memcpy(iter->event, event, length);
2338 /*
2339 * If the page stamp is still the same after this rmb() then the
2340 * event was safely copied without the writer entering the page.
2341 */
2342 smp_rmb();
2343
2344 /* Make sure the page didn't change since we read this */
2345 if (iter->page_stamp != iter_head_page->page->time_stamp ||
2346 commit > rb_page_commit(iter_head_page))
2347 goto reset;
2348
2349 iter->next_event = iter->head + length;
2350 return iter->event;
2351 reset:
2352 /* Reset to the beginning */
2353 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2354 iter->head = 0;
2355 iter->next_event = 0;
2356 iter->missed_events = 1;
2357 return NULL;
2358 }
2359
2360 /* Size is determined by what has been committed */
2361 static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2362 {
2363 return rb_page_commit(bpage);
2364 }
2365
2366 static __always_inline unsigned
2367 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2368 {
2369 return rb_page_commit(cpu_buffer->commit_page);
2370 }
2371
2372 static __always_inline unsigned
2373 rb_event_index(struct ring_buffer_event *event)
2374 {
2375 unsigned long addr = (unsigned long)event;
2376
2377 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
2378 }
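/*
 * Worked example of the index calculation above (a sketch; assumes 4K pages
 * and a 64-bit build where BUF_PAGE_HDR_SIZE is 16): an event at a kernel
 * address ending in 0x678 sits at page offset 0x678 (1656), so its event
 * index is 1656 - 16 = 1640 bytes into the page's data area.
 */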
2379
2380 static void rb_inc_iter(struct ring_buffer_iter *iter)
2381 {
2382 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2383
2384 /*
2385 * The iterator could be on the reader page (it starts there).
2386 * But the head could have moved, since the reader was
2387 * found. Check for this case and assign the iterator
2388 * to the head page instead of next.
2389 */
2390 if (iter->head_page == cpu_buffer->reader_page)
2391 iter->head_page = rb_set_head_page(cpu_buffer);
2392 else
2393 rb_inc_page(&iter->head_page);
2394
2395 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2396 iter->head = 0;
2397 iter->next_event = 0;
2398 }
2399
2400 /*
2401 * rb_handle_head_page - writer hit the head page
2402 *
2403 * Returns: +1 to retry page
2404 * 0 to continue
2405 * -1 on error
2406 */
2407 static int
2408 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2409 struct buffer_page *tail_page,
2410 struct buffer_page *next_page)
2411 {
2412 struct buffer_page *new_head;
2413 int entries;
2414 int type;
2415 int ret;
2416
2417 entries = rb_page_entries(next_page);
2418
2419 /*
2420 * The hard part is here. We need to move the head
2421 * forward, and protect against both readers on
2422 * other CPUs and writers coming in via interrupts.
2423 */
2424 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2425 RB_PAGE_HEAD);
2426
2427 /*
2428 * type can be one of four:
2429 * NORMAL - an interrupt already moved it for us
2430 * HEAD - we are the first to get here.
2431 * UPDATE - we are the interrupt interrupting
2432 * a current move.
2433 * MOVED - a reader on another CPU moved the next
2434 * pointer to its reader page. Give up
2435 * and try again.
2436 */
2437
2438 switch (type) {
2439 case RB_PAGE_HEAD:
2440 /*
2441 * We changed the head to UPDATE, thus
2442 * it is our responsibility to update
2443 * the counters.
2444 */
2445 local_add(entries, &cpu_buffer->overrun);
2446 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
2447 local_inc(&cpu_buffer->pages_lost);
2448
2449 /*
2450 * The entries will be zeroed out when we move the
2451 * tail page.
2452 */
2453
2454 /* still more to do */
2455 break;
2456
2457 case RB_PAGE_UPDATE:
2458 /*
2459 * This is an interrupt that interrupted the
2460 * previous update. Still more to do.
2461 */
2462 break;
2463 case RB_PAGE_NORMAL:
2464 /*
2465 * An interrupt came in before the update
2466 * and processed this for us.
2467 * Nothing left to do.
2468 */
2469 return 1;
2470 case RB_PAGE_MOVED:
2471 /*
2472 * The reader is on another CPU and just did
2473 * a swap with our next_page.
2474 * Try again.
2475 */
2476 return 1;
2477 default:
2478 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2479 return -1;
2480 }
2481
2482 /*
2483 * Now that we are here, the old head pointer is
2484 * set to UPDATE. This will keep the reader from
2485 * swapping the head page with the reader page.
2486 * The reader (on another CPU) will spin till
2487 * we are finished.
2488 *
2489 * We just need to protect against interrupts
2490 * doing the job. We will set the next pointer
2491 * to HEAD. After that, we set the old pointer
2492 * to NORMAL, but only if it was HEAD before.
2493 * otherwise we are an interrupt, and only
2494 * want the outer most commit to reset it.
2495 */
2496 new_head = next_page;
2497 rb_inc_page(&new_head);
2498
2499 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2500 RB_PAGE_NORMAL);
2501
2502 /*
2503 * Valid returns are:
2504 * HEAD - an interrupt came in and already set it.
2505 * NORMAL - One of two things:
2506 * 1) We really set it.
2507 * 2) A bunch of interrupts came in and moved
2508 * the page forward again.
2509 */
2510 switch (ret) {
2511 case RB_PAGE_HEAD:
2512 case RB_PAGE_NORMAL:
2513 /* OK */
2514 break;
2515 default:
2516 RB_WARN_ON(cpu_buffer, 1);
2517 return -1;
2518 }
2519
2520 /*
2521 * It is possible that an interrupt came in,
2522 * set the head up, then more interrupts came in
2523 * and moved it again. When we get back here,
2524 * the page would have been set to NORMAL but we
2525 * just set it back to HEAD.
2526 *
2527 * How do you detect this? Well, if that happened
2528 * the tail page would have moved.
2529 */
2530 if (ret == RB_PAGE_NORMAL) {
2531 struct buffer_page *buffer_tail_page;
2532
2533 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2534 /*
2535 * If the tail had moved past next, then we need
2536 * to reset the pointer.
2537 */
2538 if (buffer_tail_page != tail_page &&
2539 buffer_tail_page != next_page)
2540 rb_head_page_set_normal(cpu_buffer, new_head,
2541 next_page,
2542 RB_PAGE_HEAD);
2543 }
2544
2545 /*
2546 * If this was the outermost commit (the one that
2547 * changed the original pointer from HEAD to UPDATE),
2548 * then it is up to us to reset it to NORMAL.
2549 */
2550 if (type == RB_PAGE_HEAD) {
2551 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2552 tail_page,
2553 RB_PAGE_UPDATE);
2554 if (RB_WARN_ON(cpu_buffer,
2555 ret != RB_PAGE_UPDATE))
2556 return -1;
2557 }
2558
2559 return 0;
2560 }
2561
2562 static inline void
2563 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2564 unsigned long tail, struct rb_event_info *info)
2565 {
2566 struct buffer_page *tail_page = info->tail_page;
2567 struct ring_buffer_event *event;
2568 unsigned long length = info->length;
2569
2570 /*
2571 * Only the event that crossed the page boundary
2572 * must fill the old tail_page with padding.
2573 */
2574 if (tail >= BUF_PAGE_SIZE) {
2575 /*
2576 * If the page was filled, then we still need
2577 * to update the real_end. Reset it to zero
2578 * and the reader will ignore it.
2579 */
2580 if (tail == BUF_PAGE_SIZE)
2581 tail_page->real_end = 0;
2582
2583 local_sub(length, &tail_page->write);
2584 return;
2585 }
2586
2587 event = __rb_page_index(tail_page, tail);
2588
2589 /*
2590 * Save the original length to the meta data.
2591 * This will be used by the reader to add to the lost
2592 * event counter.
2593 */
2594 tail_page->real_end = tail;
2595
2596 /*
2597 * If this event is bigger than the minimum size, then
2598 * we need to be careful that we don't subtract the
2599 * write counter enough to allow another writer to slip
2600 * in on this page.
2601 * We put in a discarded commit instead, to make sure
2602 * that this space is not used again, and this space will
2603 * not be accounted into 'entries_bytes'.
2604 *
2605 * If we are less than the minimum size, we don't need to
2606 * worry about it.
2607 */
2608 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2609 /* No room for any events */
2610
2611 /* Mark the rest of the page with padding */
2612 rb_event_set_padding(event);
2613
2614 /* Make sure the padding is visible before the write update */
2615 smp_wmb();
2616
2617 /* Set the write back to the previous setting */
2618 local_sub(length, &tail_page->write);
2619 return;
2620 }
2621
2622 /* Put in a discarded event */
2623 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2624 event->type_len = RINGBUF_TYPE_PADDING;
2625 /* time delta must be non zero */
2626 event->time_delta = 1;
2627
2628 /* account for padding bytes */
2629 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2630
2631 /* Make sure the padding is visible before the tail_page->write update */
2632 smp_wmb();
2633
2634 /* Set write to end of buffer */
2635 length = (tail + length) - BUF_PAGE_SIZE;
2636 local_sub(length, &tail_page->write);
2637 }
2638
2639 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2640
2641 /*
2642 * This is the slow path, force gcc not to inline it.
2643 */
2644 static noinline struct ring_buffer_event *
2645 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2646 unsigned long tail, struct rb_event_info *info)
2647 {
2648 struct buffer_page *tail_page = info->tail_page;
2649 struct buffer_page *commit_page = cpu_buffer->commit_page;
2650 struct trace_buffer *buffer = cpu_buffer->buffer;
2651 struct buffer_page *next_page;
2652 int ret;
2653
2654 next_page = tail_page;
2655
2656 rb_inc_page(&next_page);
2657
2658 /*
2659 * If for some reason, we had an interrupt storm that made
2660 * it all the way around the buffer, bail, and warn
2661 * about it.
2662 */
2663 if (unlikely(next_page == commit_page)) {
2664 local_inc(&cpu_buffer->commit_overrun);
2665 goto out_reset;
2666 }
2667
2668 /*
2669 * This is where the fun begins!
2670 *
2671 * We are fighting against races between a reader that
2672 * could be on another CPU trying to swap its reader
2673 * page with the buffer head.
2674 *
2675 * We are also fighting against interrupts coming in and
2676 * moving the head or tail on us as well.
2677 *
2678 * If the next page is the head page then we have filled
2679 * the buffer, unless the commit page is still on the
2680 * reader page.
2681 */
2682 if (rb_is_head_page(next_page, &tail_page->list)) {
2683
2684 /*
2685 * If the commit is not on the reader page, then
2686 * move the header page.
2687 */
2688 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2689 /*
2690 * If we are not in overwrite mode,
2691 * this is easy, just stop here.
2692 */
2693 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2694 local_inc(&cpu_buffer->dropped_events);
2695 goto out_reset;
2696 }
2697
2698 ret = rb_handle_head_page(cpu_buffer,
2699 tail_page,
2700 next_page);
2701 if (ret < 0)
2702 goto out_reset;
2703 if (ret)
2704 goto out_again;
2705 } else {
2706 /*
2707 * We need to be careful here too. The
2708 * commit page could still be on the reader
2709 * page. We could have a small buffer, and
2710 * have filled up the buffer with events
2711 * from interrupts and such, and wrapped.
2712 *
2713 * Note, if the tail page is also on the
2714 * reader_page, we let it move out.
2715 */
2716 if (unlikely((cpu_buffer->commit_page !=
2717 cpu_buffer->tail_page) &&
2718 (cpu_buffer->commit_page ==
2719 cpu_buffer->reader_page))) {
2720 local_inc(&cpu_buffer->commit_overrun);
2721 goto out_reset;
2722 }
2723 }
2724 }
2725
2726 rb_tail_page_update(cpu_buffer, tail_page, next_page);
2727
2728 out_again:
2729
2730 rb_reset_tail(cpu_buffer, tail, info);
2731
2732 /* Commit what we have for now. */
2733 rb_end_commit(cpu_buffer);
2734 /* rb_end_commit() decs committing */
2735 local_inc(&cpu_buffer->committing);
2736
2737 /* fail and let the caller try again */
2738 return ERR_PTR(-EAGAIN);
2739
2740 out_reset:
2741 /* reset write */
2742 rb_reset_tail(cpu_buffer, tail, info);
2743
2744 return NULL;
2745 }
2746
2747 /* Slow path */
2748 static struct ring_buffer_event *
2749 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2750 {
2751 if (abs)
2752 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2753 else
2754 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2755
2756 /* Not the first event on the page, or not delta? */
2757 if (abs || rb_event_index(event)) {
2758 event->time_delta = delta & TS_MASK;
2759 event->array[0] = delta >> TS_SHIFT;
2760 } else {
2761 /* nope, just zero it */
2762 event->time_delta = 0;
2763 event->array[0] = 0;
2764 }
2765
2766 return skip_time_extend(event);
2767 }
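/*
 * Worked example of the split above (a sketch; assumes TS_SHIFT is 27, so
 * TS_MASK covers the low 27 bits): a delta of 0x12345678 does not fit in
 * the 27-bit time_delta field, so a time extend stores it in two pieces:
 *
 *	event->time_delta = 0x12345678 & TS_MASK;	// 0x02345678
 *	event->array[0]   = 0x12345678 >> TS_SHIFT;	// 0x2
 *
 * The reader reassembles the value as (array[0] << TS_SHIFT) + time_delta.
 */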
2768
2769 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2770 static inline bool sched_clock_stable(void)
2771 {
2772 return true;
2773 }
2774 #endif
2775
2776 static void
2777 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2778 struct rb_event_info *info)
2779 {
2780 u64 write_stamp;
2781
2782 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
2783 (unsigned long long)info->delta,
2784 (unsigned long long)info->ts,
2785 (unsigned long long)info->before,
2786 (unsigned long long)info->after,
2787 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
2788 sched_clock_stable() ? "" :
2789 "If you just came from a suspend/resume,\n"
2790 "please switch to the trace global clock:\n"
2791 " echo global > /sys/kernel/debug/tracing/trace_clock\n"
2792 "or add trace_clock=global to the kernel command line\n");
2793 }
2794
2795 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2796 struct ring_buffer_event **event,
2797 struct rb_event_info *info,
2798 u64 *delta,
2799 unsigned int *length)
2800 {
2801 bool abs = info->add_timestamp &
2802 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2803
2804 if (unlikely(info->delta > (1ULL << 59))) {
2805 /* did the clock go backwards? */
2806 if (info->before == info->after && info->before > info->ts) {
2807 /* not interrupted */
2808 static int once;
2809
2810 /*
2811 * This is possible with a recalibration of the TSC.
2812 * Do not produce a call stack, but just report it.
2813 */
2814 if (!once) {
2815 once++;
2816 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2817 info->before, info->ts);
2818 }
2819 } else
2820 rb_check_timestamp(cpu_buffer, info);
2821 if (!abs)
2822 info->delta = 0;
2823 }
2824 *event = rb_add_time_stamp(*event, info->delta, abs);
2825 *length -= RB_LEN_TIME_EXTEND;
2826 *delta = 0;
2827 }
2828
2829 /**
2830 * rb_update_event - update event type and data
2831 * @cpu_buffer: The per cpu buffer of the @event
2832 * @event: the event to update
2833 * @info: The info to update the @event with (contains length and delta)
2834 *
2835 * Update the type and data fields of the @event. The length
2836 * is the actual size that is written to the ring buffer,
2837 * and with this, we can determine what to place into the
2838 * data field.
2839 */
2840 static void
2841 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2842 struct ring_buffer_event *event,
2843 struct rb_event_info *info)
2844 {
2845 unsigned length = info->length;
2846 u64 delta = info->delta;
2847 unsigned int nest = local_read(&cpu_buffer->committing) - 1;
2848
2849 if (!WARN_ON_ONCE(nest >= MAX_NEST))
2850 cpu_buffer->event_stamp[nest] = info->ts;
2851
2852 /*
2853 * If we need to add a timestamp, then we
2854 * add it to the start of the reserved space.
2855 */
2856 if (unlikely(info->add_timestamp))
2857 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
2858
2859 event->time_delta = delta;
2860 length -= RB_EVNT_HDR_SIZE;
2861 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2862 event->type_len = 0;
2863 event->array[0] = length;
2864 } else
2865 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2866 }
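/*
 * Worked example of the type_len encoding above (a sketch; assumes the
 * common case of RB_ALIGNMENT == 4 and no forced 8-byte alignment): for a
 * 24-byte data payload, 24 <= RB_MAX_SMALL_DATA, so type_len becomes
 * DIV_ROUND_UP(24, 4) = 6 and all four bytes of array[0] are part of the
 * payload. For a payload larger than RB_MAX_SMALL_DATA, type_len is set to
 * 0, the data length goes into array[0], and the payload starts right
 * after it.
 */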
2867
2868 static unsigned rb_calculate_event_length(unsigned length)
2869 {
2870 struct ring_buffer_event event; /* Used only for sizeof array */
2871
2872 /* zero length can cause confusion */
2873 if (!length)
2874 length++;
2875
2876 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2877 length += sizeof(event.array[0]);
2878
2879 length += RB_EVNT_HDR_SIZE;
2880 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2881
2882 /*
2883 * In case the time delta is larger than the 27 bits for it
2884 * in the header, we need to add a timestamp. If another
2885 * event comes in when trying to discard this one to increase
2886 * the length, then the timestamp will be added in the allocated
2887 * space of this event. If length is bigger than the size needed
2888 * for the TIME_EXTEND, then padding has to be used. The events
2889 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2890 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2891 * As length is a multiple of 4, we only need to worry if it
2892 * is 12 (RB_LEN_TIME_EXTEND + 4).
2893 */
2894 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2895 length += RB_ALIGNMENT;
2896
2897 return length;
2898 }
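/*
 * Worked example of the calculation above (a sketch; assumes
 * RB_EVNT_HDR_SIZE == 4, RB_ARCH_ALIGNMENT == 4 and RB_LEN_TIME_EXTEND == 8):
 * a request for 2 bytes of data becomes 2 + 4 = 6, aligned up to 8. A
 * request for 5 bytes becomes 5 + 4 = 9, aligned up to 12; since 12 equals
 * RB_LEN_TIME_EXTEND + RB_ALIGNMENT, another 4 bytes are added so that a
 * time extend written into a discarded event still leaves room for a
 * minimum-size padding event, giving 16.
 */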
2899
2900 static inline int
2901 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2902 struct ring_buffer_event *event)
2903 {
2904 unsigned long new_index, old_index;
2905 struct buffer_page *bpage;
2906 unsigned long index;
2907 unsigned long addr;
2908
2909 new_index = rb_event_index(event);
2910 old_index = new_index + rb_event_ts_length(event);
2911 addr = (unsigned long)event;
2912 addr &= PAGE_MASK;
2913
2914 bpage = READ_ONCE(cpu_buffer->tail_page);
2915
2916 /*
2917 * Make sure the tail_page is still the same and
2918 * the next write location is the end of this event
2919 */
2920 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2921 unsigned long write_mask =
2922 local_read(&bpage->write) & ~RB_WRITE_MASK;
2923 unsigned long event_length = rb_event_length(event);
2924
2925 /*
2926 * Make the before_stamp different from the write_stamp
2927 * to make sure that the next event adds an absolute
2928 * value and does not rely on the saved write stamp, which
2929 * is now going to be bogus.
2930 *
2931 * By setting the before_stamp to zero, the next event
2932 * is not going to use the write_stamp and will instead
2933 * create an absolute timestamp. This means there's no
2934 * reason to update the write_stamp!
2935 */
2936 rb_time_set(&cpu_buffer->before_stamp, 0);
2937
2938 /*
2939 * If an event were to come in now, it would see that the
2940 * write_stamp and the before_stamp are different, and assume
2941 * that this event just added itself before updating
2942 * the write stamp. The interrupting event will fix the
2943 * write stamp for us, and use an absolute timestamp.
2944 */
2945
2946 /*
2947 * This is on the tail page. It is possible that
2948 * a write could come in and move the tail page
2949 * and write to the next page. That is fine
2950 * because we just shorten what is on this page.
2951 */
2952 old_index += write_mask;
2953 new_index += write_mask;
2954 index = local_cmpxchg(&bpage->write, old_index, new_index);
2955 if (index == old_index) {
2956 /* update counters */
2957 local_sub(event_length, &cpu_buffer->entries_bytes);
2958 return 1;
2959 }
2960 }
2961
2962 /* could not discard */
2963 return 0;
2964 }
2965
2966 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2967 {
2968 local_inc(&cpu_buffer->committing);
2969 local_inc(&cpu_buffer->commits);
2970 }
2971
2972 static __always_inline void
2973 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2974 {
2975 unsigned long max_count;
2976
2977 /*
2978 * We only race with interrupts and NMIs on this CPU.
2979 * If we own the commit event, then we can commit
2980 * all others that interrupted us, since the interruptions
2981 * are in stack format (they finish before they come
2982 * back to us). This allows us to do a simple loop to
2983 * assign the commit to the tail.
2984 */
2985 again:
2986 max_count = cpu_buffer->nr_pages * 100;
2987
2988 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2989 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2990 return;
2991 if (RB_WARN_ON(cpu_buffer,
2992 rb_is_reader_page(cpu_buffer->tail_page)))
2993 return;
2994 /*
2995 * No need for a memory barrier here, as the update
2996 * of the tail_page did it for this page.
2997 */
2998 local_set(&cpu_buffer->commit_page->page->commit,
2999 rb_page_write(cpu_buffer->commit_page));
3000 rb_inc_page(&cpu_buffer->commit_page);
3001 /* add barrier to keep gcc from optimizing too much */
3002 barrier();
3003 }
3004 while (rb_commit_index(cpu_buffer) !=
3005 rb_page_write(cpu_buffer->commit_page)) {
3006
3007 /* Make sure the readers see the content of what is committed. */
3008 smp_wmb();
3009 local_set(&cpu_buffer->commit_page->page->commit,
3010 rb_page_write(cpu_buffer->commit_page));
3011 RB_WARN_ON(cpu_buffer,
3012 local_read(&cpu_buffer->commit_page->page->commit) &
3013 ~RB_WRITE_MASK);
3014 barrier();
3015 }
3016
3017 /* again, keep gcc from optimizing */
3018 barrier();
3019
3020 /*
3021 * If an interrupt came in just after the first while loop
3022 * and pushed the tail page forward, we will be left with
3023 * a dangling commit that will never go forward.
3024 */
3025 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
3026 goto again;
3027 }
3028
3029 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
3030 {
3031 unsigned long commits;
3032
3033 if (RB_WARN_ON(cpu_buffer,
3034 !local_read(&cpu_buffer->committing)))
3035 return;
3036
3037 again:
3038 commits = local_read(&cpu_buffer->commits);
3039 /* synchronize with interrupts */
3040 barrier();
3041 if (local_read(&cpu_buffer->committing) == 1)
3042 rb_set_commit_to_write(cpu_buffer);
3043
3044 local_dec(&cpu_buffer->committing);
3045
3046 /* synchronize with interrupts */
3047 barrier();
3048
3049 /*
3050 * Need to account for interrupts coming in between the
3051 * updating of the commit page and the clearing of the
3052 * committing counter.
3053 */
3054 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3055 !local_read(&cpu_buffer->committing)) {
3056 local_inc(&cpu_buffer->committing);
3057 goto again;
3058 }
3059 }
3060
3061 static inline void rb_event_discard(struct ring_buffer_event *event)
3062 {
3063 if (extended_time(event))
3064 event = skip_time_extend(event);
3065
3066 /* array[0] holds the actual length for the discarded event */
3067 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3068 event->type_len = RINGBUF_TYPE_PADDING;
3069 /* time delta must be non zero */
3070 if (!event->time_delta)
3071 event->time_delta = 1;
3072 }
3073
3074 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
3075 struct ring_buffer_event *event)
3076 {
3077 local_inc(&cpu_buffer->entries);
3078 rb_end_commit(cpu_buffer);
3079 }
3080
3081 static __always_inline void
3082 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3083 {
3084 if (buffer->irq_work.waiters_pending) {
3085 buffer->irq_work.waiters_pending = false;
3086 /* irq_work_queue() supplies its own memory barriers */
3087 irq_work_queue(&buffer->irq_work.work);
3088 }
3089
3090 if (cpu_buffer->irq_work.waiters_pending) {
3091 cpu_buffer->irq_work.waiters_pending = false;
3092 /* irq_work_queue() supplies its own memory barriers */
3093 irq_work_queue(&cpu_buffer->irq_work.work);
3094 }
3095
3096 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3097 return;
3098
3099 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3100 return;
3101
3102 if (!cpu_buffer->irq_work.full_waiters_pending)
3103 return;
3104
3105 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3106
3107 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3108 return;
3109
3110 cpu_buffer->irq_work.wakeup_full = true;
3111 cpu_buffer->irq_work.full_waiters_pending = false;
3112 /* irq_work_queue() supplies its own memory barriers */
3113 irq_work_queue(&cpu_buffer->irq_work.work);
3114 }
3115
3116 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3117 # define do_ring_buffer_record_recursion() \
3118 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3119 #else
3120 # define do_ring_buffer_record_recursion() do { } while (0)
3121 #endif
3122
3123 /*
3124 * The lock and unlock are done within a preempt disable section.
3125 * The current_context per_cpu variable can only be modified
3126 * by the current task between lock and unlock. But it can
3127 * be modified more than once via an interrupt. To pass this
3128 * information from the lock to the unlock without having to
3129 * access the 'in_interrupt()' functions again (which do show
3130 * a bit of overhead in something as critical as function tracing),
3131 * we use a bitmask trick.
3132 *
3133 * bit 1 = NMI context
3134 * bit 2 = IRQ context
3135 * bit 3 = SoftIRQ context
3136 * bit 4 = normal context.
3137 *
3138 * This works because this is the order of contexts that can
3139 * preempt other contexts. A SoftIRQ never preempts an IRQ
3140 * context.
3141 *
3142 * When the context is determined, the corresponding bit is
3143 * checked and set (if it was set, then a recursion of that context
3144 * happened).
3145 *
3146 * On unlock, we need to clear this bit. To do so, just subtract
3147 * 1 from the current_context and AND it to itself.
3148 *
3149 * (binary)
3150 * 101 - 1 = 100
3151 * 101 & 100 = 100 (clearing bit zero)
3152 *
3153 * 1010 - 1 = 1001
3154 * 1010 & 1001 = 1000 (clearing bit 1)
3155 *
3156 * The least significant bit can be cleared this way, and it
3157 * just so happens that it is the same bit corresponding to
3158 * the current context.
3159 *
3160 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3161 * is set when a recursion is detected at the current context, and if
3162 * the TRANSITION bit is already set, it will fail the recursion.
3163 * This is needed because there's a lag between the changing of
3164 * interrupt context and updating the preempt count. In this case,
3165 * a false positive will be found. To handle this, one extra recursion
3166 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3167 * bit is already set, then it is considered a recursion and the function
3168 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3169 *
3170 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3171 * to be cleared. Even if it wasn't the context that set it. That is,
3172 * if an interrupt comes in while NORMAL bit is set and the ring buffer
3173 * is called before preempt_count() is updated, since the check will
3174 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3175 * NMI then comes in, it will set the NMI bit, but when the NMI code
3176 * does the trace_recursive_unlock() it will clear the TRANSITION bit
3177 * and leave the NMI bit set. But this is fine, because the interrupt
3178 * code that set the TRANSITION bit will then clear the NMI bit when it
3179 * calls trace_recursive_unlock(). If another NMI comes in, it will
3180 * set the TRANSITION bit and continue.
3181 *
3182 * Note: The TRANSITION bit only handles a single transition between context.
3183 */
3184
3185 static __always_inline int
3186 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3187 {
3188 unsigned int val = cpu_buffer->current_context;
3189 int bit = interrupt_context_level();
3190
3191 bit = RB_CTX_NORMAL - bit;
3192
3193 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3194 /*
3195 * It is possible that this was called by transitioning
3196 * between interrupt context, and preempt_count() has not
3197 * been updated yet. In this case, use the TRANSITION bit.
3198 */
3199 bit = RB_CTX_TRANSITION;
3200 if (val & (1 << (bit + cpu_buffer->nest))) {
3201 do_ring_buffer_record_recursion();
3202 return 1;
3203 }
3204 }
3205
3206 val |= (1 << (bit + cpu_buffer->nest));
3207 cpu_buffer->current_context = val;
3208
3209 return 0;
3210 }
3211
3212 static __always_inline void
3213 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3214 {
3215 cpu_buffer->current_context &=
3216 cpu_buffer->current_context - (1 << cpu_buffer->nest);
3217 }
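/*
 * A stand-alone sketch of the "subtract one and AND" trick used above
 * (plain C, values are illustrative): each unlock clears the lowest set
 * bit of the context mask, and the innermost context always owns the
 * lowest bit.
 *
 *	unsigned int val = 0x6;	// binary 110: two context bits set
 *
 *	val &= val - 1;		// 110 & 101 = 100, lowest set bit cleared
 *	val &= val - 1;		// 100 & 011 = 000, next bit cleared
 */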
3218
3219 /* The recursive locking above uses 5 bits */
3220 #define NESTED_BITS 5
3221
3222 /**
3223 * ring_buffer_nest_start - Allow tracing while nested
3224 * @buffer: The ring buffer to modify
3225 *
3226 * The ring buffer has a safety mechanism to prevent recursion.
3227 * But there may be a case where a trace needs to be done while
3228 * tracing something else. In this case, calling this function
3229 * allows another ring_buffer_lock_reserve() to nest within the
3230 * currently active one.
3231 *
3232 * Call this function before calling another ring_buffer_lock_reserve() and
3233 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
3234 */
3235 void ring_buffer_nest_start(struct trace_buffer *buffer)
3236 {
3237 struct ring_buffer_per_cpu *cpu_buffer;
3238 int cpu;
3239
3240 /* Enabled by ring_buffer_nest_end() */
3241 preempt_disable_notrace();
3242 cpu = raw_smp_processor_id();
3243 cpu_buffer = buffer->buffers[cpu];
3244 /* This is the shift value for the above recursive locking */
3245 cpu_buffer->nest += NESTED_BITS;
3246 }
3247
3248 /**
3249 * ring_buffer_nest_end - End allowing tracing while nested
3250 * @buffer: The ring buffer to modify
3251 *
3252 * Must be called after ring_buffer_nest_start() and after the
3253 * ring_buffer_unlock_commit().
3254 */
3255 void ring_buffer_nest_end(struct trace_buffer *buffer)
3256 {
3257 struct ring_buffer_per_cpu *cpu_buffer;
3258 int cpu;
3259
3260 /* disabled by ring_buffer_nest_start() */
3261 cpu = raw_smp_processor_id();
3262 cpu_buffer = buffer->buffers[cpu];
3263 /* This is the shift value for the above recursive locking */
3264 cpu_buffer->nest -= NESTED_BITS;
3265 preempt_enable_notrace();
3266 }
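/*
 * Illustrative sketch of the nesting API (not part of this file; 'my_rb'
 * and the payload are made-up): a tracer that must emit a second event
 * while it still has a reservation open brackets the inner reserve/commit
 * pair with nest_start()/nest_end().
 *
 *	ring_buffer_nest_start(my_rb);
 *	inner = ring_buffer_lock_reserve(my_rb, sizeof(u32));
 *	if (inner) {
 *		*(u32 *)ring_buffer_event_data(inner) = 0xdead;
 *		ring_buffer_unlock_commit(my_rb, inner);
 *	}
 *	ring_buffer_nest_end(my_rb);
 */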
3267
3268 /**
3269 * ring_buffer_unlock_commit - commit a reserved event
3270 * @buffer: The buffer to commit to
3271 * @event: The event pointer to commit.
3272 *
3273 * This commits the data to the ring buffer, and releases any locks held.
3274 *
3275 * Must be paired with ring_buffer_lock_reserve.
3276 */
3277 int ring_buffer_unlock_commit(struct trace_buffer *buffer,
3278 struct ring_buffer_event *event)
3279 {
3280 struct ring_buffer_per_cpu *cpu_buffer;
3281 int cpu = raw_smp_processor_id();
3282
3283 cpu_buffer = buffer->buffers[cpu];
3284
3285 rb_commit(cpu_buffer, event);
3286
3287 rb_wakeups(buffer, cpu_buffer);
3288
3289 trace_recursive_unlock(cpu_buffer);
3290
3291 preempt_enable_notrace();
3292
3293 return 0;
3294 }
3295 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
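/*
 * Illustrative sketch of the reserve/commit pairing (not part of this file;
 * 'my_rb' and 'struct my_entry' are made-up): ring_buffer_lock_reserve()
 * hands back space inside the ring buffer, the caller fills it in place,
 * and the commit above makes it visible to readers.
 *
 *	struct my_entry { u64 ts; u32 val; };
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(my_rb, sizeof(*entry));
 *	if (event) {
 *		entry = ring_buffer_event_data(event);
 *		entry->ts = 0;
 *		entry->val = 42;
 *		ring_buffer_unlock_commit(my_rb, event);
 *	}
 */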
3296
3297 /* Special value to validate all deltas on a page. */
3298 #define CHECK_FULL_PAGE 1L
3299
3300 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
3301 static void dump_buffer_page(struct buffer_data_page *bpage,
3302 struct rb_event_info *info,
3303 unsigned long tail)
3304 {
3305 struct ring_buffer_event *event;
3306 u64 ts, delta;
3307 int e;
3308
3309 ts = bpage->time_stamp;
3310 pr_warn(" [%lld] PAGE TIME STAMP\n", ts);
3311
3312 for (e = 0; e < tail; e += rb_event_length(event)) {
3313
3314 event = (struct ring_buffer_event *)(bpage->data + e);
3315
3316 switch (event->type_len) {
3317
3318 case RINGBUF_TYPE_TIME_EXTEND:
3319 delta = rb_event_time_stamp(event);
3320 ts += delta;
3321 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta);
3322 break;
3323
3324 case RINGBUF_TYPE_TIME_STAMP:
3325 delta = rb_event_time_stamp(event);
3326 ts = delta;
3327 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta);
3328 break;
3329
3330 case RINGBUF_TYPE_PADDING:
3331 ts += event->time_delta;
3332 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta);
3333 break;
3334
3335 case RINGBUF_TYPE_DATA:
3336 ts += event->time_delta;
3337 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta);
3338 break;
3339
3340 default:
3341 break;
3342 }
3343 }
3344 }
3345
3346 static DEFINE_PER_CPU(atomic_t, checking);
3347 static atomic_t ts_dump;
3348
3349 /*
3350 * Check if the current event time stamp matches the deltas on
3351 * the buffer page.
3352 */
3353 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3354 struct rb_event_info *info,
3355 unsigned long tail)
3356 {
3357 struct ring_buffer_event *event;
3358 struct buffer_data_page *bpage;
3359 u64 ts, delta;
3360 bool full = false;
3361 int e;
3362
3363 bpage = info->tail_page->page;
3364
3365 if (tail == CHECK_FULL_PAGE) {
3366 full = true;
3367 tail = local_read(&bpage->commit);
3368 } else if (info->add_timestamp &
3369 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
3370 /* Ignore events with absolute time stamps */
3371 return;
3372 }
3373
3374 /*
3375 * Do not check the first event (skip possible extends too).
3376 * Also do not check if previous events have not been committed.
3377 */
3378 if (tail <= 8 || tail > local_read(&bpage->commit))
3379 return;
3380
3381 /*
3382 * If this check interrupted a check already in progress, skip it.
3383 */
3384 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
3385 goto out;
3386
3387 ts = bpage->time_stamp;
3388
3389 for (e = 0; e < tail; e += rb_event_length(event)) {
3390
3391 event = (struct ring_buffer_event *)(bpage->data + e);
3392
3393 switch (event->type_len) {
3394
3395 case RINGBUF_TYPE_TIME_EXTEND:
3396 delta = rb_event_time_stamp(event);
3397 ts += delta;
3398 break;
3399
3400 case RINGBUF_TYPE_TIME_STAMP:
3401 delta = rb_event_time_stamp(event);
3402 ts = delta;
3403 break;
3404
3405 case RINGBUF_TYPE_PADDING:
3406 if (event->time_delta == 1)
3407 break;
3408 fallthrough;
3409 case RINGBUF_TYPE_DATA:
3410 ts += event->time_delta;
3411 break;
3412
3413 default:
3414 RB_WARN_ON(cpu_buffer, 1);
3415 }
3416 }
3417 if ((full && ts > info->ts) ||
3418 (!full && ts + info->delta != info->ts)) {
3419 /* If another report is happening, ignore this one */
3420 if (atomic_inc_return(&ts_dump) != 1) {
3421 atomic_dec(&ts_dump);
3422 goto out;
3423 }
3424 atomic_inc(&cpu_buffer->record_disabled);
3425 /* There are some cases during boot up where this can happen */
3426 WARN_ON_ONCE(system_state != SYSTEM_BOOTING);
3427 pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n",
3428 cpu_buffer->cpu,
3429 ts + info->delta, info->ts, info->delta,
3430 info->before, info->after,
3431 full ? " (full)" : "");
3432 dump_buffer_page(bpage, info, tail);
3433 atomic_dec(&ts_dump);
3434 /* Do not re-enable checking */
3435 return;
3436 }
3437 out:
3438 atomic_dec(this_cpu_ptr(&checking));
3439 }
3440 #else
3441 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3442 struct rb_event_info *info,
3443 unsigned long tail)
3444 {
3445 }
3446 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
3447
3448 static struct ring_buffer_event *
3449 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
3450 struct rb_event_info *info)
3451 {
3452 struct ring_buffer_event *event;
3453 struct buffer_page *tail_page;
3454 unsigned long tail, write, w;
3455 bool a_ok;
3456 bool b_ok;
3457
3458 /* Don't let the compiler play games with cpu_buffer->tail_page */
3459 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
3460
3461 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
3462 barrier();
3463 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3464 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3465 barrier();
3466 info->ts = rb_time_stamp(cpu_buffer->buffer);
3467
3468 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
3469 info->delta = info->ts;
3470 } else {
3471 /*
3472 * If interrupting an event time update, we may need an
3473 * absolute timestamp.
3474 * Don't bother if this is the start of a new page (w == 0).
3475 */
3476 if (!w) {
3477 /* Use the sub-buffer timestamp */
3478 info->delta = 0;
3479 } else if (unlikely(!a_ok || !b_ok || info->before != info->after)) {
3480 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3481 info->length += RB_LEN_TIME_EXTEND;
3482 } else {
3483 info->delta = info->ts - info->after;
3484 if (unlikely(test_time_stamp(info->delta))) {
3485 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3486 info->length += RB_LEN_TIME_EXTEND;
3487 }
3488 }
3489 }
3490
3491 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
3492
3493 /*C*/ write = local_add_return(info->length, &tail_page->write);
3494
3495 /* set write to only the index of the write */
3496 write &= RB_WRITE_MASK;
3497
3498 tail = write - info->length;
3499
3500 /* See if we shot past the end of this buffer page */
3501 if (unlikely(write > BUF_PAGE_SIZE)) {
3502 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
3503 return rb_move_tail(cpu_buffer, tail, info);
3504 }
3505
3506 if (likely(tail == w)) {
3507 /* Nothing interrupted us between A and C */
3508 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
3509 /*
3510 * If something came in between C and D, the write stamp
3511 * may now not be in sync. But that's fine as the before_stamp
3512 * will be different and then next event will just be forced
3513 * to use an absolute timestamp.
3514 */
3515 if (likely(!(info->add_timestamp &
3516 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3517 /* This did not interrupt any time update */
3518 info->delta = info->ts - info->after;
3519 else
3520 /* Just use full timestamp for interrupting event */
3521 info->delta = info->ts;
3522 check_buffer(cpu_buffer, info, tail);
3523 } else {
3524 u64 ts;
3525 /* SLOW PATH - Interrupted between A and C */
3526
3527 /* Save the old before_stamp */
3528 a_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3529 RB_WARN_ON(cpu_buffer, !a_ok);
3530
3531 /*
3532 * Read a new timestamp and update the before_stamp to make
3533 * the next event after this one force using an absolute
3534 * timestamp. This is in case an interrupt were to come in
3535 * between E and F.
3536 */
3537 ts = rb_time_stamp(cpu_buffer->buffer);
3538 rb_time_set(&cpu_buffer->before_stamp, ts);
3539
3540 barrier();
3541 /*E*/ a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3542 /* Was interrupted before here, write_stamp must be valid */
3543 RB_WARN_ON(cpu_buffer, !a_ok);
3544 barrier();
3545 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3546 info->after == info->before && info->after < ts) {
3547 /*
3548 * Nothing came after this event between C and F, it is
3549 * safe to use info->after for the delta as it
3550 * matched info->before and is still valid.
3551 */
3552 info->delta = ts - info->after;
3553 } else {
3554 /*
3555 * Interrupted between C and F:
3556 * Lost the previous events time stamp. Just set the
3557 * delta to zero, and this will be the same time as
3558 * the event this event interrupted. And the events that
3559 * came after this will still be correct (as they would
3560 * have built their delta on the previous event).
3561 */
3562 info->delta = 0;
3563 }
3564 info->ts = ts;
3565 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
3566 }
3567
3568 /*
3569 * If this is the first commit on the page, then it has the same
3570 * timestamp as the page itself.
3571 */
3572 if (unlikely(!tail && !(info->add_timestamp &
3573 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3574 info->delta = 0;
3575
3576 /* We reserved something on the buffer */
3577
3578 event = __rb_page_index(tail_page, tail);
3579 rb_update_event(cpu_buffer, event, info);
3580
3581 local_inc(&tail_page->entries);
3582
3583 /*
3584 * If this is the first commit on the page, then update
3585 * its timestamp.
3586 */
3587 if (unlikely(!tail))
3588 tail_page->page->time_stamp = info->ts;
3589
3590 /* account for these added bytes */
3591 local_add(info->length, &cpu_buffer->entries_bytes);
3592
3593 return event;
3594 }
3595
3596 static __always_inline struct ring_buffer_event *
3597 rb_reserve_next_event(struct trace_buffer *buffer,
3598 struct ring_buffer_per_cpu *cpu_buffer,
3599 unsigned long length)
3600 {
3601 struct ring_buffer_event *event;
3602 struct rb_event_info info;
3603 int nr_loops = 0;
3604 int add_ts_default;
3605
3606 /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3607 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3608 (unlikely(in_nmi()))) {
3609 return NULL;
3610 }
3611
3612 rb_start_commit(cpu_buffer);
3613 /* The commit page can not change after this */
3614
3615 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3616 /*
3617 * Due to the ability to swap a cpu buffer from a buffer
3618 * it is possible it was swapped before we committed.
3619 * (committing stops a swap). We check for it here and
3620 * if it happened, we have to fail the write.
3621 */
3622 barrier();
3623 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3624 local_dec(&cpu_buffer->committing);
3625 local_dec(&cpu_buffer->commits);
3626 return NULL;
3627 }
3628 #endif
3629
3630 info.length = rb_calculate_event_length(length);
3631
3632 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3633 add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3634 info.length += RB_LEN_TIME_EXTEND;
3635 if (info.length > BUF_MAX_DATA_SIZE)
3636 goto out_fail;
3637 } else {
3638 add_ts_default = RB_ADD_STAMP_NONE;
3639 }
3640
3641 again:
3642 info.add_timestamp = add_ts_default;
3643 info.delta = 0;
3644
3645 /*
3646 * We allow for interrupts to reenter here and do a trace.
3647 * If one does, it will cause this original code to loop
3648 * back here. Even with heavy interrupts happening, this
3649 * should only happen a few times in a row. If this happens
3650 * 1000 times in a row, there must be either an interrupt
3651 * storm or we have something buggy.
3652 * Bail!
3653 */
3654 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3655 goto out_fail;
3656
3657 event = __rb_reserve_next(cpu_buffer, &info);
3658
3659 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3660 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
3661 info.length -= RB_LEN_TIME_EXTEND;
3662 goto again;
3663 }
3664
3665 if (likely(event))
3666 return event;
3667 out_fail:
3668 rb_end_commit(cpu_buffer);
3669 return NULL;
3670 }
3671
3672 /**
3673 * ring_buffer_lock_reserve - reserve a part of the buffer
3674 * @buffer: the ring buffer to reserve from
3675 * @length: the length of the data to reserve (excluding event header)
3676 *
3677 * Returns a reserved event on the ring buffer to copy directly to.
3678 * The user of this interface will need to get the body to write into
3679 * and can use the ring_buffer_event_data() interface.
3680 *
3681 * The length is the length of the data needed, not the event length
3682 * which also includes the event header.
3683 *
3684 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3685 * If NULL is returned, then nothing has been allocated or locked.
3686 */
3687 struct ring_buffer_event *
3688 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
3689 {
3690 struct ring_buffer_per_cpu *cpu_buffer;
3691 struct ring_buffer_event *event;
3692 int cpu;
3693
3694 /* If we are tracing schedule, we don't want to recurse */
3695 preempt_disable_notrace();
3696
3697 if (unlikely(atomic_read(&buffer->record_disabled)))
3698 goto out;
3699
3700 cpu = raw_smp_processor_id();
3701
3702 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3703 goto out;
3704
3705 cpu_buffer = buffer->buffers[cpu];
3706
3707 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3708 goto out;
3709
3710 if (unlikely(length > BUF_MAX_DATA_SIZE))
3711 goto out;
3712
3713 if (unlikely(trace_recursive_lock(cpu_buffer)))
3714 goto out;
3715
3716 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3717 if (!event)
3718 goto out_unlock;
3719
3720 return event;
3721
3722 out_unlock:
3723 trace_recursive_unlock(cpu_buffer);
3724 out:
3725 preempt_enable_notrace();
3726 return NULL;
3727 }
3728 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
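/*
 * Illustrative sketch (not part of this file's build): a typical
 * reserve/commit cycle from a writer's point of view. The "buffer"
 * pointer, the "my_event" layout and the error handling are assumptions
 * made for the example; only the ring_buffer_*() calls are real APIs.
 *
 *	struct my_event { u64 val; };
 *	struct ring_buffer_event *event;
 *	struct my_event *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return -EBUSY;	// recording disabled, recursion, or no room
 *	entry = ring_buffer_event_data(event);
 *	entry->val = 42;
 *	// Pair with unlock_commit (two-argument form assumed for this
 *	// kernel version).
 *	ring_buffer_unlock_commit(buffer, event);
 */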
3729
3730 /*
3731 * Decrement the entries to the page that an event is on.
3732 * The event does not even need to exist, only the pointer
3733 * to the page it is on. This may only be called before the commit
3734 * takes place.
3735 */
3736 static inline void
3737 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3738 struct ring_buffer_event *event)
3739 {
3740 unsigned long addr = (unsigned long)event;
3741 struct buffer_page *bpage = cpu_buffer->commit_page;
3742 struct buffer_page *start;
3743
3744 addr &= PAGE_MASK;
3745
3746 /* Do the likely case first */
3747 if (likely(bpage->page == (void *)addr)) {
3748 local_dec(&bpage->entries);
3749 return;
3750 }
3751
3752 /*
3753 * Because the commit page may be on the reader page, we
3754 * start with the next page and check for the end of the loop there.
3755 */
3756 rb_inc_page(&bpage);
3757 start = bpage;
3758 do {
3759 if (bpage->page == (void *)addr) {
3760 local_dec(&bpage->entries);
3761 return;
3762 }
3763 rb_inc_page(&bpage);
3764 } while (bpage != start);
3765
3766 /* commit not part of this buffer?? */
3767 RB_WARN_ON(cpu_buffer, 1);
3768 }
3769
3770 /**
3771 * ring_buffer_discard_commit - discard an event that has not been committed
3772 * @buffer: the ring buffer
3773 * @event: non committed event to discard
3774 *
3775 * Sometimes an event that is in the ring buffer needs to be ignored.
3776 * This function lets the user discard an event in the ring buffer
3777 * and then that event will not be read later.
3778 *
3779 * This function only works if it is called before the item has been
3780 * committed. It will try to free the event from the ring buffer
3781 * if another event has not been added behind it.
3782 *
3783 * If another event has been added behind it, it will set the event
3784 * up as discarded, and perform the commit.
3785 *
3786 * If this function is called, do not call ring_buffer_unlock_commit on
3787 * the event.
3788 */
3789 void ring_buffer_discard_commit(struct trace_buffer *buffer,
3790 struct ring_buffer_event *event)
3791 {
3792 struct ring_buffer_per_cpu *cpu_buffer;
3793 int cpu;
3794
3795 /* The event is discarded regardless */
3796 rb_event_discard(event);
3797
3798 cpu = smp_processor_id();
3799 cpu_buffer = buffer->buffers[cpu];
3800
3801 /*
3802 * This must only be called if the event has not been
3803 * committed yet. Thus we can assume that preemption
3804 * is still disabled.
3805 */
3806 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3807
3808 rb_decrement_entry(cpu_buffer, event);
3809 if (rb_try_to_discard(cpu_buffer, event))
3810 goto out;
3811
3812 out:
3813 rb_end_commit(cpu_buffer);
3814
3815 trace_recursive_unlock(cpu_buffer);
3816
3817 preempt_enable_notrace();
3818
3819 }
3820 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
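/*
 * Illustrative sketch (not part of this file's build): discarding a
 * reserved event instead of committing it, e.g. after a filter decides
 * the event is not wanted. "buffer", "fill_event()" and
 * "event_is_wanted()" are assumptions made for the example.
 *
 *	event = ring_buffer_lock_reserve(buffer, size);
 *	if (event) {
 *		fill_event(ring_buffer_event_data(event));
 *		if (event_is_wanted(event))
 *			ring_buffer_unlock_commit(buffer, event);
 *		else
 *			ring_buffer_discard_commit(buffer, event);
 *	}
 */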
3821
3822 /**
3823 * ring_buffer_write - write data to the buffer without reserving
3824 * @buffer: The ring buffer to write to.
3825 * @length: The length of the data being written (excluding the event header)
3826 * @data: The data to write to the buffer.
3827 *
3828 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3829 * one function. If you already have the data to write to the buffer, it
3830 * may be easier to simply call this function.
3831 *
3832 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3833 * and not the length of the event which would hold the header.
3834 */
3835 int ring_buffer_write(struct trace_buffer *buffer,
3836 unsigned long length,
3837 void *data)
3838 {
3839 struct ring_buffer_per_cpu *cpu_buffer;
3840 struct ring_buffer_event *event;
3841 void *body;
3842 int ret = -EBUSY;
3843 int cpu;
3844
3845 preempt_disable_notrace();
3846
3847 if (atomic_read(&buffer->record_disabled))
3848 goto out;
3849
3850 cpu = raw_smp_processor_id();
3851
3852 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3853 goto out;
3854
3855 cpu_buffer = buffer->buffers[cpu];
3856
3857 if (atomic_read(&cpu_buffer->record_disabled))
3858 goto out;
3859
3860 if (length > BUF_MAX_DATA_SIZE)
3861 goto out;
3862
3863 if (unlikely(trace_recursive_lock(cpu_buffer)))
3864 goto out;
3865
3866 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3867 if (!event)
3868 goto out_unlock;
3869
3870 body = rb_event_data(event);
3871
3872 memcpy(body, data, length);
3873
3874 rb_commit(cpu_buffer, event);
3875
3876 rb_wakeups(buffer, cpu_buffer);
3877
3878 ret = 0;
3879
3880 out_unlock:
3881 trace_recursive_unlock(cpu_buffer);
3882
3883 out:
3884 preempt_enable_notrace();
3885
3886 return ret;
3887 }
3888 EXPORT_SYMBOL_GPL(ring_buffer_write);
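/*
 * Illustrative sketch (not part of this file's build): when the payload
 * already exists, ring_buffer_write() replaces the reserve/commit pair.
 * The "my_data" type and variable are assumptions made for the example.
 *
 *	struct my_data data = { .val = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(data), &data))
 *		;	// -EBUSY: recording disabled or the event did not fit
 */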
3889
3890 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3891 {
3892 struct buffer_page *reader = cpu_buffer->reader_page;
3893 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3894 struct buffer_page *commit = cpu_buffer->commit_page;
3895
3896 /* In case of error, head will be NULL */
3897 if (unlikely(!head))
3898 return true;
3899
3900 /* Reader should exhaust content in reader page */
3901 if (reader->read != rb_page_commit(reader))
3902 return false;
3903
3904 /*
3905 * If writers are committing on the reader page, then, since all
3906 * committed content has been read, the ring buffer is empty.
3907 */
3908 if (commit == reader)
3909 return true;
3910
3911 /*
3912 * If writers are committing on a page other than reader page
3913 * and head page, there should always be content to read.
3914 */
3915 if (commit != head)
3916 return false;
3917
3918 /*
3919 * Writers are committing on the head page; we just need
3920 * to check whether there is committed data there, and the reader will
3921 * swap the reader page with the head page when it is time to read data.
3922 */
3923 return rb_page_commit(commit) == 0;
3924 }
3925
3926 /**
3927 * ring_buffer_record_disable - stop all writes into the buffer
3928 * @buffer: The ring buffer to stop writes to.
3929 *
3930 * This prevents all writes to the buffer. Any attempt to write
3931 * to the buffer after this will fail and return NULL.
3932 *
3933 * The caller should call synchronize_rcu() after this.
3934 */
3935 void ring_buffer_record_disable(struct trace_buffer *buffer)
3936 {
3937 atomic_inc(&buffer->record_disabled);
3938 }
3939 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3940
3941 /**
3942 * ring_buffer_record_enable - enable writes to the buffer
3943 * @buffer: The ring buffer to enable writes
3944 *
3945 * Note, multiple disables will need the same number of enables
3946 * to truly enable the writing (much like preempt_disable).
3947 */
3948 void ring_buffer_record_enable(struct trace_buffer *buffer)
3949 {
3950 atomic_dec(&buffer->record_disabled);
3951 }
3952 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3953
3954 /**
3955 * ring_buffer_record_off - stop all writes into the buffer
3956 * @buffer: The ring buffer to stop writes to.
3957 *
3958 * This prevents all writes to the buffer. Any attempt to write
3959 * to the buffer after this will fail and return NULL.
3960 *
3961 * This is different from ring_buffer_record_disable() as
3962 * it works like an on/off switch, whereas the disable() version
3963 * must be paired with an enable().
3964 */
3965 void ring_buffer_record_off(struct trace_buffer *buffer)
3966 {
3967 unsigned int rd;
3968 unsigned int new_rd;
3969
3970 do {
3971 rd = atomic_read(&buffer->record_disabled);
3972 new_rd = rd | RB_BUFFER_OFF;
3973 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3974 }
3975 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3976
3977 /**
3978 * ring_buffer_record_on - restart writes into the buffer
3979 * @buffer: The ring buffer to start writes to.
3980 *
3981 * This enables all writes to the buffer that was disabled by
3982 * ring_buffer_record_off().
3983 *
3984 * This is different from ring_buffer_record_enable() as
3985 * it works like an on/off switch, whereas the enable() version
3986 * must be paired with a disable().
3987 */
3988 void ring_buffer_record_on(struct trace_buffer *buffer)
3989 {
3990 unsigned int rd;
3991 unsigned int new_rd;
3992
3993 do {
3994 rd = atomic_read(&buffer->record_disabled);
3995 new_rd = rd & ~RB_BUFFER_OFF;
3996 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3997 }
3998 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
3999
4000 /**
4001 * ring_buffer_record_is_on - return true if the ring buffer can write
4002 * @buffer: The ring buffer to see if write is enabled
4003 *
4004 * Returns true if the ring buffer is in a state that it accepts writes.
4005 */
4006 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4007 {
4008 return !atomic_read(&buffer->record_disabled);
4009 }
4010
4011 /**
4012 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4013 * @buffer: The ring buffer to see if write is set enabled
4014 *
4015 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4016 * Note that this does NOT mean it is in a writable state.
4017 *
4018 * It may return true when the ring buffer has been disabled by
4019 * ring_buffer_record_disable(), as that is a temporary disabling of
4020 * the ring buffer.
4021 */
4022 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4023 {
4024 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4025 }
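/*
 * Illustrative sketch (not part of this file's build): the difference
 * between the counting disable/enable pair and the on/off switch. The
 * pr_info() text is an assumption made for the example.
 *
 *	ring_buffer_record_disable(buffer);	// nesting count 1
 *	ring_buffer_record_disable(buffer);	// nesting count 2
 *	ring_buffer_record_enable(buffer);	// still disabled (count 1)
 *	ring_buffer_record_enable(buffer);	// writable again
 *
 *	ring_buffer_record_off(buffer);		// single switch, not counted
 *	if (!ring_buffer_record_is_set_on(buffer))
 *		pr_info("ring buffer switched off\n");
 *	ring_buffer_record_on(buffer);		// one call turns it back on
 */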
4026
4027 /**
4028 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4029 * @buffer: The ring buffer to stop writes to.
4030 * @cpu: The CPU buffer to stop
4031 *
4032 * This prevents all writes to the buffer. Any attempt to write
4033 * to the buffer after this will fail and return NULL.
4034 *
4035 * The caller should call synchronize_rcu() after this.
4036 */
4037 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
4038 {
4039 struct ring_buffer_per_cpu *cpu_buffer;
4040
4041 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4042 return;
4043
4044 cpu_buffer = buffer->buffers[cpu];
4045 atomic_inc(&cpu_buffer->record_disabled);
4046 }
4047 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
4048
4049 /**
4050 * ring_buffer_record_enable_cpu - enable writes to the buffer
4051 * @buffer: The ring buffer to enable writes
4052 * @cpu: The CPU to enable.
4053 *
4054 * Note, multiple disables will need the same number of enables
4055 * to truly enable the writing (much like preempt_disable).
4056 */
4057 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
4058 {
4059 struct ring_buffer_per_cpu *cpu_buffer;
4060
4061 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4062 return;
4063
4064 cpu_buffer = buffer->buffers[cpu];
4065 atomic_dec(&cpu_buffer->record_disabled);
4066 }
4067 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
4068
4069 /*
4070 * The total entries in the ring buffer is the running counter
4071 * of entries entered into the ring buffer, minus the sum of
4072 * the entries read from the ring buffer and the number of
4073 * entries that were overwritten.
4074 */
4075 static inline unsigned long
4076 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
4077 {
4078 return local_read(&cpu_buffer->entries) -
4079 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
4080 }
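/*
 * Worked example of the accounting above (the numbers are made up): if
 * 1000 events were written, 200 were overwritten by the writer wrapping
 * around and 300 have already been consumed, then rb_num_of_entries()
 * reports 1000 - (200 + 300) = 500 events still waiting to be read.
 */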
4081
4082 /**
4083 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
4084 * @buffer: The ring buffer
4085 * @cpu: The per CPU buffer to read from.
4086 */
4087 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
4088 {
4089 unsigned long flags;
4090 struct ring_buffer_per_cpu *cpu_buffer;
4091 struct buffer_page *bpage;
4092 u64 ret = 0;
4093
4094 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4095 return 0;
4096
4097 cpu_buffer = buffer->buffers[cpu];
4098 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4099 /*
4100 * if the tail is on reader_page, oldest time stamp is on the reader
4101 * page
4102 */
4103 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
4104 bpage = cpu_buffer->reader_page;
4105 else
4106 bpage = rb_set_head_page(cpu_buffer);
4107 if (bpage)
4108 ret = bpage->page->time_stamp;
4109 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4110
4111 return ret;
4112 }
4113 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
4114
4115 /**
4116 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
4117 * @buffer: The ring buffer
4118 * @cpu: The per CPU buffer to read from.
4119 */
4120 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
4121 {
4122 struct ring_buffer_per_cpu *cpu_buffer;
4123 unsigned long ret;
4124
4125 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4126 return 0;
4127
4128 cpu_buffer = buffer->buffers[cpu];
4129 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
4130
4131 return ret;
4132 }
4133 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
4134
4135 /**
4136 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
4137 * @buffer: The ring buffer
4138 * @cpu: The per CPU buffer to get the entries from.
4139 */
4140 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
4141 {
4142 struct ring_buffer_per_cpu *cpu_buffer;
4143
4144 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4145 return 0;
4146
4147 cpu_buffer = buffer->buffers[cpu];
4148
4149 return rb_num_of_entries(cpu_buffer);
4150 }
4151 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
4152
4153 /**
4154 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
4155 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
4156 * @buffer: The ring buffer
4157 * @cpu: The per CPU buffer to get the number of overruns from
4158 */
4159 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
4160 {
4161 struct ring_buffer_per_cpu *cpu_buffer;
4162 unsigned long ret;
4163
4164 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4165 return 0;
4166
4167 cpu_buffer = buffer->buffers[cpu];
4168 ret = local_read(&cpu_buffer->overrun);
4169
4170 return ret;
4171 }
4172 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
4173
4174 /**
4175 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4176 * commits failing due to the buffer wrapping around while there are uncommitted
4177 * events, such as during an interrupt storm.
4178 * @buffer: The ring buffer
4179 * @cpu: The per CPU buffer to get the number of overruns from
4180 */
4181 unsigned long
4182 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
4183 {
4184 struct ring_buffer_per_cpu *cpu_buffer;
4185 unsigned long ret;
4186
4187 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4188 return 0;
4189
4190 cpu_buffer = buffer->buffers[cpu];
4191 ret = local_read(&cpu_buffer->commit_overrun);
4192
4193 return ret;
4194 }
4195 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4196
4197 /**
4198 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4199 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4200 * @buffer: The ring buffer
4201 * @cpu: The per CPU buffer to get the number of overruns from
4202 */
4203 unsigned long
4204 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
4205 {
4206 struct ring_buffer_per_cpu *cpu_buffer;
4207 unsigned long ret;
4208
4209 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4210 return 0;
4211
4212 cpu_buffer = buffer->buffers[cpu];
4213 ret = local_read(&cpu_buffer->dropped_events);
4214
4215 return ret;
4216 }
4217 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4218
4219 /**
4220 * ring_buffer_read_events_cpu - get the number of events successfully read
4221 * @buffer: The ring buffer
4222 * @cpu: The per CPU buffer to get the number of events read
4223 */
4224 unsigned long
4225 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
4226 {
4227 struct ring_buffer_per_cpu *cpu_buffer;
4228
4229 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4230 return 0;
4231
4232 cpu_buffer = buffer->buffers[cpu];
4233 return cpu_buffer->read;
4234 }
4235 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
4236
4237 /**
4238 * ring_buffer_entries - get the number of entries in a buffer
4239 * @buffer: The ring buffer
4240 *
4241 * Returns the total number of entries in the ring buffer
4242 * (all CPU entries)
4243 */
4244 unsigned long ring_buffer_entries(struct trace_buffer *buffer)
4245 {
4246 struct ring_buffer_per_cpu *cpu_buffer;
4247 unsigned long entries = 0;
4248 int cpu;
4249
4250 /* if you care about this being correct, lock the buffer */
4251 for_each_buffer_cpu(buffer, cpu) {
4252 cpu_buffer = buffer->buffers[cpu];
4253 entries += rb_num_of_entries(cpu_buffer);
4254 }
4255
4256 return entries;
4257 }
4258 EXPORT_SYMBOL_GPL(ring_buffer_entries);
4259
4260 /**
4261 * ring_buffer_overruns - get the number of overruns in buffer
4262 * @buffer: The ring buffer
4263 *
4264 * Returns the total number of overruns in the ring buffer
4265 * (all CPU entries)
4266 */
4267 unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
4268 {
4269 struct ring_buffer_per_cpu *cpu_buffer;
4270 unsigned long overruns = 0;
4271 int cpu;
4272
4273 /* if you care about this being correct, lock the buffer */
4274 for_each_buffer_cpu(buffer, cpu) {
4275 cpu_buffer = buffer->buffers[cpu];
4276 overruns += local_read(&cpu_buffer->overrun);
4277 }
4278
4279 return overruns;
4280 }
4281 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
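/*
 * Illustrative sketch (not part of this file's build): gathering the
 * per-CPU statistics exported above, e.g. for a debug printout. The loop
 * over online CPUs and the pr_info() format are assumptions made for the
 * example.
 *
 *	int cpu;
 *
 *	for_each_online_cpu(cpu) {
 *		pr_info("cpu%d: %lu entries, %lu bytes, %lu overruns, %lu read\n",
 *			cpu,
 *			ring_buffer_entries_cpu(buffer, cpu),
 *			ring_buffer_bytes_cpu(buffer, cpu),
 *			ring_buffer_overrun_cpu(buffer, cpu),
 *			ring_buffer_read_events_cpu(buffer, cpu));
 *	}
 */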
4282
4283 static void rb_iter_reset(struct ring_buffer_iter *iter)
4284 {
4285 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4286
4287 /* Iterator usage is expected to have record disabled */
4288 iter->head_page = cpu_buffer->reader_page;
4289 iter->head = cpu_buffer->reader_page->read;
4290 iter->next_event = iter->head;
4291
4292 iter->cache_reader_page = iter->head_page;
4293 iter->cache_read = cpu_buffer->read;
4294 iter->cache_pages_removed = cpu_buffer->pages_removed;
4295
4296 if (iter->head) {
4297 iter->read_stamp = cpu_buffer->read_stamp;
4298 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4299 } else {
4300 iter->read_stamp = iter->head_page->page->time_stamp;
4301 iter->page_stamp = iter->read_stamp;
4302 }
4303 }
4304
4305 /**
4306 * ring_buffer_iter_reset - reset an iterator
4307 * @iter: The iterator to reset
4308 *
4309 * Resets the iterator, so that it will start from the beginning
4310 * again.
4311 */
4312 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4313 {
4314 struct ring_buffer_per_cpu *cpu_buffer;
4315 unsigned long flags;
4316
4317 if (!iter)
4318 return;
4319
4320 cpu_buffer = iter->cpu_buffer;
4321
4322 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4323 rb_iter_reset(iter);
4324 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4325 }
4326 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
4327
4328 /**
4329 * ring_buffer_iter_empty - check if an iterator has no more to read
4330 * @iter: The iterator to check
4331 */
4332 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4333 {
4334 struct ring_buffer_per_cpu *cpu_buffer;
4335 struct buffer_page *reader;
4336 struct buffer_page *head_page;
4337 struct buffer_page *commit_page;
4338 struct buffer_page *curr_commit_page;
4339 unsigned commit;
4340 u64 curr_commit_ts;
4341 u64 commit_ts;
4342
4343 cpu_buffer = iter->cpu_buffer;
4344 reader = cpu_buffer->reader_page;
4345 head_page = cpu_buffer->head_page;
4346 commit_page = cpu_buffer->commit_page;
4347 commit_ts = commit_page->page->time_stamp;
4348
4349 /*
4350 * When the writer goes across pages, it issues a cmpxchg which
4351 * is a mb(), which will synchronize with the rmb here.
4352 * (see rb_tail_page_update())
4353 */
4354 smp_rmb();
4355 commit = rb_page_commit(commit_page);
4356 /* We want to make sure that the commit page doesn't change */
4357 smp_rmb();
4358
4359 /* Make sure commit page didn't change */
4360 curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4361 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4362
4363 /* If the commit page changed, then there's more data */
4364 if (curr_commit_page != commit_page ||
4365 curr_commit_ts != commit_ts)
4366 return 0;
4367
4368 /* Still racy, as it may return a false positive, but that's OK */
4369 return ((iter->head_page == commit_page && iter->head >= commit) ||
4370 (iter->head_page == reader && commit_page == head_page &&
4371 head_page->read == commit &&
4372 iter->head == rb_page_commit(cpu_buffer->reader_page)));
4373 }
4374 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
4375
4376 static void
4377 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4378 struct ring_buffer_event *event)
4379 {
4380 u64 delta;
4381
4382 switch (event->type_len) {
4383 case RINGBUF_TYPE_PADDING:
4384 return;
4385
4386 case RINGBUF_TYPE_TIME_EXTEND:
4387 delta = rb_event_time_stamp(event);
4388 cpu_buffer->read_stamp += delta;
4389 return;
4390
4391 case RINGBUF_TYPE_TIME_STAMP:
4392 delta = rb_event_time_stamp(event);
4393 cpu_buffer->read_stamp = delta;
4394 return;
4395
4396 case RINGBUF_TYPE_DATA:
4397 cpu_buffer->read_stamp += event->time_delta;
4398 return;
4399
4400 default:
4401 RB_WARN_ON(cpu_buffer, 1);
4402 }
4403 return;
4404 }
4405
4406 static void
4407 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4408 struct ring_buffer_event *event)
4409 {
4410 u64 delta;
4411
4412 switch (event->type_len) {
4413 case RINGBUF_TYPE_PADDING:
4414 return;
4415
4416 case RINGBUF_TYPE_TIME_EXTEND:
4417 delta = rb_event_time_stamp(event);
4418 iter->read_stamp += delta;
4419 return;
4420
4421 case RINGBUF_TYPE_TIME_STAMP:
4422 delta = rb_event_time_stamp(event);
4423 iter->read_stamp = delta;
4424 return;
4425
4426 case RINGBUF_TYPE_DATA:
4427 iter->read_stamp += event->time_delta;
4428 return;
4429
4430 default:
4431 RB_WARN_ON(iter->cpu_buffer, 1);
4432 }
4433 return;
4434 }
4435
4436 static struct buffer_page *
4437 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4438 {
4439 struct buffer_page *reader = NULL;
4440 unsigned long overwrite;
4441 unsigned long flags;
4442 int nr_loops = 0;
4443 int ret;
4444
4445 local_irq_save(flags);
4446 arch_spin_lock(&cpu_buffer->lock);
4447
4448 again:
4449 /*
4450 * This should normally only loop twice. But because the
4451 * start of the reader inserts an empty page, it causes
4452 * a case where we will loop three times. There should be no
4453 * reason to loop four times (that I know of).
4454 */
4455 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4456 reader = NULL;
4457 goto out;
4458 }
4459
4460 reader = cpu_buffer->reader_page;
4461
4462 /* If there's more to read, return this page */
4463 if (cpu_buffer->reader_page->read < rb_page_size(reader))
4464 goto out;
4465
4466 /* Never should we have an index greater than the size */
4467 if (RB_WARN_ON(cpu_buffer,
4468 cpu_buffer->reader_page->read > rb_page_size(reader)))
4469 goto out;
4470
4471 /* check if we caught up to the tail */
4472 reader = NULL;
4473 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4474 goto out;
4475
4476 /* Don't bother swapping if the ring buffer is empty */
4477 if (rb_num_of_entries(cpu_buffer) == 0)
4478 goto out;
4479
4480 /*
4481 * Reset the reader page to size zero.
4482 */
4483 local_set(&cpu_buffer->reader_page->write, 0);
4484 local_set(&cpu_buffer->reader_page->entries, 0);
4485 local_set(&cpu_buffer->reader_page->page->commit, 0);
4486 cpu_buffer->reader_page->real_end = 0;
4487
4488 spin:
4489 /*
4490 * Splice the empty reader page into the list around the head.
4491 */
4492 reader = rb_set_head_page(cpu_buffer);
4493 if (!reader)
4494 goto out;
4495 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4496 cpu_buffer->reader_page->list.prev = reader->list.prev;
4497
4498 /*
4499 * cpu_buffer->pages just needs to point to the buffer, it
4500 * has no specific buffer page to point to. Let's move it out
4501 * of our way so we don't accidentally swap it.
4502 */
4503 cpu_buffer->pages = reader->list.prev;
4504
4505 /* The reader page will be pointing to the new head */
4506 rb_set_list_to_head(&cpu_buffer->reader_page->list);
4507
4508 /*
4509 * We want to make sure we read the overruns after we set up our
4510 * pointers to the next object. The writer side does a
4511 * cmpxchg to cross pages which acts as the mb on the writer
4512 * side. Note, the reader will constantly fail the swap
4513 * while the writer is updating the pointers, so this
4514 * guarantees that the overwrite recorded here is the one we
4515 * want to compare with the last_overrun.
4516 */
4517 smp_mb();
4518 overwrite = local_read(&(cpu_buffer->overrun));
4519
4520 /*
4521 * Here's the tricky part.
4522 *
4523 * We need to move the pointer past the header page.
4524 * But we can only do that if a writer is not currently
4525 * moving it. The page before the header page has the
4526 * flag bit '1' set if it is pointing to the page we want,
4527 * but if the writer is in the process of moving it
4528 * then it will be '2', or '0' if it has already been moved.
4529 */
4530
4531 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4532
4533 /*
4534 * If we did not convert it, then we must try again.
4535 */
4536 if (!ret)
4537 goto spin;
4538
4539 /*
4540 * Yay! We succeeded in replacing the page.
4541 *
4542 * Now make the new head point back to the reader page.
4543 */
4544 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
4545 rb_inc_page(&cpu_buffer->head_page);
4546
4547 local_inc(&cpu_buffer->pages_read);
4548
4549 /* Finally update the reader page to the new head */
4550 cpu_buffer->reader_page = reader;
4551 cpu_buffer->reader_page->read = 0;
4552
4553 if (overwrite != cpu_buffer->last_overrun) {
4554 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4555 cpu_buffer->last_overrun = overwrite;
4556 }
4557
4558 goto again;
4559
4560 out:
4561 /* Update the read_stamp on the first event */
4562 if (reader && reader->read == 0)
4563 cpu_buffer->read_stamp = reader->page->time_stamp;
4564
4565 arch_spin_unlock(&cpu_buffer->lock);
4566 local_irq_restore(flags);
4567
4568 /*
4569 * The writer has preemption disabled, so wait for it. But not forever:
4570 * although 1 second is pretty much "forever".
4571 */
4572 #define USECS_WAIT 1000000
4573 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4574 /* If the write is past the end of page, a writer is still updating it */
4575 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
4576 break;
4577
4578 udelay(1);
4579
4580 /* Get the latest version of the reader write value */
4581 smp_rmb();
4582 }
4583
4584 /* The writer is not moving forward? Something is wrong */
4585 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4586 reader = NULL;
4587
4588 /*
4589 * Make sure we see any padding after the write update
4590 * (see rb_reset_tail()).
4591 *
4592 * In addition, a writer may be writing on the reader page
4593 * if the page has not been fully filled, so the read barrier
4594 * is also needed to make sure we see the content of what is
4595 * committed by the writer (see rb_set_commit_to_write()).
4596 */
4597 smp_rmb();
4598
4599
4600 return reader;
4601 }
4602
4603 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4604 {
4605 struct ring_buffer_event *event;
4606 struct buffer_page *reader;
4607 unsigned length;
4608
4609 reader = rb_get_reader_page(cpu_buffer);
4610
4611 /* This function should not be called when buffer is empty */
4612 if (RB_WARN_ON(cpu_buffer, !reader))
4613 return;
4614
4615 event = rb_reader_event(cpu_buffer);
4616
4617 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4618 cpu_buffer->read++;
4619
4620 rb_update_read_stamp(cpu_buffer, event);
4621
4622 length = rb_event_length(event);
4623 cpu_buffer->reader_page->read += length;
4624 cpu_buffer->read_bytes += length;
4625 }
4626
4627 static void rb_advance_iter(struct ring_buffer_iter *iter)
4628 {
4629 struct ring_buffer_per_cpu *cpu_buffer;
4630
4631 cpu_buffer = iter->cpu_buffer;
4632
4633 /* If head == next_event then we need to jump to the next event */
4634 if (iter->head == iter->next_event) {
4635 /* If the event gets overwritten again, there's nothing to do */
4636 if (rb_iter_head_event(iter) == NULL)
4637 return;
4638 }
4639
4640 iter->head = iter->next_event;
4641
4642 /*
4643 * Check if we are at the end of the buffer.
4644 */
4645 if (iter->next_event >= rb_page_size(iter->head_page)) {
4646 /* discarded commits can make the page empty */
4647 if (iter->head_page == cpu_buffer->commit_page)
4648 return;
4649 rb_inc_iter(iter);
4650 return;
4651 }
4652
4653 rb_update_iter_read_stamp(iter, iter->event);
4654 }
4655
4656 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4657 {
4658 return cpu_buffer->lost_events;
4659 }
4660
4661 static struct ring_buffer_event *
4662 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4663 unsigned long *lost_events)
4664 {
4665 struct ring_buffer_event *event;
4666 struct buffer_page *reader;
4667 int nr_loops = 0;
4668
4669 if (ts)
4670 *ts = 0;
4671 again:
4672 /*
4673 * We repeat when a time extend is encountered.
4674 * Since the time extend is always attached to a data event,
4675 * we should never loop more than once.
4676 * (We never hit the following condition more than twice).
4677 */
4678 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4679 return NULL;
4680
4681 reader = rb_get_reader_page(cpu_buffer);
4682 if (!reader)
4683 return NULL;
4684
4685 event = rb_reader_event(cpu_buffer);
4686
4687 switch (event->type_len) {
4688 case RINGBUF_TYPE_PADDING:
4689 if (rb_null_event(event))
4690 RB_WARN_ON(cpu_buffer, 1);
4691 /*
4692 * Because the writer could be discarding every
4693 * event it creates (which would probably be bad)
4694 * if we were to go back to "again" then we may never
4695 * catch up, and will trigger the warn on, or lock
4696 * the box. Return the padding, and we will release
4697 * the current locks, and try again.
4698 */
4699 return event;
4700
4701 case RINGBUF_TYPE_TIME_EXTEND:
4702 /* Internal data, OK to advance */
4703 rb_advance_reader(cpu_buffer);
4704 goto again;
4705
4706 case RINGBUF_TYPE_TIME_STAMP:
4707 if (ts) {
4708 *ts = rb_event_time_stamp(event);
4709 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4710 cpu_buffer->cpu, ts);
4711 }
4712 /* Internal data, OK to advance */
4713 rb_advance_reader(cpu_buffer);
4714 goto again;
4715
4716 case RINGBUF_TYPE_DATA:
4717 if (ts && !(*ts)) {
4718 *ts = cpu_buffer->read_stamp + event->time_delta;
4719 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4720 cpu_buffer->cpu, ts);
4721 }
4722 if (lost_events)
4723 *lost_events = rb_lost_events(cpu_buffer);
4724 return event;
4725
4726 default:
4727 RB_WARN_ON(cpu_buffer, 1);
4728 }
4729
4730 return NULL;
4731 }
4732 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4733
4734 static struct ring_buffer_event *
4735 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4736 {
4737 struct trace_buffer *buffer;
4738 struct ring_buffer_per_cpu *cpu_buffer;
4739 struct ring_buffer_event *event;
4740 int nr_loops = 0;
4741
4742 if (ts)
4743 *ts = 0;
4744
4745 cpu_buffer = iter->cpu_buffer;
4746 buffer = cpu_buffer->buffer;
4747
4748 /*
4749 * Check if someone performed a consuming read to the buffer
4750 * or removed some pages from the buffer. In these cases,
4751 * iterator was invalidated and we need to reset it.
4752 */
4753 if (unlikely(iter->cache_read != cpu_buffer->read ||
4754 iter->cache_reader_page != cpu_buffer->reader_page ||
4755 iter->cache_pages_removed != cpu_buffer->pages_removed))
4756 rb_iter_reset(iter);
4757
4758 again:
4759 if (ring_buffer_iter_empty(iter))
4760 return NULL;
4761
4762 /*
4763 * As the writer can mess with what the iterator is trying
4764 * to read, just give up if we fail to get an event after
4765 * three tries. The iterator is not as reliable when reading
4766 * the ring buffer with an active write as the consumer is.
4767 * Do not warn when three failures are reached.
4768 */
4769 if (++nr_loops > 3)
4770 return NULL;
4771
4772 if (rb_per_cpu_empty(cpu_buffer))
4773 return NULL;
4774
4775 if (iter->head >= rb_page_size(iter->head_page)) {
4776 rb_inc_iter(iter);
4777 goto again;
4778 }
4779
4780 event = rb_iter_head_event(iter);
4781 if (!event)
4782 goto again;
4783
4784 switch (event->type_len) {
4785 case RINGBUF_TYPE_PADDING:
4786 if (rb_null_event(event)) {
4787 rb_inc_iter(iter);
4788 goto again;
4789 }
4790 rb_advance_iter(iter);
4791 return event;
4792
4793 case RINGBUF_TYPE_TIME_EXTEND:
4794 /* Internal data, OK to advance */
4795 rb_advance_iter(iter);
4796 goto again;
4797
4798 case RINGBUF_TYPE_TIME_STAMP:
4799 if (ts) {
4800 *ts = rb_event_time_stamp(event);
4801 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4802 cpu_buffer->cpu, ts);
4803 }
4804 /* Internal data, OK to advance */
4805 rb_advance_iter(iter);
4806 goto again;
4807
4808 case RINGBUF_TYPE_DATA:
4809 if (ts && !(*ts)) {
4810 *ts = iter->read_stamp + event->time_delta;
4811 ring_buffer_normalize_time_stamp(buffer,
4812 cpu_buffer->cpu, ts);
4813 }
4814 return event;
4815
4816 default:
4817 RB_WARN_ON(cpu_buffer, 1);
4818 }
4819
4820 return NULL;
4821 }
4822 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4823
4824 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4825 {
4826 if (likely(!in_nmi())) {
4827 raw_spin_lock(&cpu_buffer->reader_lock);
4828 return true;
4829 }
4830
4831 /*
4832 * If an NMI die dumps out the content of the ring buffer,
4833 * a trylock must be used to prevent a deadlock if the NMI
4834 * preempted a task that holds the ring buffer locks. If
4835 * we get the lock then all is fine, if not, then continue
4836 * to do the read, but this can corrupt the ring buffer,
4837 * so it must be permanently disabled from future writes.
4838 * Reading from NMI is a one-shot deal.
4839 */
4840 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4841 return true;
4842
4843 /* Continue without locking, but disable the ring buffer */
4844 atomic_inc(&cpu_buffer->record_disabled);
4845 return false;
4846 }
4847
4848 static inline void
4849 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4850 {
4851 if (likely(locked))
4852 raw_spin_unlock(&cpu_buffer->reader_lock);
4853 return;
4854 }
4855
4856 /**
4857 * ring_buffer_peek - peek at the next event to be read
4858 * @buffer: The ring buffer to read
4859 * @cpu: The cpu to peek at
4860 * @ts: The timestamp counter of this event.
4861 * @lost_events: a variable to store if events were lost (may be NULL)
4862 *
4863 * This will return the event that will be read next, but does
4864 * not consume the data.
4865 */
4866 struct ring_buffer_event *
4867 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4868 unsigned long *lost_events)
4869 {
4870 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4871 struct ring_buffer_event *event;
4872 unsigned long flags;
4873 bool dolock;
4874
4875 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4876 return NULL;
4877
4878 again:
4879 local_irq_save(flags);
4880 dolock = rb_reader_lock(cpu_buffer);
4881 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4882 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4883 rb_advance_reader(cpu_buffer);
4884 rb_reader_unlock(cpu_buffer, dolock);
4885 local_irq_restore(flags);
4886
4887 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4888 goto again;
4889
4890 return event;
4891 }
4892
4893 /** ring_buffer_iter_dropped - report if there are dropped events
4894 * @iter: The ring buffer iterator
4895 *
4896 * Returns true if there were dropped events since the last peek.
4897 */
4898 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4899 {
4900 bool ret = iter->missed_events != 0;
4901
4902 iter->missed_events = 0;
4903 return ret;
4904 }
4905 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
4906
4907 /**
4908 * ring_buffer_iter_peek - peek at the next event to be read
4909 * @iter: The ring buffer iterator
4910 * @ts: The timestamp counter of this event.
4911 *
4912 * This will return the event that will be read next, but does
4913 * not increment the iterator.
4914 */
4915 struct ring_buffer_event *
4916 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4917 {
4918 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4919 struct ring_buffer_event *event;
4920 unsigned long flags;
4921
4922 again:
4923 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4924 event = rb_iter_peek(iter, ts);
4925 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4926
4927 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4928 goto again;
4929
4930 return event;
4931 }
4932
4933 /**
4934 * ring_buffer_consume - return an event and consume it
4935 * @buffer: The ring buffer to get the next event from
4936 * @cpu: the cpu to read the buffer from
4937 * @ts: a variable to store the timestamp (may be NULL)
4938 * @lost_events: a variable to store if events were lost (may be NULL)
4939 *
4940 * Returns the next event in the ring buffer, and that event is consumed.
4941 * Meaning that sequential reads will keep returning a different event,
4942 * and eventually empty the ring buffer if the producer is slower.
4943 */
4944 struct ring_buffer_event *
4945 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
4946 unsigned long *lost_events)
4947 {
4948 struct ring_buffer_per_cpu *cpu_buffer;
4949 struct ring_buffer_event *event = NULL;
4950 unsigned long flags;
4951 bool dolock;
4952
4953 again:
4954 /* might be called in atomic */
4955 preempt_disable();
4956
4957 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4958 goto out;
4959
4960 cpu_buffer = buffer->buffers[cpu];
4961 local_irq_save(flags);
4962 dolock = rb_reader_lock(cpu_buffer);
4963
4964 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4965 if (event) {
4966 cpu_buffer->lost_events = 0;
4967 rb_advance_reader(cpu_buffer);
4968 }
4969
4970 rb_reader_unlock(cpu_buffer, dolock);
4971 local_irq_restore(flags);
4972
4973 out:
4974 preempt_enable();
4975
4976 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4977 goto again;
4978
4979 return event;
4980 }
4981 EXPORT_SYMBOL_GPL(ring_buffer_consume);
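/*
 * Illustrative sketch (not part of this file's build): draining one CPU
 * buffer with the consuming read interface. "buffer", "cpu" and
 * "handle_event()" are assumptions made for the example; ts and lost
 * report the event timestamp and any dropped events.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost)) != NULL)
 *		handle_event(ring_buffer_event_data(event), ts, lost);
 */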
4982
4983 /**
4984 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4985 * @buffer: The ring buffer to read from
4986 * @cpu: The cpu buffer to iterate over
4987 * @flags: gfp flags to use for memory allocation
4988 *
4989 * This performs the initial preparations necessary to iterate
4990 * through the buffer. Memory is allocated, buffer recording
4991 * is disabled, and the iterator pointer is returned to the caller.
4992 *
4993 * Disabling buffer recording prevents the reading from being
4994 * corrupted. This is not a consuming read, so a producer is not
4995 * expected.
4996 *
4997 * After a sequence of ring_buffer_read_prepare calls, the user is
4998 * expected to make at least one call to ring_buffer_read_prepare_sync.
4999 * Afterwards, ring_buffer_read_start is invoked to get things going
5000 * for real.
5001 *
5002 * This overall must be paired with ring_buffer_read_finish.
5003 */
5004 struct ring_buffer_iter *
5005 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
5006 {
5007 struct ring_buffer_per_cpu *cpu_buffer;
5008 struct ring_buffer_iter *iter;
5009
5010 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5011 return NULL;
5012
5013 iter = kzalloc(sizeof(*iter), flags);
5014 if (!iter)
5015 return NULL;
5016
5017 /* Holds the entire event: data and meta data */
5018 iter->event = kmalloc(BUF_PAGE_SIZE, flags);
5019 if (!iter->event) {
5020 kfree(iter);
5021 return NULL;
5022 }
5023
5024 cpu_buffer = buffer->buffers[cpu];
5025
5026 iter->cpu_buffer = cpu_buffer;
5027
5028 atomic_inc(&cpu_buffer->resize_disabled);
5029
5030 return iter;
5031 }
5032 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5033
5034 /**
5035 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5036 *
5037 * All previously invoked ring_buffer_read_prepare calls to prepare
5038 * iterators will be synchronized. Afterwards, ring_buffer_read_start
5039 * calls on those iterators are allowed.
5040 */
5041 void
5042 ring_buffer_read_prepare_sync(void)
5043 {
5044 synchronize_rcu();
5045 }
5046 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5047
5048 /**
5049 * ring_buffer_read_start - start a non consuming read of the buffer
5050 * @iter: The iterator returned by ring_buffer_read_prepare
5051 *
5052 * This finalizes the startup of an iteration through the buffer.
5053 * The iterator comes from a call to ring_buffer_read_prepare and
5054 * an intervening ring_buffer_read_prepare_sync must have been
5055 * performed.
5056 *
5057 * Must be paired with ring_buffer_read_finish.
5058 */
5059 void
5060 ring_buffer_read_start(struct ring_buffer_iter *iter)
5061 {
5062 struct ring_buffer_per_cpu *cpu_buffer;
5063 unsigned long flags;
5064
5065 if (!iter)
5066 return;
5067
5068 cpu_buffer = iter->cpu_buffer;
5069
5070 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5071 arch_spin_lock(&cpu_buffer->lock);
5072 rb_iter_reset(iter);
5073 arch_spin_unlock(&cpu_buffer->lock);
5074 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5075 }
5076 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
5077
5078 /**
5079 * ring_buffer_read_finish - finish reading the iterator of the buffer
5080 * @iter: The iterator retrieved by ring_buffer_read_prepare
5081 *
5082 * This re-enables the recording to the buffer, and frees the
5083 * iterator.
5084 */
5085 void
5086 ring_buffer_read_finish(struct ring_buffer_iter *iter)
5087 {
5088 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5089 unsigned long flags;
5090
5091 /*
5092 * Ring buffer is disabled from recording, here's a good place
5093 * to check the integrity of the ring buffer.
5094 * Must prevent readers from trying to read, as the check
5095 * clears the HEAD page and readers require it.
5096 */
5097 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5098 rb_check_pages(cpu_buffer);
5099 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5100
5101 atomic_dec(&cpu_buffer->resize_disabled);
5102 kfree(iter->event);
5103 kfree(iter);
5104 }
5105 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
5106
5107 /**
5108 * ring_buffer_iter_advance - advance the iterator to the next location
5109 * @iter: The ring buffer iterator
5110 *
5111 * Move the location of the iterator such that the next read will
5112 * be the next location of the iterator.
5113 */
5114 void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
5115 {
5116 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5117 unsigned long flags;
5118
5119 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5120
5121 rb_advance_iter(iter);
5122
5123 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5124 }
5125 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
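/*
 * Illustrative sketch (not part of this file's build): a full
 * non-consuming read of one CPU buffer using the iterator interface
 * described above. "buffer", "cpu" and "handle_event()" are assumptions
 * made for the example.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *	u64 ts;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return -ENOMEM;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *
 *	while ((event = ring_buffer_iter_peek(iter, &ts)) != NULL) {
 *		handle_event(ring_buffer_event_data(event), ts);
 *		ring_buffer_iter_advance(iter);
 *	}
 *
 *	ring_buffer_read_finish(iter);
 */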
5126
5127 /**
5128 * ring_buffer_size - return the size of the ring buffer (in bytes)
5129 * @buffer: The ring buffer.
5130 * @cpu: The CPU to get ring buffer size from.
5131 */
5132 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
5133 {
5134 /*
5135 * Earlier, this method returned
5136 * BUF_PAGE_SIZE * buffer->nr_pages
5137 * Since the nr_pages field is now removed, we have converted this to
5138 * return the per cpu buffer value.
5139 */
5140 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5141 return 0;
5142
5143 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
5144 }
5145 EXPORT_SYMBOL_GPL(ring_buffer_size);
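/*
 * Illustrative sketch (not part of this file's build): since
 * ring_buffer_size() is now per CPU, the old "total size" value can be
 * rebuilt by summing over the online CPUs. The "total" variable is an
 * assumption made for the example.
 *
 *	unsigned long total = 0;
 *	int cpu;
 *
 *	for_each_online_cpu(cpu)
 *		total += ring_buffer_size(buffer, cpu);
 */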
5146
5147 static void rb_clear_buffer_page(struct buffer_page *page)
5148 {
5149 local_set(&page->write, 0);
5150 local_set(&page->entries, 0);
5151 rb_init_page(page->page);
5152 page->read = 0;
5153 }
5154
5155 static void
5156 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
5157 {
5158 struct buffer_page *page;
5159
5160 rb_head_page_deactivate(cpu_buffer);
5161
5162 cpu_buffer->head_page
5163 = list_entry(cpu_buffer->pages, struct buffer_page, list);
5164 rb_clear_buffer_page(cpu_buffer->head_page);
5165 list_for_each_entry(page, cpu_buffer->pages, list) {
5166 rb_clear_buffer_page(page);
5167 }
5168
5169 cpu_buffer->tail_page = cpu_buffer->head_page;
5170 cpu_buffer->commit_page = cpu_buffer->head_page;
5171
5172 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5173 INIT_LIST_HEAD(&cpu_buffer->new_pages);
5174 rb_clear_buffer_page(cpu_buffer->reader_page);
5175
5176 local_set(&cpu_buffer->entries_bytes, 0);
5177 local_set(&cpu_buffer->overrun, 0);
5178 local_set(&cpu_buffer->commit_overrun, 0);
5179 local_set(&cpu_buffer->dropped_events, 0);
5180 local_set(&cpu_buffer->entries, 0);
5181 local_set(&cpu_buffer->committing, 0);
5182 local_set(&cpu_buffer->commits, 0);
5183 local_set(&cpu_buffer->pages_touched, 0);
5184 local_set(&cpu_buffer->pages_lost, 0);
5185 local_set(&cpu_buffer->pages_read, 0);
5186 cpu_buffer->last_pages_touch = 0;
5187 cpu_buffer->shortest_full = 0;
5188 cpu_buffer->read = 0;
5189 cpu_buffer->read_bytes = 0;
5190
5191 rb_time_set(&cpu_buffer->write_stamp, 0);
5192 rb_time_set(&cpu_buffer->before_stamp, 0);
5193
5194 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
5195
5196 cpu_buffer->lost_events = 0;
5197 cpu_buffer->last_overrun = 0;
5198
5199 rb_head_page_activate(cpu_buffer);
5200 cpu_buffer->pages_removed = 0;
5201 }
5202
5203 /* Must have disabled the cpu buffer then done a synchronize_rcu */
5204 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5205 {
5206 unsigned long flags;
5207
5208 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5209
5210 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5211 goto out;
5212
5213 arch_spin_lock(&cpu_buffer->lock);
5214
5215 rb_reset_cpu(cpu_buffer);
5216
5217 arch_spin_unlock(&cpu_buffer->lock);
5218
5219 out:
5220 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5221 }
5222
5223 /**
5224 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5225 * @buffer: The ring buffer to reset a per cpu buffer of
5226 * @cpu: The CPU buffer to be reset
5227 */
5228 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
5229 {
5230 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5231
5232 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5233 return;
5234
5235 /* prevent another thread from changing buffer sizes */
5236 mutex_lock(&buffer->mutex);
5237
5238 atomic_inc(&cpu_buffer->resize_disabled);
5239 atomic_inc(&cpu_buffer->record_disabled);
5240
5241 /* Make sure all commits have finished */
5242 synchronize_rcu();
5243
5244 reset_disabled_cpu_buffer(cpu_buffer);
5245
5246 atomic_dec(&cpu_buffer->record_disabled);
5247 atomic_dec(&cpu_buffer->resize_disabled);
5248
5249 mutex_unlock(&buffer->mutex);
5250 }
5251 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
5252
5253 /* Flag to ensure proper resetting of atomic variables */
5254 #define RESET_BIT (1 << 30)
5255
5256 /**
5257 * ring_buffer_reset_online_cpus - reset the ring buffer for all online CPU buffers
5258 * @buffer: The ring buffer to reset the per CPU buffers of
5260 */
5261 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5262 {
5263 struct ring_buffer_per_cpu *cpu_buffer;
5264 int cpu;
5265
5266 /* prevent another thread from changing buffer sizes */
5267 mutex_lock(&buffer->mutex);
5268
5269 for_each_online_buffer_cpu(buffer, cpu) {
5270 cpu_buffer = buffer->buffers[cpu];
5271
5272 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
5273 atomic_inc(&cpu_buffer->record_disabled);
5274 }
5275
5276 /* Make sure all commits have finished */
5277 synchronize_rcu();
5278
5279 for_each_buffer_cpu(buffer, cpu) {
5280 cpu_buffer = buffer->buffers[cpu];
5281
5282 /*
5283 * If a CPU came online during the synchronize_rcu(), then
5284 * ignore it.
5285 */
5286 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5287 continue;
5288
5289 reset_disabled_cpu_buffer(cpu_buffer);
5290
5291 atomic_dec(&cpu_buffer->record_disabled);
5292 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
5293 }
5294
5295 mutex_unlock(&buffer->mutex);
5296 }
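
/*
 * Minimal sketch of the RESET_BIT handshake used above, with a bare
 * atomic_t standing in for cpu_buffer->resize_disabled.  The helper name
 * is hypothetical; it only illustrates the tag-then-check pattern.
 */
static __maybe_unused bool example_tagged_before_sync(atomic_t *resize_disabled)
{
	/*
	 * The first pass does atomic_add(RESET_BIT, ...) before
	 * synchronize_rcu().  A CPU buffer that came online afterwards never
	 * received the tag, so the second pass skips it by testing the bit.
	 */
	return atomic_read(resize_disabled) & RESET_BIT;
}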
5297
5298 /**
5299 * ring_buffer_reset - reset a ring buffer
5300 * @buffer: The ring buffer to reset all cpu buffers
5301 */
5302 void ring_buffer_reset(struct trace_buffer *buffer)
5303 {
5304 struct ring_buffer_per_cpu *cpu_buffer;
5305 int cpu;
5306
5307 /* prevent another thread from changing buffer sizes */
5308 mutex_lock(&buffer->mutex);
5309
5310 for_each_buffer_cpu(buffer, cpu) {
5311 cpu_buffer = buffer->buffers[cpu];
5312
5313 atomic_inc(&cpu_buffer->resize_disabled);
5314 atomic_inc(&cpu_buffer->record_disabled);
5315 }
5316
5317 /* Make sure all commits have finished */
5318 synchronize_rcu();
5319
5320 for_each_buffer_cpu(buffer, cpu) {
5321 cpu_buffer = buffer->buffers[cpu];
5322
5323 reset_disabled_cpu_buffer(cpu_buffer);
5324
5325 atomic_dec(&cpu_buffer->record_disabled);
5326 atomic_dec(&cpu_buffer->resize_disabled);
5327 }
5328
5329 mutex_unlock(&buffer->mutex);
5330 }
5331 EXPORT_SYMBOL_GPL(ring_buffer_reset);
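
/*
 * Illustrative sketch only: a hypothetical caller starting a fresh trace
 * session by discarding every previously recorded event on all CPUs
 * before letting the writers run again.
 */
static __maybe_unused void example_restart_session(struct trace_buffer *buffer)
{
	ring_buffer_record_disable(buffer);

	/* All per CPU buffers are emptied and their counters go back to zero. */
	ring_buffer_reset(buffer);

	ring_buffer_record_enable(buffer);
}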
5332
5333 /**
5334 * ring_buffer_empty - is the ring buffer empty?
5335 * @buffer: The ring buffer to test
5336 */
5337 bool ring_buffer_empty(struct trace_buffer *buffer)
5338 {
5339 struct ring_buffer_per_cpu *cpu_buffer;
5340 unsigned long flags;
5341 bool dolock;
5342 int cpu;
5343 int ret;
5344
5345 /* yes this is racy, but if you don't like the race, lock the buffer */
5346 for_each_buffer_cpu(buffer, cpu) {
5347 cpu_buffer = buffer->buffers[cpu];
5348 local_irq_save(flags);
5349 dolock = rb_reader_lock(cpu_buffer);
5350 ret = rb_per_cpu_empty(cpu_buffer);
5351 rb_reader_unlock(cpu_buffer, dolock);
5352 local_irq_restore(flags);
5353
5354 if (!ret)
5355 return false;
5356 }
5357
5358 return true;
5359 }
5360 EXPORT_SYMBOL_GPL(ring_buffer_empty);
5361
5362 /**
5363 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5364 * @buffer: The ring buffer
5365 * @cpu: The CPU buffer to test
5366 */
5367 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
5368 {
5369 struct ring_buffer_per_cpu *cpu_buffer;
5370 unsigned long flags;
5371 bool dolock;
5372 int ret;
5373
5374 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5375 return true;
5376
5377 cpu_buffer = buffer->buffers[cpu];
5378 local_irq_save(flags);
5379 dolock = rb_reader_lock(cpu_buffer);
5380 ret = rb_per_cpu_empty(cpu_buffer);
5381 rb_reader_unlock(cpu_buffer, dolock);
5382 local_irq_restore(flags);
5383
5384 return ret;
5385 }
5386 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
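
/*
 * Illustrative sketch only: a hypothetical drain loop built on the two
 * empty checks above and the consuming reader.  The @consume callback is
 * an assumption of this example, not part of the ring buffer API.
 */
static __maybe_unused void
example_drain(struct trace_buffer *buffer,
	      void (*consume)(struct ring_buffer_event *event))
{
	struct ring_buffer_event *event;
	int cpu;

	while (!ring_buffer_empty(buffer)) {
		for_each_online_cpu(cpu) {
			if (ring_buffer_empty_cpu(buffer, cpu))
				continue;
			/* Each call consumes (and returns) one event. */
			while ((event = ring_buffer_consume(buffer, cpu, NULL, NULL)))
				consume(event);
		}
	}
}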
5387
5388 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
5389 /**
5390 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5391 * @buffer_a: One buffer to swap with
5392 * @buffer_b: The other buffer to swap with
5393 * @cpu: the CPU of the buffers to swap
5394 *
5395 * This function is useful for tracers that want to take a "snapshot"
5396 * of a CPU buffer and have another backup buffer lying around.
5397 * It is expected that the tracer handles the cpu buffer not being
5398 * used at the moment.
5399 */
5400 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5401 struct trace_buffer *buffer_b, int cpu)
5402 {
5403 struct ring_buffer_per_cpu *cpu_buffer_a;
5404 struct ring_buffer_per_cpu *cpu_buffer_b;
5405 int ret = -EINVAL;
5406
5407 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5408 !cpumask_test_cpu(cpu, buffer_b->cpumask))
5409 goto out;
5410
5411 cpu_buffer_a = buffer_a->buffers[cpu];
5412 cpu_buffer_b = buffer_b->buffers[cpu];
5413
5414 /* At least make sure the two buffers are somewhat the same */
5415 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
5416 goto out;
5417
5418 ret = -EAGAIN;
5419
5420 if (atomic_read(&buffer_a->record_disabled))
5421 goto out;
5422
5423 if (atomic_read(&buffer_b->record_disabled))
5424 goto out;
5425
5426 if (atomic_read(&cpu_buffer_a->record_disabled))
5427 goto out;
5428
5429 if (atomic_read(&cpu_buffer_b->record_disabled))
5430 goto out;
5431
5432 /*
5433 * We can't do a synchronize_rcu here because this
5434 * function can be called in atomic context.
5435 * Normally this will be called from the same CPU as cpu.
5436 * If not it's up to the caller to protect this.
5437 */
5438 atomic_inc(&cpu_buffer_a->record_disabled);
5439 atomic_inc(&cpu_buffer_b->record_disabled);
5440
5441 ret = -EBUSY;
5442 if (local_read(&cpu_buffer_a->committing))
5443 goto out_dec;
5444 if (local_read(&cpu_buffer_b->committing))
5445 goto out_dec;
5446
5447 /*
5448 * When resize is in progress, we cannot swap it because
5449 * it will mess the state of the cpu buffer.
5450 */
5451 if (atomic_read(&buffer_a->resizing))
5452 goto out_dec;
5453 if (atomic_read(&buffer_b->resizing))
5454 goto out_dec;
5455
5456 buffer_a->buffers[cpu] = cpu_buffer_b;
5457 buffer_b->buffers[cpu] = cpu_buffer_a;
5458
5459 cpu_buffer_b->buffer = buffer_a;
5460 cpu_buffer_a->buffer = buffer_b;
5461
5462 ret = 0;
5463
5464 out_dec:
5465 atomic_dec(&cpu_buffer_a->record_disabled);
5466 atomic_dec(&cpu_buffer_b->record_disabled);
5467 out:
5468 return ret;
5469 }
5470 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
5471 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
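
/*
 * Illustrative sketch only: a "snapshot" in the spirit of what a tracer
 * could do with ring_buffer_swap_cpu().  The live buffer of the current
 * CPU is swapped with a spare buffer of the same size so the old events
 * can be read out later without stopping the trace.
 */
static __maybe_unused int example_snapshot_this_cpu(struct trace_buffer *live,
						    struct trace_buffer *spare)
{
	int cpu = raw_smp_processor_id();

	/* 0 on success, -EINVAL/-EAGAIN/-EBUSY if the swap cannot be done. */
	return ring_buffer_swap_cpu(live, spare, cpu);
}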
5472
5473 /**
5474 * ring_buffer_alloc_read_page - allocate a page to read from buffer
5475 * @buffer: the buffer to allocate for.
5476 * @cpu: the cpu buffer to allocate.
5477 *
5478 * This function is used in conjunction with ring_buffer_read_page.
5479 * When reading a full page from the ring buffer, these functions
5480 * can be used to speed up the process. The calling function should
5481 * allocate a few pages first with this function. Then when it
5482 * needs to get pages from the ring buffer, it passes the result
5483 * of this function into ring_buffer_read_page, which will swap
5484 * the page that was allocated, with the read page of the buffer.
5485 *
5486 * Returns:
5487 * The page allocated, or ERR_PTR
5488 */
5489 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
5490 {
5491 struct ring_buffer_per_cpu *cpu_buffer;
5492 struct buffer_data_page *bpage = NULL;
5493 unsigned long flags;
5494 struct page *page;
5495
5496 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5497 return ERR_PTR(-ENODEV);
5498
5499 cpu_buffer = buffer->buffers[cpu];
5500 local_irq_save(flags);
5501 arch_spin_lock(&cpu_buffer->lock);
5502
5503 if (cpu_buffer->free_page) {
5504 bpage = cpu_buffer->free_page;
5505 cpu_buffer->free_page = NULL;
5506 }
5507
5508 arch_spin_unlock(&cpu_buffer->lock);
5509 local_irq_restore(flags);
5510
5511 if (bpage)
5512 goto out;
5513
5514 page = alloc_pages_node(cpu_to_node(cpu),
5515 GFP_KERNEL | __GFP_NORETRY, 0);
5516 if (!page)
5517 return ERR_PTR(-ENOMEM);
5518
5519 bpage = page_address(page);
5520
5521 out:
5522 rb_init_page(bpage);
5523
5524 return bpage;
5525 }
5526 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
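
/*
 * Illustrative sketch only: a hypothetical caller preparing a spare read
 * page for ring_buffer_read_page() and handing it back to the per CPU
 * free slot once it is done with it.
 */
static __maybe_unused int example_prepare_read_page(struct trace_buffer *buffer, int cpu)
{
	void *rpage;

	rpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (IS_ERR(rpage))
		return PTR_ERR(rpage);

	/* ... pass &rpage to ring_buffer_read_page() here ... */

	ring_buffer_free_read_page(buffer, cpu, rpage);
	return 0;
}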
5527
5528 /**
5529 * ring_buffer_free_read_page - free an allocated read page
5530 * @buffer: the buffer the page was allocated for
5531 * @cpu: the cpu buffer the page came from
5532 * @data: the page to free
5533 *
5534 * Free a page allocated from ring_buffer_alloc_read_page.
5535 */
5536 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data)
5537 {
5538 struct ring_buffer_per_cpu *cpu_buffer;
5539 struct buffer_data_page *bpage = data;
5540 struct page *page = virt_to_page(bpage);
5541 unsigned long flags;
5542
5543 if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5544 return;
5545
5546 cpu_buffer = buffer->buffers[cpu];
5547
5548 /* If the page is still in use someplace else, we can't reuse it */
5549 if (page_ref_count(page) > 1)
5550 goto out;
5551
5552 local_irq_save(flags);
5553 arch_spin_lock(&cpu_buffer->lock);
5554
5555 if (!cpu_buffer->free_page) {
5556 cpu_buffer->free_page = bpage;
5557 bpage = NULL;
5558 }
5559
5560 arch_spin_unlock(&cpu_buffer->lock);
5561 local_irq_restore(flags);
5562
5563 out:
5564 free_page((unsigned long)bpage);
5565 }
5566 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
5567
5568 /**
5569 * ring_buffer_read_page - extract a page from the ring buffer
5570 * @buffer: buffer to extract from
5571 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
5572 * @len: amount to extract
5573 * @cpu: the cpu of the buffer to extract
5574 * @full: should the extraction only happen when the page is full.
5575 *
5576 * This function will pull out a page from the ring buffer and consume it.
5577 * @data_page must be the address of the variable that was returned
5578 * from ring_buffer_alloc_read_page. This is because the page might be used
5579 * to swap with a page in the ring buffer.
5580 *
5581 * for example:
5582 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
5583 * if (IS_ERR(rpage))
5584 * return PTR_ERR(rpage);
5585 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
5586 * if (ret >= 0)
5587 * process_page(rpage, ret);
5588 *
5589 * When @full is set, the function will not return the data unless
5590 * the writer is off the reader page.
5591 *
5592 * Note: it is up to the calling functions to handle sleeps and wakeups.
5593 * The ring buffer can be used anywhere in the kernel and can not
5594 * blindly call wake_up. The layer that uses the ring buffer must be
5595 * responsible for that.
5596 *
5597 * Returns:
5598 * >=0 if data has been transferred, returns the offset of consumed data.
5599 * <0 if no data has been transferred.
5600 */
5601 int ring_buffer_read_page(struct trace_buffer *buffer,
5602 void **data_page, size_t len, int cpu, int full)
5603 {
5604 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5605 struct ring_buffer_event *event;
5606 struct buffer_data_page *bpage;
5607 struct buffer_page *reader;
5608 unsigned long missed_events;
5609 unsigned long flags;
5610 unsigned int commit;
5611 unsigned int read;
5612 u64 save_timestamp;
5613 int ret = -1;
5614
5615 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5616 goto out;
5617
5618 /*
5619 * If len is not big enough to hold the page header, then
5620 * we can not copy anything.
5621 */
5622 if (len <= BUF_PAGE_HDR_SIZE)
5623 goto out;
5624
5625 len -= BUF_PAGE_HDR_SIZE;
5626
5627 if (!data_page)
5628 goto out;
5629
5630 bpage = *data_page;
5631 if (!bpage)
5632 goto out;
5633
5634 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5635
5636 reader = rb_get_reader_page(cpu_buffer);
5637 if (!reader)
5638 goto out_unlock;
5639
5640 event = rb_reader_event(cpu_buffer);
5641
5642 read = reader->read;
5643 commit = rb_page_commit(reader);
5644
5645 /* Check if any events were dropped */
5646 missed_events = cpu_buffer->lost_events;
5647
5648 /*
5649 * If this page has been partially read or
5650 * if len is not big enough to read the rest of the page or
5651 * a writer is still on the page, then
5652 * we must copy the data from the page to the buffer.
5653 * Otherwise, we can simply swap the page with the one passed in.
5654 */
5655 if (read || (len < (commit - read)) ||
5656 cpu_buffer->reader_page == cpu_buffer->commit_page) {
5657 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
5658 unsigned int rpos = read;
5659 unsigned int pos = 0;
5660 unsigned int size;
5661
5662 /*
5663 * If a full page is expected, this can still be returned
5664 * if there's been a previous partial read and the
5665 * rest of the page can be read and the commit page is off
5666 * the reader page.
5667 */
5668 if (full &&
5669 (!read || (len < (commit - read)) ||
5670 cpu_buffer->reader_page == cpu_buffer->commit_page))
5671 goto out_unlock;
5672
5673 if (len > (commit - read))
5674 len = (commit - read);
5675
5676 /* Always keep the time extend and data together */
5677 size = rb_event_ts_length(event);
5678
5679 if (len < size)
5680 goto out_unlock;
5681
5682 /* save the current timestamp, since the user will need it */
5683 save_timestamp = cpu_buffer->read_stamp;
5684
5685 /* Need to copy one event at a time */
5686 do {
5687 /* We need the size of one event, because
5688 * rb_advance_reader only advances by one event,
5689 * whereas rb_event_ts_length may include the size of
5690 * one or two events.
5691 * We have already ensured there's enough space if this
5692 * is a time extend. */
5693 size = rb_event_length(event);
5694 memcpy(bpage->data + pos, rpage->data + rpos, size);
5695
5696 len -= size;
5697
5698 rb_advance_reader(cpu_buffer);
5699 rpos = reader->read;
5700 pos += size;
5701
5702 if (rpos >= commit)
5703 break;
5704
5705 event = rb_reader_event(cpu_buffer);
5706 /* Always keep the time extend and data together */
5707 size = rb_event_ts_length(event);
5708 } while (len >= size);
5709
5710 /* update bpage */
5711 local_set(&bpage->commit, pos);
5712 bpage->time_stamp = save_timestamp;
5713
5714 /* we copied everything to the beginning */
5715 read = 0;
5716 } else {
5717 /* update the entry counter */
5718 cpu_buffer->read += rb_page_entries(reader);
5719 cpu_buffer->read_bytes += rb_page_commit(reader);
5720
5721 /* swap the pages */
5722 rb_init_page(bpage);
5723 bpage = reader->page;
5724 reader->page = *data_page;
5725 local_set(&reader->write, 0);
5726 local_set(&reader->entries, 0);
5727 reader->read = 0;
5728 *data_page = bpage;
5729
5730 /*
5731 * Use the real_end for the data size,
5732 * This gives us a chance to store the lost events
5733 * on the page.
5734 */
5735 if (reader->real_end)
5736 local_set(&bpage->commit, reader->real_end);
5737 }
5738 ret = read;
5739
5740 cpu_buffer->lost_events = 0;
5741
5742 commit = local_read(&bpage->commit);
5743 /*
5744 * Set a flag in the commit field if we lost events
5745 */
5746 if (missed_events) {
5747 /* If there is room at the end of the page to save the
5748 * missed events, then record it there.
5749 */
5750 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
5751 memcpy(&bpage->data[commit], &missed_events,
5752 sizeof(missed_events));
5753 local_add(RB_MISSED_STORED, &bpage->commit);
5754 commit += sizeof(missed_events);
5755 }
5756 local_add(RB_MISSED_EVENTS, &bpage->commit);
5757 }
5758
5759 /*
5760 * This page may be off to user land. Zero it out here.
5761 */
5762 if (commit < BUF_PAGE_SIZE)
5763 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
5764
5765 out_unlock:
5766 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5767
5768 out:
5769 return ret;
5770 }
5771 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
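
/*
 * Illustrative sketch only: a hypothetical consumer pulling one full page
 * of events out of a CPU buffer.  Passing PAGE_SIZE as @len is an
 * assumption of this example; real callers pass the size of the page they
 * allocated for the transfer.
 */
static __maybe_unused int example_read_one_page(struct trace_buffer *buffer, int cpu)
{
	void *rpage;
	int ret;

	rpage = ring_buffer_alloc_read_page(buffer, cpu);
	if (IS_ERR(rpage))
		return PTR_ERR(rpage);

	/* @full == 1: only succeed once the writer has left the reader page. */
	ret = ring_buffer_read_page(buffer, &rpage, PAGE_SIZE, cpu, 1);
	if (ret >= 0) {
		/* ret is the offset within @rpage where the consumed data starts. */
		/* ... process the events in @rpage here ... */
	}

	ring_buffer_free_read_page(buffer, cpu, rpage);
	return ret;
}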
5772
5773 /*
5774 * We only allocate new buffers, never free them if the CPU goes down.
5775 * If we were to free the buffer, then the user would lose any trace that was in
5776 * the buffer.
5777 */
5778 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
5779 {
5780 struct trace_buffer *buffer;
5781 long nr_pages_same;
5782 int cpu_i;
5783 unsigned long nr_pages;
5784
5785 buffer = container_of(node, struct trace_buffer, node);
5786 if (cpumask_test_cpu(cpu, buffer->cpumask))
5787 return 0;
5788
5789 nr_pages = 0;
5790 nr_pages_same = 1;
5791 /* check if all cpu sizes are same */
5792 for_each_buffer_cpu(buffer, cpu_i) {
5793 /* fill in the size from first enabled cpu */
5794 if (nr_pages == 0)
5795 nr_pages = buffer->buffers[cpu_i]->nr_pages;
5796 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
5797 nr_pages_same = 0;
5798 break;
5799 }
5800 }
5801 /* allocate minimum pages, user can later expand it */
5802 if (!nr_pages_same)
5803 nr_pages = 2;
5804 buffer->buffers[cpu] =
5805 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
5806 if (!buffer->buffers[cpu]) {
5807 WARN(1, "failed to allocate ring buffer on CPU %u\n",
5808 cpu);
5809 return -ENOMEM;
5810 }
5811 smp_wmb();
5812 cpumask_set_cpu(cpu, buffer->cpumask);
5813 return 0;
5814 }
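
/*
 * Illustrative sketch only: roughly how trace_rb_cpu_prepare() gets wired
 * up as a multi-instance CPU hotplug callback.  In the kernel the state is
 * set up by the tracing core and the instance is added when the buffer is
 * allocated; the hypothetical helper below merely condenses that into one
 * place.
 */
static __maybe_unused int example_register_hotplug(struct trace_buffer *buffer)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
				      "trace/RB:prepare",
				      trace_rb_cpu_prepare, NULL);
	if (ret < 0)
		return ret;

	return cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
}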
5815
5816 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
5817 /*
5818 * This is a basic integrity check of the ring buffer.
5819 * Late in the boot cycle this test will run when configured in.
5820 * It will kick off a thread per CPU that will go into a loop
5821 * writing to the per cpu ring buffer various sizes of data.
5822 * Some of the data will be large items, some small.
5823 *
5824 * Another thread is created that goes into a spin, sending out
5825 * IPIs to the other CPUs to also write into the ring buffer.
5826 * This is to test the nesting ability of the buffer.
5827 *
5828 * Basic stats are recorded and reported. If something in the
5829 * ring buffer should happen that's not expected, a big warning
5830 * is displayed and all ring buffers are disabled.
5831 */
5832 static struct task_struct *rb_threads[NR_CPUS] __initdata;
5833
5834 struct rb_test_data {
5835 struct trace_buffer *buffer;
5836 unsigned long events;
5837 unsigned long bytes_written;
5838 unsigned long bytes_alloc;
5839 unsigned long bytes_dropped;
5840 unsigned long events_nested;
5841 unsigned long bytes_written_nested;
5842 unsigned long bytes_alloc_nested;
5843 unsigned long bytes_dropped_nested;
5844 int min_size_nested;
5845 int max_size_nested;
5846 int max_size;
5847 int min_size;
5848 int cpu;
5849 int cnt;
5850 };
5851
5852 static struct rb_test_data rb_data[NR_CPUS] __initdata;
5853
5854 /* 1 meg per cpu */
5855 #define RB_TEST_BUFFER_SIZE 1048576
5856
5857 static char rb_string[] __initdata =
5858 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
5859 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
5860 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
5861
5862 static bool rb_test_started __initdata;
5863
5864 struct rb_item {
5865 int size;
5866 char str[];
5867 };
5868
5869 static __init int rb_write_something(struct rb_test_data *data, bool nested)
5870 {
5871 struct ring_buffer_event *event;
5872 struct rb_item *item;
5873 bool started;
5874 int event_len;
5875 int size;
5876 int len;
5877 int cnt;
5878
5879 /* Have nested writes different than what is written */
5880 cnt = data->cnt + (nested ? 27 : 0);
5881
5882 /* Multiply cnt by ~e, to make some unique increment */
5883 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
5884
5885 len = size + sizeof(struct rb_item);
5886
5887 started = rb_test_started;
5888 /* read rb_test_started before checking buffer enabled */
5889 smp_rmb();
5890
5891 event = ring_buffer_lock_reserve(data->buffer, len);
5892 if (!event) {
5893 /* Ignore dropped events before test starts. */
5894 if (started) {
5895 if (nested)
5896 data->bytes_dropped_nested += len;
5897 else
5898 data->bytes_dropped += len;
5899 }
5900 return len;
5901 }
5902
5903 event_len = ring_buffer_event_length(event);
5904
5905 if (RB_WARN_ON(data->buffer, event_len < len))
5906 goto out;
5907
5908 item = ring_buffer_event_data(event);
5909 item->size = size;
5910 memcpy(item->str, rb_string, size);
5911
5912 if (nested) {
5913 data->bytes_alloc_nested += event_len;
5914 data->bytes_written_nested += len;
5915 data->events_nested++;
5916 if (!data->min_size_nested || len < data->min_size_nested)
5917 data->min_size_nested = len;
5918 if (len > data->max_size_nested)
5919 data->max_size_nested = len;
5920 } else {
5921 data->bytes_alloc += event_len;
5922 data->bytes_written += len;
5923 data->events++;
5924 if (!data->min_size || len < data->min_size)
5925 data->min_size = len;
5926 if (len > data->max_size)
5927 data->max_size = len;
5928 }
5929
5930 out:
5931 ring_buffer_unlock_commit(data->buffer, event);
5932
5933 return 0;
5934 }
5935
5936 static __init int rb_test(void *arg)
5937 {
5938 struct rb_test_data *data = arg;
5939
5940 while (!kthread_should_stop()) {
5941 rb_write_something(data, false);
5942 data->cnt++;
5943
5944 set_current_state(TASK_INTERRUPTIBLE);
5945 /* Now sleep between a min of 100-300us and a max of 1ms */
5946 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
5947 }
5948
5949 return 0;
5950 }
5951
5952 static __init void rb_ipi(void *ignore)
5953 {
5954 struct rb_test_data *data;
5955 int cpu = smp_processor_id();
5956
5957 data = &rb_data[cpu];
5958 rb_write_something(data, true);
5959 }
5960
5961 static __init int rb_hammer_test(void *arg)
5962 {
5963 while (!kthread_should_stop()) {
5964
5965 /* Send an IPI to all cpus to write data! */
5966 smp_call_function(rb_ipi, NULL, 1);
5967 /* No sleep, but for non preempt, let others run */
5968 schedule();
5969 }
5970
5971 return 0;
5972 }
5973
5974 static __init int test_ringbuffer(void)
5975 {
5976 struct task_struct *rb_hammer;
5977 struct trace_buffer *buffer;
5978 int cpu;
5979 int ret = 0;
5980
5981 if (security_locked_down(LOCKDOWN_TRACEFS)) {
5982 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
5983 return 0;
5984 }
5985
5986 pr_info("Running ring buffer tests...\n");
5987
5988 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
5989 if (WARN_ON(!buffer))
5990 return 0;
5991
5992 /* Disable buffer so that threads can't write to it yet */
5993 ring_buffer_record_off(buffer);
5994
5995 for_each_online_cpu(cpu) {
5996 rb_data[cpu].buffer = buffer;
5997 rb_data[cpu].cpu = cpu;
5998 rb_data[cpu].cnt = cpu;
5999 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
6000 "rbtester/%d", cpu);
6001 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
6002 pr_cont("FAILED\n");
6003 ret = PTR_ERR(rb_threads[cpu]);
6004 goto out_free;
6005 }
6006
6007 kthread_bind(rb_threads[cpu], cpu);
6008 wake_up_process(rb_threads[cpu]);
6009 }
6010
6011 /* Now create the rb hammer! */
6012 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
6013 if (WARN_ON(IS_ERR(rb_hammer))) {
6014 pr_cont("FAILED\n");
6015 ret = PTR_ERR(rb_hammer);
6016 goto out_free;
6017 }
6018
6019 ring_buffer_record_on(buffer);
6020 /*
6021 * Show buffer is enabled before setting rb_test_started.
6022 * Yes there's a small race window where events could be
6023 * dropped and the thread won't catch it. But when a ring
6024 * buffer gets enabled, there will always be some kind of
6025 * delay before other CPUs see it. Thus, we don't care about
6026 * those dropped events. We care about events dropped after
6027 * the threads see that the buffer is active.
6028 */
6029 smp_wmb();
6030 rb_test_started = true;
6031
6032 set_current_state(TASK_INTERRUPTIBLE);
6033 /* Just run for 10 seconds */
6034 schedule_timeout(10 * HZ);
6035
6036 kthread_stop(rb_hammer);
6037
6038 out_free:
6039 for_each_online_cpu(cpu) {
6040 if (!rb_threads[cpu])
6041 break;
6042 kthread_stop(rb_threads[cpu]);
6043 }
6044 if (ret) {
6045 ring_buffer_free(buffer);
6046 return ret;
6047 }
6048
6049 /* Report! */
6050 pr_info("finished\n");
6051 for_each_online_cpu(cpu) {
6052 struct ring_buffer_event *event;
6053 struct rb_test_data *data = &rb_data[cpu];
6054 struct rb_item *item;
6055 unsigned long total_events;
6056 unsigned long total_dropped;
6057 unsigned long total_written;
6058 unsigned long total_alloc;
6059 unsigned long total_read = 0;
6060 unsigned long total_size = 0;
6061 unsigned long total_len = 0;
6062 unsigned long total_lost = 0;
6063 unsigned long lost;
6064 int big_event_size;
6065 int small_event_size;
6066
6067 ret = -1;
6068
6069 total_events = data->events + data->events_nested;
6070 total_written = data->bytes_written + data->bytes_written_nested;
6071 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
6072 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
6073
6074 big_event_size = data->max_size + data->max_size_nested;
6075 small_event_size = data->min_size + data->min_size_nested;
6076
6077 pr_info("CPU %d:\n", cpu);
6078 pr_info(" events: %ld\n", total_events);
6079 pr_info(" dropped bytes: %ld\n", total_dropped);
6080 pr_info(" alloced bytes: %ld\n", total_alloc);
6081 pr_info(" written bytes: %ld\n", total_written);
6082 pr_info(" biggest event: %d\n", big_event_size);
6083 pr_info(" smallest event: %d\n", small_event_size);
6084
6085 if (RB_WARN_ON(buffer, total_dropped))
6086 break;
6087
6088 ret = 0;
6089
6090 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
6091 total_lost += lost;
6092 item = ring_buffer_event_data(event);
6093 total_len += ring_buffer_event_length(event);
6094 total_size += item->size + sizeof(struct rb_item);
6095 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
6096 pr_info("FAILED!\n");
6097 pr_info("buffer had: %.*s\n", item->size, item->str);
6098 pr_info("expected: %.*s\n", item->size, rb_string);
6099 RB_WARN_ON(buffer, 1);
6100 ret = -1;
6101 break;
6102 }
6103 total_read++;
6104 }
6105 if (ret)
6106 break;
6107
6108 ret = -1;
6109
6110 pr_info(" read events: %ld\n", total_read);
6111 pr_info(" lost events: %ld\n", total_lost);
6112 pr_info(" total events: %ld\n", total_lost + total_read);
6113 pr_info(" recorded len bytes: %ld\n", total_len);
6114 pr_info(" recorded size bytes: %ld\n", total_size);
6115 if (total_lost)
6116 pr_info(" With dropped events, record len and size may not match\n"
6117 " alloced and written from above\n");
6118 if (!total_lost) {
6119 if (RB_WARN_ON(buffer, total_len != total_alloc ||
6120 total_size != total_written))
6121 break;
6122 }
6123 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
6124 break;
6125
6126 ret = 0;
6127 }
6128 if (!ret)
6129 pr_info("Ring buffer PASSED!\n");
6130
6131 ring_buffer_free(buffer);
6132 return 0;
6133 }
6134
6135 late_initcall(test_ringbuffer);
6136 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
6137