1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
7 #include <linux/trace_events.h>
8 #include <linux/ring_buffer.h>
9 #include <linux/trace_clock.h>
10 #include <linux/sched/clock.h>
11 #include <linux/trace_seq.h>
12 #include <linux/spinlock.h>
13 #include <linux/irq_work.h>
14 #include <linux/security.h>
15 #include <linux/uaccess.h>
16 #include <linux/hardirq.h>
17 #include <linux/kthread.h> /* for self test */
18 #include <linux/module.h>
19 #include <linux/percpu.h>
20 #include <linux/mutex.h>
21 #include <linux/delay.h>
22 #include <linux/slab.h>
23 #include <linux/init.h>
24 #include <linux/hash.h>
25 #include <linux/list.h>
26 #include <linux/cpu.h>
27 #include <linux/oom.h>
28
29 #include <asm/local.h>
30
31 static void update_pages_handler(struct work_struct *work);
32
33 /*
34 * The ring buffer header is special. We must manually keep it up to date.
35 */
36 int ring_buffer_print_entry_header(struct trace_seq *s)
37 {
38 trace_seq_puts(s, "# compressed entry header\n");
39 trace_seq_puts(s, "\ttype_len : 5 bits\n");
40 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
41 trace_seq_puts(s, "\tarray : 32 bits\n");
42 trace_seq_putc(s, '\n');
43 trace_seq_printf(s, "\tpadding : type == %d\n",
44 RINGBUF_TYPE_PADDING);
45 trace_seq_printf(s, "\ttime_extend : type == %d\n",
46 RINGBUF_TYPE_TIME_EXTEND);
47 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
48 RINGBUF_TYPE_TIME_STAMP);
49 trace_seq_printf(s, "\tdata max type_len == %d\n",
50 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
51
52 return !trace_seq_has_overflowed(s);
53 }
54
55 /*
56 * The ring buffer is made up of a list of pages. A separate list of pages is
57 * allocated for each CPU. A writer may only write to a buffer that is
58 * associated with the CPU it is currently executing on. A reader may read
59 * from any per cpu buffer.
60 *
61 * The reader is special. For each per cpu buffer, the reader has its own
62 * reader page. When a reader has read the entire reader page, this reader
63 * page is swapped with another page in the ring buffer.
64 *
65 * Now, as long as the writer is off the reader page, the reader can do what
66 * ever it wants with that page. The writer will never write to that page
67 * again (as long as it is out of the ring buffer).
68 *
69 * Here's some silly ASCII art.
70 *
71 * +------+
72 * |reader| RING BUFFER
73 * |page |
74 * +------+ +---+ +---+ +---+
75 * | |-->| |-->| |
76 * +---+ +---+ +---+
77 * ^ |
78 * | |
79 * +---------------+
80 *
81 *
82 * +------+
83 * |reader| RING BUFFER
84 * |page |------------------v
85 * +------+ +---+ +---+ +---+
86 * | |-->| |-->| |
87 * +---+ +---+ +---+
88 * ^ |
89 * | |
90 * +---------------+
91 *
92 *
93 * +------+
94 * |reader| RING BUFFER
95 * |page |------------------v
96 * +------+ +---+ +---+ +---+
97 * ^ | |-->| |-->| |
98 * | +---+ +---+ +---+
99 * | |
100 * | |
101 * +------------------------------+
102 *
103 *
104 * +------+
105 * |buffer| RING BUFFER
106 * |page |------------------v
107 * +------+ +---+ +---+ +---+
108 * ^ | | | |-->| |
109 * | New +---+ +---+ +---+
110 * | Reader------^ |
111 * | page |
112 * +------------------------------+
113 *
114 *
115 * After we make this swap, the reader can hand this page off to the splice
116 * code and be done with it. It can even allocate a new page if it needs to
117 * and swap that into the ring buffer.
118 *
119 * We will be using cmpxchg soon to make all this lockless.
120 *
121 */
122
123 /* Used for individual buffers (after the counter) */
124 #define RB_BUFFER_OFF (1 << 20)
125
126 #define BUF_PAGE_HDR_SIZE offsetof(struct buffer_data_page, data)
127
128 #define RB_EVNT_HDR_SIZE (offsetof(struct ring_buffer_event, array))
129 #define RB_ALIGNMENT 4U
130 #define RB_MAX_SMALL_DATA (RB_ALIGNMENT * RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
131 #define RB_EVNT_MIN_SIZE 8U /* two 32bit words */
132
133 #ifndef CONFIG_HAVE_64BIT_ALIGNED_ACCESS
134 # define RB_FORCE_8BYTE_ALIGNMENT 0
135 # define RB_ARCH_ALIGNMENT RB_ALIGNMENT
136 #else
137 # define RB_FORCE_8BYTE_ALIGNMENT 1
138 # define RB_ARCH_ALIGNMENT 8U
139 #endif
140
141 #define RB_ALIGN_DATA __aligned(RB_ARCH_ALIGNMENT)
142
143 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
144 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
145
146 enum {
147 RB_LEN_TIME_EXTEND = 8,
148 RB_LEN_TIME_STAMP = 8,
149 };
150
151 #define skip_time_extend(event) \
152 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
153
154 #define extended_time(event) \
155 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
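/*
 * Note: extended_time() matches both RINGBUF_TYPE_TIME_EXTEND and
 * RINGBUF_TYPE_TIME_STAMP, since those are the only type_len values
 * at or above RINGBUF_TYPE_TIME_EXTEND.
 */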
156
157 static inline int rb_null_event(struct ring_buffer_event *event)
158 {
159 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
160 }
161
162 static void rb_event_set_padding(struct ring_buffer_event *event)
163 {
164 /* padding has a NULL time_delta */
165 event->type_len = RINGBUF_TYPE_PADDING;
166 event->time_delta = 0;
167 }
168
169 static unsigned
170 rb_event_data_length(struct ring_buffer_event *event)
171 {
172 unsigned length;
173
174 if (event->type_len)
175 length = event->type_len * RB_ALIGNMENT;
176 else
177 length = event->array[0];
178 return length + RB_EVNT_HDR_SIZE;
179 }
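/*
 * Worked example: with RB_ALIGNMENT == 4 and an event header of
 * RB_EVNT_HDR_SIZE bytes (4 on typical configurations, i.e. the packed
 * type_len:5 + time_delta:27 word), an event with type_len == 3 carries
 * 3 * 4 = 12 bytes of payload, so rb_event_data_length() returns
 * 12 + 4 = 16 bytes in total.
 */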
180
181 /*
182 * Return the length of the given event. Will return
183 * the length of the time extend if the event is a
184 * time extend.
185 */
186 static inline unsigned
187 rb_event_length(struct ring_buffer_event *event)
188 {
189 switch (event->type_len) {
190 case RINGBUF_TYPE_PADDING:
191 if (rb_null_event(event))
192 /* undefined */
193 return -1;
194 return event->array[0] + RB_EVNT_HDR_SIZE;
195
196 case RINGBUF_TYPE_TIME_EXTEND:
197 return RB_LEN_TIME_EXTEND;
198
199 case RINGBUF_TYPE_TIME_STAMP:
200 return RB_LEN_TIME_STAMP;
201
202 case RINGBUF_TYPE_DATA:
203 return rb_event_data_length(event);
204 default:
205 WARN_ON_ONCE(1);
206 }
207 /* not hit */
208 return 0;
209 }
210
211 /*
212 * Return total length of time extend and data,
213 * or just the event length for all other events.
214 */
215 static inline unsigned
216 rb_event_ts_length(struct ring_buffer_event *event)
217 {
218 unsigned len = 0;
219
220 if (extended_time(event)) {
221 /* time extends include the data event after it */
222 len = RB_LEN_TIME_EXTEND;
223 event = skip_time_extend(event);
224 }
225 return len + rb_event_length(event);
226 }
227
228 /**
229 * ring_buffer_event_length - return the length of the event
230 * @event: the event to get the length of
231 *
232 * Returns the size of the data load of a data event.
233 * If the event is something other than a data event, it
234 * returns the size of the event itself. With the exception
235 * of a TIME EXTEND, where it still returns the size of the
236 * data load of the data event after it.
237 */
238 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
239 {
240 unsigned length;
241
242 if (extended_time(event))
243 event = skip_time_extend(event);
244
245 length = rb_event_length(event);
246 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
247 return length;
248 length -= RB_EVNT_HDR_SIZE;
249 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
250 length -= sizeof(event->array[0]);
251 return length;
252 }
253 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
254
255 /* inline for ring buffer fast paths */
256 static __always_inline void *
257 rb_event_data(struct ring_buffer_event *event)
258 {
259 if (extended_time(event))
260 event = skip_time_extend(event);
261 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
262 /* If length is in len field, then array[0] has the data */
263 if (event->type_len)
264 return (void *)&event->array[0];
265 /* Otherwise length is in array[0] and array[1] has the data */
266 return (void *)&event->array[1];
267 }
268
269 /**
270 * ring_buffer_event_data - return the data of the event
271 * @event: the event to get the data from
272 */
273 void *ring_buffer_event_data(struct ring_buffer_event *event)
274 {
275 return rb_event_data(event);
276 }
277 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
278
279 #define for_each_buffer_cpu(buffer, cpu) \
280 for_each_cpu(cpu, buffer->cpumask)
281
282 #define for_each_online_buffer_cpu(buffer, cpu) \
283 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
284
285 #define TS_SHIFT 27
286 #define TS_MASK ((1ULL << TS_SHIFT) - 1)
287 #define TS_DELTA_TEST (~TS_MASK)
288
289 /**
290 * ring_buffer_event_time_stamp - return the event's extended timestamp
291 * @event: the event to get the timestamp of
292 *
293 * Returns the extended timestamp associated with a data event.
294 * An extended time_stamp is a 64-bit timestamp represented
295 * internally in a special way that makes the best use of space
296 * contained within a ring buffer event. This function decodes
297 * it and maps it to a straight u64 value.
298 */
299 u64 ring_buffer_event_time_stamp(struct ring_buffer_event *event)
300 {
301 u64 ts;
302
303 ts = event->array[0];
304 ts <<= TS_SHIFT;
305 ts += event->time_delta;
306
307 return ts;
308 }
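/*
 * Worked example of the decoding above: the upper bits of the timestamp
 * live in array[0] and the lower TS_SHIFT (27) bits in time_delta, so
 * array[0] == 2 and time_delta == 5 reconstruct to
 * (2 << 27) + 5 == 268435461.
 */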
309
310 /* Flag when events were overwritten */
311 #define RB_MISSED_EVENTS (1 << 31)
312 /* Missed count stored at end */
313 #define RB_MISSED_STORED (1 << 30)
314
315 struct buffer_data_page {
316 u64 time_stamp; /* page time stamp */
317 local_t commit; /* write committed index */
318 unsigned char data[] RB_ALIGN_DATA; /* data of buffer page */
319 };
320
321 /*
322 * Note, the buffer_page list must be first. The buffer pages
323 * are allocated in cache lines, which means that each buffer
324 * page will be at the beginning of a cache line, and thus
325 * the least significant bits will be zero. We use this to
326 * add flags in the list struct pointers, to make the ring buffer
327 * lockless.
328 */
329 struct buffer_page {
330 struct list_head list; /* list of buffer pages */
331 local_t write; /* index for next write */
332 unsigned read; /* index for next read */
333 local_t entries; /* entries on this page */
334 unsigned long real_end; /* real end of data */
335 struct buffer_data_page *page; /* Actual data page */
336 };
337
338 /*
339 * The buffer page counters, write and entries, must be reset
340 * atomically when crossing page boundaries. To synchronize this
341 * update, two counters are inserted into the number. One is
342 * the actual counter for the write position or count on the page.
343 *
344 * The other is a counter of updaters. Before an update happens
345 * the update partition of the counter is incremented. This will
346 * allow the updater to update the counter atomically.
347 *
348 * The counter is 20 bits, and the state data is 12.
349 */
350 #define RB_WRITE_MASK 0xfffff
351 #define RB_WRITE_INTCNT (1 << 20)
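/*
 * Example of the split described above: a raw write value of 0x00300040
 * holds an updater count of 3 in the bits above RB_WRITE_MASK and a
 * write index of 0x40 in the low 20 bits; rb_page_write() further down
 * masks with RB_WRITE_MASK to recover just the index.
 */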
352
353 static void rb_init_page(struct buffer_data_page *bpage)
354 {
355 local_set(&bpage->commit, 0);
356 }
357
358 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
359 {
360 return local_read(&bpage->page->commit);
361 }
362
363 static void free_buffer_page(struct buffer_page *bpage)
364 {
365 free_page((unsigned long)bpage->page);
366 kfree(bpage);
367 }
368
369 /*
370 * We need to fit the time_stamp delta into 27 bits.
371 */
372 static inline int test_time_stamp(u64 delta)
373 {
374 if (delta & TS_DELTA_TEST)
375 return 1;
376 return 0;
377 }
378
379 #define BUF_PAGE_SIZE (PAGE_SIZE - BUF_PAGE_HDR_SIZE)
380
381 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
382 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
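/*
 * For example, on a 64-bit build with 4K pages, buffer_data_page has an
 * 8-byte time_stamp and an 8-byte commit field, so BUF_PAGE_HDR_SIZE is
 * 16, BUF_PAGE_SIZE is 4096 - 16 = 4080, and BUF_MAX_DATA_SIZE is
 * 4080 - 8 = 4072 bytes of event data per page.
 */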
383
384 int ring_buffer_print_page_header(struct trace_seq *s)
385 {
386 struct buffer_data_page field;
387
388 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
389 "offset:0;\tsize:%u;\tsigned:%u;\n",
390 (unsigned int)sizeof(field.time_stamp),
391 (unsigned int)is_signed_type(u64));
392
393 trace_seq_printf(s, "\tfield: local_t commit;\t"
394 "offset:%u;\tsize:%u;\tsigned:%u;\n",
395 (unsigned int)offsetof(typeof(field), commit),
396 (unsigned int)sizeof(field.commit),
397 (unsigned int)is_signed_type(long));
398
399 trace_seq_printf(s, "\tfield: int overwrite;\t"
400 "offset:%u;\tsize:%u;\tsigned:%u;\n",
401 (unsigned int)offsetof(typeof(field), commit),
402 1,
403 (unsigned int)is_signed_type(long));
404
405 trace_seq_printf(s, "\tfield: char data;\t"
406 "offset:%u;\tsize:%u;\tsigned:%u;\n",
407 (unsigned int)offsetof(typeof(field), data),
408 (unsigned int)BUF_PAGE_SIZE,
409 (unsigned int)is_signed_type(char));
410
411 return !trace_seq_has_overflowed(s);
412 }
413
414 struct rb_irq_work {
415 struct irq_work work;
416 wait_queue_head_t waiters;
417 wait_queue_head_t full_waiters;
418 long wait_index;
419 bool waiters_pending;
420 bool full_waiters_pending;
421 bool wakeup_full;
422 };
423
424 /*
425 * Structure to hold event state and handle nested events.
426 */
427 struct rb_event_info {
428 u64 ts;
429 u64 delta;
430 u64 before;
431 u64 after;
432 unsigned long length;
433 struct buffer_page *tail_page;
434 int add_timestamp;
435 };
436
437 /*
438 * Used for the add_timestamp
439 * NONE - no extra time stamp is needed
440 * EXTEND - wants a time extend
441 * ABSOLUTE - the buffer requests all events to have absolute time stamps
442 * FORCE - force a full time stamp.
443 */
444 enum {
445 RB_ADD_STAMP_NONE = 0,
446 RB_ADD_STAMP_EXTEND = BIT(1),
447 RB_ADD_STAMP_ABSOLUTE = BIT(2),
448 RB_ADD_STAMP_FORCE = BIT(3)
449 };
450 /*
451 * Used for which event context the event is in.
452 * TRANSITION = 0
453 * NMI = 1
454 * IRQ = 2
455 * SOFTIRQ = 3
456 * NORMAL = 4
457 *
458 * See trace_recursive_lock() comment below for more details.
459 */
460 enum {
461 RB_CTX_TRANSITION,
462 RB_CTX_NMI,
463 RB_CTX_IRQ,
464 RB_CTX_SOFTIRQ,
465 RB_CTX_NORMAL,
466 RB_CTX_MAX
467 };
468
469 #if BITS_PER_LONG == 32
470 #define RB_TIME_32
471 #endif
472
473 /* To test on 64 bit machines */
474 //#define RB_TIME_32
475
476 #ifdef RB_TIME_32
477
478 struct rb_time_struct {
479 local_t cnt;
480 local_t top;
481 local_t bottom;
482 };
483 #else
484 #include <asm/local64.h>
485 struct rb_time_struct {
486 local64_t time;
487 };
488 #endif
489 typedef struct rb_time_struct rb_time_t;
490
491 /*
492 * If head_page == tail_page && head == tail, then the buffer is empty.
493 */
494 struct ring_buffer_per_cpu {
495 int cpu;
496 atomic_t record_disabled;
497 atomic_t resize_disabled;
498 struct trace_buffer *buffer;
499 raw_spinlock_t reader_lock; /* serialize readers */
500 arch_spinlock_t lock;
501 struct lock_class_key lock_key;
502 struct buffer_data_page *free_page;
503 unsigned long nr_pages;
504 unsigned int current_context;
505 struct list_head *pages;
506 struct buffer_page *head_page; /* read from head */
507 struct buffer_page *tail_page; /* write to tail */
508 struct buffer_page *commit_page; /* committed pages */
509 struct buffer_page *reader_page;
510 unsigned long lost_events;
511 unsigned long last_overrun;
512 unsigned long nest;
513 local_t entries_bytes;
514 local_t entries;
515 local_t overrun;
516 local_t commit_overrun;
517 local_t dropped_events;
518 local_t committing;
519 local_t commits;
520 local_t pages_touched;
521 local_t pages_lost;
522 local_t pages_read;
523 long last_pages_touch;
524 size_t shortest_full;
525 unsigned long read;
526 unsigned long read_bytes;
527 rb_time_t write_stamp;
528 rb_time_t before_stamp;
529 u64 read_stamp;
530 /* pages removed since last reset */
531 unsigned long pages_removed;
532 /* ring buffer pages to update, > 0 to add, < 0 to remove */
533 long nr_pages_to_update;
534 struct list_head new_pages; /* new pages to add */
535 struct work_struct update_pages_work;
536 struct completion update_done;
537
538 struct rb_irq_work irq_work;
539 };
540
541 struct trace_buffer {
542 unsigned flags;
543 int cpus;
544 atomic_t record_disabled;
545 atomic_t resizing;
546 cpumask_var_t cpumask;
547
548 struct lock_class_key *reader_lock_key;
549
550 struct mutex mutex;
551
552 struct ring_buffer_per_cpu **buffers;
553
554 struct hlist_node node;
555 u64 (*clock)(void);
556
557 struct rb_irq_work irq_work;
558 bool time_stamp_abs;
559 };
560
561 struct ring_buffer_iter {
562 struct ring_buffer_per_cpu *cpu_buffer;
563 unsigned long head;
564 unsigned long next_event;
565 struct buffer_page *head_page;
566 struct buffer_page *cache_reader_page;
567 unsigned long cache_read;
568 unsigned long cache_pages_removed;
569 u64 read_stamp;
570 u64 page_stamp;
571 struct ring_buffer_event *event;
572 int missed_events;
573 };
574
575 #ifdef RB_TIME_32
576
577 /*
578 * On 32 bit machines, local64_t is very expensive. As the ring
579 * buffer doesn't need all the features of a true 64 bit atomic,
580 * on 32 bit, it uses these functions (64 still uses local64_t).
581 *
582 * For the ring buffer, 64 bit required operations for the time is
583 * the following:
584 *
585 * - Only need 59 bits (uses 60 to make it even).
586 * - Reads may fail if it interrupted a modification of the time stamp.
587 * It will succeed if it did not interrupt another write even if
588 * the read itself is interrupted by a write.
589 * It returns whether it was successful or not.
590 *
591 * - Writes always succeed and will overwrite other writes and writes
592 * that were done by events interrupting the current write.
593 *
594 * - A write followed by a read of the same time stamp will always succeed,
595 * but may not contain the same value.
596 *
597 * - A cmpxchg will fail if it interrupted another write or cmpxchg.
598 * Other than that, it acts like a normal cmpxchg.
599 *
600 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
601 * (bottom being the least significant 30 bits of the 60 bit time stamp).
602 *
603 * The two most significant bits of each half holds a 2 bit counter (0-3).
604 * Each update will increment this counter by one.
605 * When reading the top and bottom, if the two counter bits match then the
606 * top and bottom together make a valid 60 bit number.
607 */
608 #define RB_TIME_SHIFT 30
609 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
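/*
 * Concretely: rb_time_split() below places bits 59..30 of the value in
 * "top" and bits 29..0 in "bottom", and rb_time_val_cnt() stamps each
 * half with the same 2-bit update counter in bits 31:30. A reader that
 * sees mismatched counters in the two halves knows it raced with a
 * writer and reports failure.
 */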
610
611 static inline int rb_time_cnt(unsigned long val)
612 {
613 return (val >> RB_TIME_SHIFT) & 3;
614 }
615
616 static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
617 {
618 u64 val;
619
620 val = top & RB_TIME_VAL_MASK;
621 val <<= RB_TIME_SHIFT;
622 val |= bottom & RB_TIME_VAL_MASK;
623
624 return val;
625 }
626
627 static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
628 {
629 unsigned long top, bottom;
630 unsigned long c;
631
632 /*
633 * If the read is interrupted by a write, then the cnt will
634 * be different. Loop until both top and bottom have been read
635 * without interruption.
636 */
637 do {
638 c = local_read(&t->cnt);
639 top = local_read(&t->top);
640 bottom = local_read(&t->bottom);
641 } while (c != local_read(&t->cnt));
642
643 *cnt = rb_time_cnt(top);
644
645 /* If top and bottom counts don't match, this interrupted a write */
646 if (*cnt != rb_time_cnt(bottom))
647 return false;
648
649 *ret = rb_time_val(top, bottom);
650 return true;
651 }
652
653 static bool rb_time_read(rb_time_t *t, u64 *ret)
654 {
655 unsigned long cnt;
656
657 return __rb_time_read(t, ret, &cnt);
658 }
659
660 static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
661 {
662 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
663 }
664
665 static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
666 {
667 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
668 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
669 }
670
671 static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
672 {
673 val = rb_time_val_cnt(val, cnt);
674 local_set(t, val);
675 }
676
677 static void rb_time_set(rb_time_t *t, u64 val)
678 {
679 unsigned long cnt, top, bottom;
680
681 rb_time_split(val, &top, &bottom);
682
683 /* Writes always succeed with a valid number even if it gets interrupted. */
684 do {
685 cnt = local_inc_return(&t->cnt);
686 rb_time_val_set(&t->top, top, cnt);
687 rb_time_val_set(&t->bottom, bottom, cnt);
688 } while (cnt != local_read(&t->cnt));
689 }
690
691 static inline bool
692 rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
693 {
694 unsigned long ret;
695
696 ret = local_cmpxchg(l, expect, set);
697 return ret == expect;
698 }
699
700 static int rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
701 {
702 unsigned long cnt, top, bottom;
703 unsigned long cnt2, top2, bottom2;
704 u64 val;
705
706 /* Any interruptions in this function should cause a failure */
707 cnt = local_read(&t->cnt);
708
709 /* The cmpxchg always fails if it interrupted an update */
710 if (!__rb_time_read(t, &val, &cnt2))
711 return false;
712
713 if (val != expect)
714 return false;
715
716 if ((cnt & 3) != cnt2)
717 return false;
718
719 cnt2 = cnt + 1;
720
721 rb_time_split(val, &top, &bottom);
722 top = rb_time_val_cnt(top, cnt);
723 bottom = rb_time_val_cnt(bottom, cnt);
724
725 rb_time_split(set, &top2, &bottom2);
726 top2 = rb_time_val_cnt(top2, cnt2);
727 bottom2 = rb_time_val_cnt(bottom2, cnt2);
728
729 if (!rb_time_read_cmpxchg(&t->cnt, cnt, cnt2))
730 return false;
731 if (!rb_time_read_cmpxchg(&t->top, top, top2))
732 return false;
733 if (!rb_time_read_cmpxchg(&t->bottom, bottom, bottom2))
734 return false;
735 return true;
736 }
737
738 #else /* 64 bits */
739
740 /* local64_t always succeeds */
741
742 static inline bool rb_time_read(rb_time_t *t, u64 *ret)
743 {
744 *ret = local64_read(&t->time);
745 return true;
746 }
747 static void rb_time_set(rb_time_t *t, u64 val)
748 {
749 local64_set(&t->time, val);
750 }
751
752 static bool rb_time_cmpxchg(rb_time_t *t, u64 expect, u64 set)
753 {
754 u64 val;
755 val = local64_cmpxchg(&t->time, expect, set);
756 return val == expect;
757 }
758 #endif
759
760 /**
761 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
762 * @buffer: The ring_buffer to get the number of pages from
763 * @cpu: The cpu of the ring_buffer to get the number of pages from
764 *
765 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
766 */
767 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
768 {
769 return buffer->buffers[cpu]->nr_pages;
770 }
771
772 /**
773 * ring_buffer_nr_dirty_pages - get the number of used pages in the ring buffer
774 * @buffer: The ring_buffer to get the number of pages from
775 * @cpu: The cpu of the ring_buffer to get the number of pages from
776 *
777 * Returns the number of pages that have content in the ring buffer.
778 */
779 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
780 {
781 size_t read;
782 size_t lost;
783 size_t cnt;
784
785 read = local_read(&buffer->buffers[cpu]->pages_read);
786 lost = local_read(&buffer->buffers[cpu]->pages_lost);
787 cnt = local_read(&buffer->buffers[cpu]->pages_touched);
788
789 if (WARN_ON_ONCE(cnt < lost))
790 return 0;
791
792 cnt -= lost;
793
794 /* The reader can read an empty page, but not more than that */
795 if (cnt < read) {
796 WARN_ON_ONCE(read > cnt + 1);
797 return 0;
798 }
799
800 return cnt - read;
801 }
802
803 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
804 {
805 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
806 size_t nr_pages;
807 size_t dirty;
808
809 nr_pages = cpu_buffer->nr_pages;
810 if (!nr_pages || !full)
811 return true;
812
813 /*
814 * Add one as dirty will never equal nr_pages, as the sub-buffer
815 * that the writer is on is not counted as dirty.
816 * This is needed if "buffer_percent" is set to 100.
817 */
818 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
819
820 return (dirty * 100) >= (full * nr_pages);
821 }
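/*
 * Example: with nr_pages == 10 and full == 50 (percent), the check above
 * becomes (dirty * 100) >= (50 * 10), so the watermark is hit once 5
 * sub-buffers count as dirty (including the +1 for the sub-buffer the
 * writer is currently on).
 */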
822
823 /*
824 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
825 *
826 * Schedules a delayed work to wake up any task that is blocked on the
827 * ring buffer waiters queue.
828 */
829 static void rb_wake_up_waiters(struct irq_work *work)
830 {
831 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
832
833 wake_up_all(&rbwork->waiters);
834 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
835 rbwork->wakeup_full = false;
836 rbwork->full_waiters_pending = false;
837 wake_up_all(&rbwork->full_waiters);
838 }
839 }
840
841 /**
842 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
843 * @buffer: The ring buffer to wake waiters on
844 *
845 * When a file that represents a ring buffer is closing,
846 * it is prudent to wake up any waiters that are waiting on it.
847 */
848 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
849 {
850 struct ring_buffer_per_cpu *cpu_buffer;
851 struct rb_irq_work *rbwork;
852
853 if (cpu == RING_BUFFER_ALL_CPUS) {
854
855 /* Wake up individual ones too. One level recursion */
856 for_each_buffer_cpu(buffer, cpu)
857 ring_buffer_wake_waiters(buffer, cpu);
858
859 rbwork = &buffer->irq_work;
860 } else {
861 cpu_buffer = buffer->buffers[cpu];
862 rbwork = &cpu_buffer->irq_work;
863 }
864
865 rbwork->wait_index++;
866 /* make sure the waiters see the new index */
867 smp_wmb();
868
869 /* This can be called in any context */
870 irq_work_queue(&rbwork->work);
871 }
872
873 /**
874 * ring_buffer_wait - wait for input to the ring buffer
875 * @buffer: buffer to wait on
876 * @cpu: the cpu buffer to wait on
877 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
878 *
879 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
880 * as data is added to any of the @buffer's cpu buffers. Otherwise
881 * it will wait for data to be added to a specific cpu buffer.
882 */
883 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
884 {
885 struct ring_buffer_per_cpu *cpu_buffer;
886 DEFINE_WAIT(wait);
887 struct rb_irq_work *work;
888 long wait_index;
889 int ret = 0;
890
891 /*
892 * Depending on what the caller is waiting for, either any
893 * data in any cpu buffer, or a specific buffer, put the
894 * caller on the appropriate wait queue.
895 */
896 if (cpu == RING_BUFFER_ALL_CPUS) {
897 work = &buffer->irq_work;
898 /* Full only makes sense on per cpu reads */
899 full = 0;
900 } else {
901 if (!cpumask_test_cpu(cpu, buffer->cpumask))
902 return -ENODEV;
903 cpu_buffer = buffer->buffers[cpu];
904 work = &cpu_buffer->irq_work;
905 }
906
907 wait_index = READ_ONCE(work->wait_index);
908
909 while (true) {
910 if (full)
911 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
912 else
913 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
914
915 /*
916 * The events can happen in critical sections where
917 * checking a work queue can cause deadlocks.
918 * After adding a task to the queue, this flag is set
919 * only to notify events to try to wake up the queue
920 * using irq_work.
921 *
922 * We don't clear it even if the buffer is no longer
923 * empty. The flag only causes the next event to run
924 * irq_work to do the work queue wake up. The worst
925 * that can happen if we race with !trace_empty() is that
926 * an event will cause an irq_work to try to wake up
927 * an empty queue.
928 *
929 * There's no reason to protect this flag either, as
930 * the work queue and irq_work logic will do the necessary
931 * synchronization for the wake ups. The only thing
932 * that is necessary is that the wake up happens after
933 * a task has been queued. It's OK for spurious wake ups.
934 */
935 if (full)
936 work->full_waiters_pending = true;
937 else
938 work->waiters_pending = true;
939
940 if (signal_pending(current)) {
941 ret = -EINTR;
942 break;
943 }
944
945 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
946 break;
947
948 if (cpu != RING_BUFFER_ALL_CPUS &&
949 !ring_buffer_empty_cpu(buffer, cpu)) {
950 unsigned long flags;
951 bool pagebusy;
952 bool done;
953
954 if (!full)
955 break;
956
957 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
958 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
959 done = !pagebusy && full_hit(buffer, cpu, full);
960
961 if (!cpu_buffer->shortest_full ||
962 cpu_buffer->shortest_full > full)
963 cpu_buffer->shortest_full = full;
964 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
965 if (done)
966 break;
967 }
968
969 schedule();
970
971 /* Make sure to see the new wait index */
972 smp_rmb();
973 if (wait_index != work->wait_index)
974 break;
975 }
976
977 if (full)
978 finish_wait(&work->full_waiters, &wait);
979 else
980 finish_wait(&work->waiters, &wait);
981
982 return ret;
983 }
984
985 /**
986 * ring_buffer_poll_wait - poll on buffer input
987 * @buffer: buffer to wait on
988 * @cpu: the cpu buffer to wait on
989 * @filp: the file descriptor
990 * @poll_table: The poll descriptor
991 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
992 *
993 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
994 * as data is added to any of the @buffer's cpu buffers. Otherwise
995 * it will wait for data to be added to a specific cpu buffer.
996 *
997 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
998 * zero otherwise.
999 */
1000 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
1001 struct file *filp, poll_table *poll_table, int full)
1002 {
1003 struct ring_buffer_per_cpu *cpu_buffer;
1004 struct rb_irq_work *work;
1005
1006 if (cpu == RING_BUFFER_ALL_CPUS) {
1007 work = &buffer->irq_work;
1008 full = 0;
1009 } else {
1010 if (!cpumask_test_cpu(cpu, buffer->cpumask))
1011 return EPOLLERR;
1012
1013 cpu_buffer = buffer->buffers[cpu];
1014 work = &cpu_buffer->irq_work;
1015 }
1016
1017 if (full) {
1018 poll_wait(filp, &work->full_waiters, poll_table);
1019 work->full_waiters_pending = true;
1020 if (!cpu_buffer->shortest_full ||
1021 cpu_buffer->shortest_full > full)
1022 cpu_buffer->shortest_full = full;
1023 } else {
1024 poll_wait(filp, &work->waiters, poll_table);
1025 work->waiters_pending = true;
1026 }
1027
1028 /*
1029 * There's a tight race between setting the waiters_pending and
1030 * checking if the ring buffer is empty. Once the waiters_pending bit
1031 * is set, the next event will wake the task up, but we can get stuck
1032 * if there's only a single event in.
1033 *
1034 * FIXME: Ideally, we need a memory barrier on the writer side as well,
1035 * but adding a memory barrier to all events will cause too much of a
1036 * performance hit in the fast path. We only need a memory barrier when
1037 * the buffer goes from empty to having content. But as this race is
1038 * extremely small, and it's not a problem if another event comes in, we
1039 * will fix it later.
1040 */
1041 smp_mb();
1042
1043 if (full)
1044 return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
1045
1046 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1047 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
1048 return EPOLLIN | EPOLLRDNORM;
1049 return 0;
1050 }
1051
1052 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
1053 #define RB_WARN_ON(b, cond) \
1054 ({ \
1055 int _____ret = unlikely(cond); \
1056 if (_____ret) { \
1057 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1058 struct ring_buffer_per_cpu *__b = \
1059 (void *)b; \
1060 atomic_inc(&__b->buffer->record_disabled); \
1061 } else \
1062 atomic_inc(&b->record_disabled); \
1063 WARN_ON(1); \
1064 } \
1065 _____ret; \
1066 })
1067
1068 /* Up this if you want to test the TIME_EXTENTS and normalization */
1069 #define DEBUG_SHIFT 0
1070
1071 static inline u64 rb_time_stamp(struct trace_buffer *buffer)
1072 {
1073 u64 ts;
1074
1075 /* Skip retpolines :-( */
1076 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
1077 ts = trace_clock_local();
1078 else
1079 ts = buffer->clock();
1080
1081 /* shift to debug/test normalization and TIME_EXTENTS */
1082 return ts << DEBUG_SHIFT;
1083 }
1084
1085 u64 ring_buffer_time_stamp(struct trace_buffer *buffer, int cpu)
1086 {
1087 u64 time;
1088
1089 preempt_disable_notrace();
1090 time = rb_time_stamp(buffer);
1091 preempt_enable_notrace();
1092
1093 return time;
1094 }
1095 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1096
1097 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
1098 int cpu, u64 *ts)
1099 {
1100 /* Just stupid testing the normalize function and deltas */
1101 *ts >>= DEBUG_SHIFT;
1102 }
1103 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1104
1105 /*
1106 * Making the ring buffer lockless makes things tricky.
1107 * Writes only happen on the CPU that they are on, and they
1108 * only need to worry about interrupts. Reads, however, can
1109 * happen on any CPU.
1110 *
1111 * The reader page is always off the ring buffer, but when the
1112 * reader finishes with a page, it needs to swap its page with
1113 * a new one from the buffer. The reader needs to take from
1114 * the head (writes go to the tail). But if a writer is in overwrite
1115 * mode and wraps, it must push the head page forward.
1116 *
1117 * Here lies the problem.
1118 *
1119 * The reader must be careful to replace only the head page, and
1120 * not another one. As described at the top of the file in the
1121 * ASCII art, the reader sets its old page to point to the next
1122 * page after head. It then sets the page after head to point to
1123 * the old reader page. But if the writer moves the head page
1124 * during this operation, the reader could end up with the tail.
1125 *
1126 * We use cmpxchg to help prevent this race. We also do something
1127 * special with the page before head. We set the LSB to 1.
1128 *
1129 * When the writer must push the page forward, it will clear the
1130 * bit that points to the head page, move the head, and then set
1131 * the bit that points to the new head page.
1132 *
1133 * We also don't want an interrupt coming in and moving the head
1134 * page on another writer. Thus we use the second LSB to catch
1135 * that too. Thus:
1136 *
1137 * head->list->prev->next bit 1 bit 0
1138 * ------- -------
1139 * Normal page 0 0
1140 * Points to head page 0 1
1141 * New head page 1 0
1142 *
1143 * Note we can not trust the prev pointer of the head page, because:
1144 *
1145 * +----+ +-----+ +-----+
1146 * | |------>| T |---X--->| N |
1147 * | |<------| | | |
1148 * +----+ +-----+ +-----+
1149 * ^ ^ |
1150 * | +-----+ | |
1151 * +----------| R |----------+ |
1152 * | |<-----------+
1153 * +-----+
1154 *
1155 * Key: ---X--> HEAD flag set in pointer
1156 * T Tail page
1157 * R Reader page
1158 * N Next page
1159 *
1160 * (see __rb_reserve_next() to see where this happens)
1161 *
1162 * What the above shows is that the reader just swapped out
1163 * the reader page with a page in the buffer, but before it
1164 * could make the new header point back to the new page added
1165 * it was preempted by a writer. The writer moved forward onto
1166 * the new page added by the reader and is about to move forward
1167 * again.
1168 *
1169 * You can see, it is legitimate for the previous pointer of
1170 * the head (or any page) not to point back to itself. But only
1171 * temporarily.
1172 */
1173
1174 #define RB_PAGE_NORMAL 0UL
1175 #define RB_PAGE_HEAD 1UL
1176 #define RB_PAGE_UPDATE 2UL
1177
1178
1179 #define RB_FLAG_MASK 3UL
1180
1181 /* PAGE_MOVED is not part of the mask */
1182 #define RB_PAGE_MOVED 4UL
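/*
 * Example: because buffer pages are allocated on cache-line boundaries,
 * a next pointer such as 0x...45000 may be stored as 0x...45001, i.e.
 * with RB_PAGE_HEAD set, to mark the page it points to as the head page.
 * rb_list_head() below strips RB_FLAG_MASK to recover the real list_head
 * pointer.
 */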
1183
1184 /*
1185 * rb_list_head - remove any bit
1186 */
1187 static struct list_head *rb_list_head(struct list_head *list)
1188 {
1189 unsigned long val = (unsigned long)list;
1190
1191 return (struct list_head *)(val & ~RB_FLAG_MASK);
1192 }
1193
1194 /*
1195 * rb_is_head_page - test if the given page is the head page
1196 *
1197 * Because the reader may move the head_page pointer, we can
1198 * not trust what the head page is (it may be pointing to
1199 * the reader page). But if the next page is a header page,
1200 * its flags will be non zero.
1201 */
1202 static inline int
1203 rb_is_head_page(struct ring_buffer_per_cpu *cpu_buffer,
1204 struct buffer_page *page, struct list_head *list)
1205 {
1206 unsigned long val;
1207
1208 val = (unsigned long)list->next;
1209
1210 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1211 return RB_PAGE_MOVED;
1212
1213 return val & RB_FLAG_MASK;
1214 }
1215
1216 /*
1217 * rb_is_reader_page
1218 *
1219 * The unique thing about the reader page is that, if the
1220 * writer is ever on it, the previous pointer never points
1221 * back to the reader page.
1222 */
1223 static bool rb_is_reader_page(struct buffer_page *page)
1224 {
1225 struct list_head *list = page->list.prev;
1226
1227 return rb_list_head(list->next) != &page->list;
1228 }
1229
1230 /*
1231 * rb_set_list_to_head - set a list_head to be pointing to head.
1232 */
1233 static void rb_set_list_to_head(struct ring_buffer_per_cpu *cpu_buffer,
1234 struct list_head *list)
1235 {
1236 unsigned long *ptr;
1237
1238 ptr = (unsigned long *)&list->next;
1239 *ptr |= RB_PAGE_HEAD;
1240 *ptr &= ~RB_PAGE_UPDATE;
1241 }
1242
1243 /*
1244 * rb_head_page_activate - sets up head page
1245 */
1246 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1247 {
1248 struct buffer_page *head;
1249
1250 head = cpu_buffer->head_page;
1251 if (!head)
1252 return;
1253
1254 /*
1255 * Set the previous list pointer to have the HEAD flag.
1256 */
1257 rb_set_list_to_head(cpu_buffer, head->list.prev);
1258 }
1259
1260 static void rb_list_head_clear(struct list_head *list)
1261 {
1262 unsigned long *ptr = (unsigned long *)&list->next;
1263
1264 *ptr &= ~RB_FLAG_MASK;
1265 }
1266
1267 /*
1268 * rb_head_page_deactivate - clears head page ptr (for free list)
1269 */
1270 static void
1271 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1272 {
1273 struct list_head *hd;
1274
1275 /* Go through the whole list and clear any pointers found. */
1276 rb_list_head_clear(cpu_buffer->pages);
1277
1278 list_for_each(hd, cpu_buffer->pages)
1279 rb_list_head_clear(hd);
1280 }
1281
1282 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1283 struct buffer_page *head,
1284 struct buffer_page *prev,
1285 int old_flag, int new_flag)
1286 {
1287 struct list_head *list;
1288 unsigned long val = (unsigned long)&head->list;
1289 unsigned long ret;
1290
1291 list = &prev->list;
1292
1293 val &= ~RB_FLAG_MASK;
1294
1295 ret = cmpxchg((unsigned long *)&list->next,
1296 val | old_flag, val | new_flag);
1297
1298 /* check if the reader took the page */
1299 if ((ret & ~RB_FLAG_MASK) != val)
1300 return RB_PAGE_MOVED;
1301
1302 return ret & RB_FLAG_MASK;
1303 }
1304
1305 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1306 struct buffer_page *head,
1307 struct buffer_page *prev,
1308 int old_flag)
1309 {
1310 return rb_head_page_set(cpu_buffer, head, prev,
1311 old_flag, RB_PAGE_UPDATE);
1312 }
1313
1314 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1315 struct buffer_page *head,
1316 struct buffer_page *prev,
1317 int old_flag)
1318 {
1319 return rb_head_page_set(cpu_buffer, head, prev,
1320 old_flag, RB_PAGE_HEAD);
1321 }
1322
1323 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1324 struct buffer_page *head,
1325 struct buffer_page *prev,
1326 int old_flag)
1327 {
1328 return rb_head_page_set(cpu_buffer, head, prev,
1329 old_flag, RB_PAGE_NORMAL);
1330 }
1331
1332 static inline void rb_inc_page(struct ring_buffer_per_cpu *cpu_buffer,
1333 struct buffer_page **bpage)
1334 {
1335 struct list_head *p = rb_list_head((*bpage)->list.next);
1336
1337 *bpage = list_entry(p, struct buffer_page, list);
1338 }
1339
1340 static struct buffer_page *
1341 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1342 {
1343 struct buffer_page *head;
1344 struct buffer_page *page;
1345 struct list_head *list;
1346 int i;
1347
1348 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1349 return NULL;
1350
1351 /* sanity check */
1352 list = cpu_buffer->pages;
1353 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1354 return NULL;
1355
1356 page = head = cpu_buffer->head_page;
1357 /*
1358 * It is possible that the writer moves the header behind
1359 * where we started, and we miss in one loop.
1360 * A second loop should grab the header, but we'll do
1361 * three loops just because I'm paranoid.
1362 */
1363 for (i = 0; i < 3; i++) {
1364 do {
1365 if (rb_is_head_page(cpu_buffer, page, page->list.prev)) {
1366 cpu_buffer->head_page = page;
1367 return page;
1368 }
1369 rb_inc_page(cpu_buffer, &page);
1370 } while (page != head);
1371 }
1372
1373 RB_WARN_ON(cpu_buffer, 1);
1374
1375 return NULL;
1376 }
1377
1378 static int rb_head_page_replace(struct buffer_page *old,
1379 struct buffer_page *new)
1380 {
1381 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1382 unsigned long val;
1383 unsigned long ret;
1384
1385 val = *ptr & ~RB_FLAG_MASK;
1386 val |= RB_PAGE_HEAD;
1387
1388 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1389
1390 return ret == val;
1391 }
1392
1393 /*
1394 * rb_tail_page_update - move the tail page forward
1395 */
1396 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1397 struct buffer_page *tail_page,
1398 struct buffer_page *next_page)
1399 {
1400 unsigned long old_entries;
1401 unsigned long old_write;
1402
1403 /*
1404 * The tail page now needs to be moved forward.
1405 *
1406 * We need to reset the tail page, but without messing
1407 * with possible erasing of data brought in by interrupts
1408 * that have moved the tail page and are currently on it.
1409 *
1410 * We add a counter to the write field to denote this.
1411 */
1412 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1413 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1414
1415 local_inc(&cpu_buffer->pages_touched);
1416 /*
1417 * Just make sure we have seen our old_write and synchronize
1418 * with any interrupts that come in.
1419 */
1420 barrier();
1421
1422 /*
1423 * If the tail page is still the same as what we think
1424 * it is, then it is up to us to update the tail
1425 * pointer.
1426 */
1427 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1428 /* Zero the write counter */
1429 unsigned long val = old_write & ~RB_WRITE_MASK;
1430 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1431
1432 /*
1433 * This will only succeed if an interrupt did
1434 * not come in and change it. In which case, we
1435 * do not want to modify it.
1436 *
1437 * We add (void) to let the compiler know that we do not care
1438 * about the return value of these functions. We use the
1439 * cmpxchg to only update if an interrupt did not already
1440 * do it for us. If the cmpxchg fails, we don't care.
1441 */
1442 (void)local_cmpxchg(&next_page->write, old_write, val);
1443 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1444
1445 /*
1446 * No need to worry about races with clearing out the commit:
1447 * it can only increment when a commit takes place. But that
1448 * only happens in the outermost nested commit.
1449 */
1450 local_set(&next_page->page->commit, 0);
1451
1452 /* Again, either we update tail_page or an interrupt does */
1453 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1454 }
1455 }
1456
1457 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1458 struct buffer_page *bpage)
1459 {
1460 unsigned long val = (unsigned long)bpage;
1461
1462 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1463 return 1;
1464
1465 return 0;
1466 }
1467
1468 /**
1469 * rb_check_pages - integrity check of buffer pages
1470 * @cpu_buffer: CPU buffer with pages to test
1471 *
1472 * As a safety measure we check to make sure the data pages have not
1473 * been corrupted.
1474 */
1475 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1476 {
1477 struct list_head *head = rb_list_head(cpu_buffer->pages);
1478 struct list_head *tmp;
1479
1480 if (RB_WARN_ON(cpu_buffer,
1481 rb_list_head(rb_list_head(head->next)->prev) != head))
1482 return -1;
1483
1484 if (RB_WARN_ON(cpu_buffer,
1485 rb_list_head(rb_list_head(head->prev)->next) != head))
1486 return -1;
1487
1488 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
1489 if (RB_WARN_ON(cpu_buffer,
1490 rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
1491 return -1;
1492
1493 if (RB_WARN_ON(cpu_buffer,
1494 rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
1495 return -1;
1496 }
1497
1498 return 0;
1499 }
1500
1501 static int __rb_allocate_pages(long nr_pages, struct list_head *pages, int cpu)
1502 {
1503 struct buffer_page *bpage, *tmp;
1504 bool user_thread = current->mm != NULL;
1505 gfp_t mflags;
1506 long i;
1507
1508 /*
1509 * Check if the available memory is there first.
1510 * Note, si_mem_available() only gives us a rough estimate of available
1511 * memory. It may not be accurate. But we don't care, we just want
1512 * to prevent doing any allocation when it is obvious that it is
1513 * not going to succeed.
1514 */
1515 i = si_mem_available();
1516 if (i < nr_pages)
1517 return -ENOMEM;
1518
1519 /*
1520 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1521 * gracefully without invoking oom-killer and the system is not
1522 * destabilized.
1523 */
1524 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1525
1526 /*
1527 * If a user thread allocates too much because si_mem_available()
1528 * reported there was enough memory even though there was not,
1529 * make sure the OOM killer kills this thread. This can happen
1530 * even with RETRY_MAYFAIL because another task may be doing
1531 * an allocation after this task has taken all memory.
1532 * This is the task the OOM killer needs to take out during this
1533 * loop, even if it was triggered by an allocation somewhere else.
1534 */
1535 if (user_thread)
1536 set_current_oom_origin();
1537 for (i = 0; i < nr_pages; i++) {
1538 struct page *page;
1539
1540 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1541 mflags, cpu_to_node(cpu));
1542 if (!bpage)
1543 goto free_pages;
1544
1545 list_add(&bpage->list, pages);
1546
1547 page = alloc_pages_node(cpu_to_node(cpu), mflags, 0);
1548 if (!page)
1549 goto free_pages;
1550 bpage->page = page_address(page);
1551 rb_init_page(bpage->page);
1552
1553 if (user_thread && fatal_signal_pending(current))
1554 goto free_pages;
1555 }
1556 if (user_thread)
1557 clear_current_oom_origin();
1558
1559 return 0;
1560
1561 free_pages:
1562 list_for_each_entry_safe(bpage, tmp, pages, list) {
1563 list_del_init(&bpage->list);
1564 free_buffer_page(bpage);
1565 }
1566 if (user_thread)
1567 clear_current_oom_origin();
1568
1569 return -ENOMEM;
1570 }
1571
1572 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1573 unsigned long nr_pages)
1574 {
1575 LIST_HEAD(pages);
1576
1577 WARN_ON(!nr_pages);
1578
1579 if (__rb_allocate_pages(nr_pages, &pages, cpu_buffer->cpu))
1580 return -ENOMEM;
1581
1582 /*
1583 * The ring buffer page list is a circular list that does not
1584 * start and end with a list head. All page list items point to
1585 * other pages.
1586 */
1587 cpu_buffer->pages = pages.next;
1588 list_del(&pages);
1589
1590 cpu_buffer->nr_pages = nr_pages;
1591
1592 rb_check_pages(cpu_buffer);
1593
1594 return 0;
1595 }
1596
1597 static struct ring_buffer_per_cpu *
1598 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
1599 {
1600 struct ring_buffer_per_cpu *cpu_buffer;
1601 struct buffer_page *bpage;
1602 struct page *page;
1603 int ret;
1604
1605 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1606 GFP_KERNEL, cpu_to_node(cpu));
1607 if (!cpu_buffer)
1608 return NULL;
1609
1610 cpu_buffer->cpu = cpu;
1611 cpu_buffer->buffer = buffer;
1612 raw_spin_lock_init(&cpu_buffer->reader_lock);
1613 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1614 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1615 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1616 init_completion(&cpu_buffer->update_done);
1617 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1618 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1619 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1620
1621 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1622 GFP_KERNEL, cpu_to_node(cpu));
1623 if (!bpage)
1624 goto fail_free_buffer;
1625
1626 rb_check_bpage(cpu_buffer, bpage);
1627
1628 cpu_buffer->reader_page = bpage;
1629 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1630 if (!page)
1631 goto fail_free_reader;
1632 bpage->page = page_address(page);
1633 rb_init_page(bpage->page);
1634
1635 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1636 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1637
1638 ret = rb_allocate_pages(cpu_buffer, nr_pages);
1639 if (ret < 0)
1640 goto fail_free_reader;
1641
1642 cpu_buffer->head_page
1643 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1644 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1645
1646 rb_head_page_activate(cpu_buffer);
1647
1648 return cpu_buffer;
1649
1650 fail_free_reader:
1651 free_buffer_page(cpu_buffer->reader_page);
1652
1653 fail_free_buffer:
1654 kfree(cpu_buffer);
1655 return NULL;
1656 }
1657
1658 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1659 {
1660 struct list_head *head = cpu_buffer->pages;
1661 struct buffer_page *bpage, *tmp;
1662
1663 irq_work_sync(&cpu_buffer->irq_work.work);
1664
1665 free_buffer_page(cpu_buffer->reader_page);
1666
1667 if (head) {
1668 rb_head_page_deactivate(cpu_buffer);
1669
1670 list_for_each_entry_safe(bpage, tmp, head, list) {
1671 list_del_init(&bpage->list);
1672 free_buffer_page(bpage);
1673 }
1674 bpage = list_entry(head, struct buffer_page, list);
1675 free_buffer_page(bpage);
1676 }
1677
1678 free_page((unsigned long)cpu_buffer->free_page);
1679
1680 kfree(cpu_buffer);
1681 }
1682
1683 /**
1684 * __ring_buffer_alloc - allocate a new ring_buffer
1685 * @size: the size in bytes per cpu that is needed.
1686 * @flags: attributes to set for the ring buffer.
1687 * @key: ring buffer reader_lock_key.
1688 *
1689 * Currently the only flag that is available is the RB_FL_OVERWRITE
1690 * flag. This flag means that the buffer will overwrite old data
1691 * when the buffer wraps. If this flag is not set, the buffer will
1692 * drop data when the tail hits the head.
1693 */
1694 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1695 struct lock_class_key *key)
1696 {
1697 struct trace_buffer *buffer;
1698 long nr_pages;
1699 int bsize;
1700 int cpu;
1701 int ret;
1702
1703 /* keep it in its own cache line */
1704 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1705 GFP_KERNEL);
1706 if (!buffer)
1707 return NULL;
1708
1709 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1710 goto fail_free_buffer;
1711
1712 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1713 buffer->flags = flags;
1714 buffer->clock = trace_clock_local;
1715 buffer->reader_lock_key = key;
1716
1717 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1718 init_waitqueue_head(&buffer->irq_work.waiters);
1719
1720 /* need at least two pages */
1721 if (nr_pages < 2)
1722 nr_pages = 2;
1723
1724 buffer->cpus = nr_cpu_ids;
1725
1726 bsize = sizeof(void *) * nr_cpu_ids;
1727 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1728 GFP_KERNEL);
1729 if (!buffer->buffers)
1730 goto fail_free_cpumask;
1731
1732 cpu = raw_smp_processor_id();
1733 cpumask_set_cpu(cpu, buffer->cpumask);
1734 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1735 if (!buffer->buffers[cpu])
1736 goto fail_free_buffers;
1737
1738 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1739 if (ret < 0)
1740 goto fail_free_buffers;
1741
1742 mutex_init(&buffer->mutex);
1743
1744 return buffer;
1745
1746 fail_free_buffers:
1747 for_each_buffer_cpu(buffer, cpu) {
1748 if (buffer->buffers[cpu])
1749 rb_free_cpu_buffer(buffer->buffers[cpu]);
1750 }
1751 kfree(buffer->buffers);
1752
1753 fail_free_cpumask:
1754 free_cpumask_var(buffer->cpumask);
1755
1756 fail_free_buffer:
1757 kfree(buffer);
1758 return NULL;
1759 }
1760 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
1761
1762 /**
1763 * ring_buffer_free - free a ring buffer.
1764 * @buffer: the buffer to free.
1765 */
1766 void
1767 ring_buffer_free(struct trace_buffer *buffer)
1768 {
1769 int cpu;
1770
1771 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1772
1773 irq_work_sync(&buffer->irq_work.work);
1774
1775 for_each_buffer_cpu(buffer, cpu)
1776 rb_free_cpu_buffer(buffer->buffers[cpu]);
1777
1778 kfree(buffer->buffers);
1779 free_cpumask_var(buffer->cpumask);
1780
1781 kfree(buffer);
1782 }
1783 EXPORT_SYMBOL_GPL(ring_buffer_free);
1784
1785 void ring_buffer_set_clock(struct trace_buffer *buffer,
1786 u64 (*clock)(void))
1787 {
1788 buffer->clock = clock;
1789 }
1790
1791 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
1792 {
1793 buffer->time_stamp_abs = abs;
1794 }
1795
1796 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
1797 {
1798 return buffer->time_stamp_abs;
1799 }
1800
1801 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1802
1803 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1804 {
1805 return local_read(&bpage->entries) & RB_WRITE_MASK;
1806 }
1807
1808 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1809 {
1810 return local_read(&bpage->write) & RB_WRITE_MASK;
1811 }
1812
1813 static int
1814 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1815 {
1816 struct list_head *tail_page, *to_remove, *next_page;
1817 struct buffer_page *to_remove_page, *tmp_iter_page;
1818 struct buffer_page *last_page, *first_page;
1819 unsigned long nr_removed;
1820 unsigned long head_bit;
1821 int page_entries;
1822
1823 head_bit = 0;
1824
1825 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1826 atomic_inc(&cpu_buffer->record_disabled);
1827 /*
1828 * We don't race with the readers since we have acquired the reader
1829 * lock. We also don't race with writers after disabling recording.
1830 * This makes it easy to figure out the first and the last page to be
1831 * removed from the list. We unlink all the pages in between including
1832 * the first and last pages. This is done in a busy loop so that we
1833 * lose the least number of traces.
1834 * The pages are freed after we restart recording and unlock readers.
1835 */
1836 tail_page = &cpu_buffer->tail_page->list;
1837
1838 /*
1839 * The tail page might be on the reader page; we remove the next page
1840 * from the ring buffer
1841 */
1842 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1843 tail_page = rb_list_head(tail_page->next);
1844 to_remove = tail_page;
1845
1846 /* start of pages to remove */
1847 first_page = list_entry(rb_list_head(to_remove->next),
1848 struct buffer_page, list);
1849
1850 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1851 to_remove = rb_list_head(to_remove)->next;
1852 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1853 }
1854 /* Read iterators need to reset themselves when some pages are removed */
1855 cpu_buffer->pages_removed += nr_removed;
1856
1857 next_page = rb_list_head(to_remove)->next;
1858
1859 /*
1860 * Now we remove all pages between tail_page and next_page.
1861 * Make sure that we have head_bit value preserved for the
1862 * next page
1863 */
1864 tail_page->next = (struct list_head *)((unsigned long)next_page |
1865 head_bit);
1866 next_page = rb_list_head(next_page);
1867 next_page->prev = tail_page;
1868
1869 /* make sure pages points to a valid page in the ring buffer */
1870 cpu_buffer->pages = next_page;
1871
1872 /* update head page */
1873 if (head_bit)
1874 cpu_buffer->head_page = list_entry(next_page,
1875 struct buffer_page, list);
1876
1877 /* pages are removed, resume tracing and then free the pages */
1878 atomic_dec(&cpu_buffer->record_disabled);
1879 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1880
1881 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1882
1883 /* last buffer page to remove */
1884 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1885 list);
1886 tmp_iter_page = first_page;
1887
1888 do {
1889 cond_resched();
1890
1891 to_remove_page = tmp_iter_page;
1892 rb_inc_page(cpu_buffer, &tmp_iter_page);
1893
1894 /* update the counters */
1895 page_entries = rb_page_entries(to_remove_page);
1896 if (page_entries) {
1897 /*
1898 * If something was added to this page, it was full
1899 * since it is not the tail page. So we deduct the
1900 * bytes consumed in the ring buffer from here.
1901 * Increment overrun to account for the lost events.
1902 */
1903 local_add(page_entries, &cpu_buffer->overrun);
1904 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
1905 local_inc(&cpu_buffer->pages_lost);
1906 }
1907
1908 /*
1909 * We have already removed references to this list item, just
1910 * free up the buffer_page and its page
1911 */
1912 free_buffer_page(to_remove_page);
1913 nr_removed--;
1914
1915 } while (to_remove_page != last_page);
1916
1917 RB_WARN_ON(cpu_buffer, nr_removed);
1918
1919 return nr_removed == 0;
1920 }
1921
1922 static int
1923 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1924 {
1925 struct list_head *pages = &cpu_buffer->new_pages;
1926 int retries, success;
1927
1928 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1929 /*
1930 * We are holding the reader lock, so the reader page won't be swapped
1931 * in the ring buffer. Now we are racing with the writer trying to
1932 * move head page and the tail page.
1933 * We are going to adapt the reader page update process where:
1934 * 1. We first splice the start and end of list of new pages between
1935 * the head page and its previous page.
1936 * 2. We cmpxchg the prev_page->next to point from head page to the
1937 * start of new pages list.
1938 * 3. Finally, we update the head->prev to the end of new list.
1939 *
1940 * We will try this process 10 times, to make sure that we don't keep
1941 * spinning.
1942 */
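/*
 * Pictorially (sketch of the splice described above):
 *
 *   before:  prev_page --> head_page(HEAD) --> ...
 *   after:   prev_page --> first_page ... last_page --> head_page(HEAD) --> ...
 *
 * Only step 2 (the cmpxchg on prev_page->next) can lose a race with the
 * writer moving the head page, which is why the loop below retries.
 */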
1943 retries = 10;
1944 success = 0;
1945 while (retries--) {
1946 struct list_head *head_page, *prev_page, *r;
1947 struct list_head *last_page, *first_page;
1948 struct list_head *head_page_with_bit;
1949
1950 head_page = &rb_set_head_page(cpu_buffer)->list;
1951 if (!head_page)
1952 break;
1953 prev_page = head_page->prev;
1954
1955 first_page = pages->next;
1956 last_page = pages->prev;
1957
1958 head_page_with_bit = (struct list_head *)
1959 ((unsigned long)head_page | RB_PAGE_HEAD);
1960
1961 last_page->next = head_page_with_bit;
1962 first_page->prev = prev_page;
1963
1964 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
1965
1966 if (r == head_page_with_bit) {
1967 /*
1968 * yay, we replaced the page pointer with our new list;
1969 * now we just have to update the head page's prev
1970 * pointer to point to the end of the new list
1971 */
1972 head_page->prev = last_page;
1973 success = 1;
1974 break;
1975 }
1976 }
1977
1978 if (success)
1979 INIT_LIST_HEAD(pages);
1980 /*
1981 * If we weren't successful in adding in new pages, warn and stop
1982 * tracing
1983 */
1984 RB_WARN_ON(cpu_buffer, !success);
1985 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1986
1987 /* free pages if they weren't inserted */
1988 if (!success) {
1989 struct buffer_page *bpage, *tmp;
1990 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
1991 list) {
1992 list_del_init(&bpage->list);
1993 free_buffer_page(bpage);
1994 }
1995 }
1996 return success;
1997 }
1998
1999 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
2000 {
2001 int success;
2002
2003 if (cpu_buffer->nr_pages_to_update > 0)
2004 success = rb_insert_pages(cpu_buffer);
2005 else
2006 success = rb_remove_pages(cpu_buffer,
2007 -cpu_buffer->nr_pages_to_update);
2008
2009 if (success)
2010 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
2011 }
2012
2013 static void update_pages_handler(struct work_struct *work)
2014 {
2015 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2016 struct ring_buffer_per_cpu, update_pages_work);
2017 rb_update_pages(cpu_buffer);
2018 complete(&cpu_buffer->update_done);
2019 }
2020
2021 /**
2022 * ring_buffer_resize - resize the ring buffer
2023 * @buffer: the buffer to resize.
2024 * @size: the new size.
2025 * @cpu_id: the cpu buffer to resize
2026 *
2027 * Minimum size is 2 * BUF_PAGE_SIZE.
2028 *
2029 * Returns 0 on success and < 0 on failure.
2030 */
2031 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
2032 int cpu_id)
2033 {
2034 struct ring_buffer_per_cpu *cpu_buffer;
2035 unsigned long nr_pages;
2036 int cpu, err;
2037
2038 /*
2039 * Always succeed at resizing a non-existent buffer:
2040 */
2041 if (!buffer)
2042 return 0;
2043
2044 /* Make sure the requested buffer exists */
2045 if (cpu_id != RING_BUFFER_ALL_CPUS &&
2046 !cpumask_test_cpu(cpu_id, buffer->cpumask))
2047 return 0;
2048
2049 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
2050
2051 /* we need a minimum of two pages */
2052 if (nr_pages < 2)
2053 nr_pages = 2;
2054
2055 size = nr_pages * BUF_PAGE_SIZE;
2056
2057 /* prevent another thread from changing buffer sizes */
2058 mutex_lock(&buffer->mutex);
2059 atomic_inc(&buffer->resizing);
2060
2061 if (cpu_id == RING_BUFFER_ALL_CPUS) {
2062 /*
2063 * Don't succeed if resizing is disabled, as a reader might be
2064 * manipulating the ring buffer and is expecting a sane state while
2065 * this is true.
2066 */
2067 for_each_buffer_cpu(buffer, cpu) {
2068 cpu_buffer = buffer->buffers[cpu];
2069 if (atomic_read(&cpu_buffer->resize_disabled)) {
2070 err = -EBUSY;
2071 goto out_err_unlock;
2072 }
2073 }
2074
2075 /* calculate the pages to update */
2076 for_each_buffer_cpu(buffer, cpu) {
2077 cpu_buffer = buffer->buffers[cpu];
2078
2079 cpu_buffer->nr_pages_to_update = nr_pages -
2080 cpu_buffer->nr_pages;
2081 /*
2082 * nothing more to do if we are removing pages or there is no update
2083 */
2084 if (cpu_buffer->nr_pages_to_update <= 0)
2085 continue;
2086 /*
2087 * to add pages, make sure all new pages can be
2088 * allocated without receiving ENOMEM
2089 */
2090 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2091 if (__rb_allocate_pages(cpu_buffer->nr_pages_to_update,
2092 &cpu_buffer->new_pages, cpu)) {
2093 /* not enough memory for new pages */
2094 err = -ENOMEM;
2095 goto out_err;
2096 }
2097
2098 cond_resched();
2099 }
2100
2101 get_online_cpus();
2102 /*
2103 * Fire off all the required work handlers
2104 * We can't schedule on offline CPUs, but it's not necessary
2105 * since we can change their buffer sizes without any race.
2106 */
2107 for_each_buffer_cpu(buffer, cpu) {
2108 cpu_buffer = buffer->buffers[cpu];
2109 if (!cpu_buffer->nr_pages_to_update)
2110 continue;
2111
2112 /* Can't run something on an offline CPU. */
2113 if (!cpu_online(cpu)) {
2114 rb_update_pages(cpu_buffer);
2115 cpu_buffer->nr_pages_to_update = 0;
2116 } else {
2117 schedule_work_on(cpu,
2118 &cpu_buffer->update_pages_work);
2119 }
2120 }
2121
2122 /* wait for all the updates to complete */
2123 for_each_buffer_cpu(buffer, cpu) {
2124 cpu_buffer = buffer->buffers[cpu];
2125 if (!cpu_buffer->nr_pages_to_update)
2126 continue;
2127
2128 if (cpu_online(cpu))
2129 wait_for_completion(&cpu_buffer->update_done);
2130 cpu_buffer->nr_pages_to_update = 0;
2131 }
2132
2133 put_online_cpus();
2134 } else {
2135 /* Make sure this CPU has been initialized */
2136 if (!cpumask_test_cpu(cpu_id, buffer->cpumask))
2137 goto out;
2138
2139 cpu_buffer = buffer->buffers[cpu_id];
2140
2141 if (nr_pages == cpu_buffer->nr_pages)
2142 goto out;
2143
2144 /*
2145 * Don't succeed if resizing is disabled, as a reader might be
2146 * manipulating the ring buffer and is expecting a sane state while
2147 * this is true.
2148 */
2149 if (atomic_read(&cpu_buffer->resize_disabled)) {
2150 err = -EBUSY;
2151 goto out_err_unlock;
2152 }
2153
2154 cpu_buffer->nr_pages_to_update = nr_pages -
2155 cpu_buffer->nr_pages;
2156
2157 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2158 if (cpu_buffer->nr_pages_to_update > 0 &&
2159 __rb_allocate_pages(cpu_buffer->nr_pages_to_update,
2160 &cpu_buffer->new_pages, cpu_id)) {
2161 err = -ENOMEM;
2162 goto out_err;
2163 }
2164
2165 get_online_cpus();
2166
2167 /* Can't run something on an offline CPU. */
2168 if (!cpu_online(cpu_id))
2169 rb_update_pages(cpu_buffer);
2170 else {
2171 schedule_work_on(cpu_id,
2172 &cpu_buffer->update_pages_work);
2173 wait_for_completion(&cpu_buffer->update_done);
2174 }
2175
2176 cpu_buffer->nr_pages_to_update = 0;
2177 put_online_cpus();
2178 }
2179
2180 out:
2181 /*
2182 * The ring buffer resize can happen with the ring buffer
2183 * enabled, so that the update disturbs the tracing as little
2184 * as possible. But if the buffer is disabled, we do not need
2185 * to worry about that, and we can take the time to verify
2186 * that the buffer is not corrupt.
2187 */
2188 if (atomic_read(&buffer->record_disabled)) {
2189 atomic_inc(&buffer->record_disabled);
2190 /*
2191 * Even though the buffer was disabled, we must make sure
2192 * that it is truly disabled before calling rb_check_pages.
2193 * There could have been a race between checking
2194 * record_disable and incrementing it.
2195 */
2196 synchronize_rcu();
2197 for_each_buffer_cpu(buffer, cpu) {
2198 cpu_buffer = buffer->buffers[cpu];
2199 rb_check_pages(cpu_buffer);
2200 }
2201 atomic_dec(&buffer->record_disabled);
2202 }
2203
2204 atomic_dec(&buffer->resizing);
2205 mutex_unlock(&buffer->mutex);
2206 return 0;
2207
2208 out_err:
2209 for_each_buffer_cpu(buffer, cpu) {
2210 struct buffer_page *bpage, *tmp;
2211
2212 cpu_buffer = buffer->buffers[cpu];
2213 cpu_buffer->nr_pages_to_update = 0;
2214
2215 if (list_empty(&cpu_buffer->new_pages))
2216 continue;
2217
2218 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2219 list) {
2220 list_del_init(&bpage->list);
2221 free_buffer_page(bpage);
2222 }
2223 }
2224 out_err_unlock:
2225 atomic_dec(&buffer->resizing);
2226 mutex_unlock(&buffer->mutex);
2227 return err;
2228 }
2229 EXPORT_SYMBOL_GPL(ring_buffer_resize);
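/*
 * Illustrative usage sketch (not taken from this file): resize every per-cpu
 * buffer, or only the buffer of CPU 1.  The size is rounded up to a multiple
 * of BUF_PAGE_SIZE, with a minimum of two pages.
 *
 *	ret = ring_buffer_resize(buffer, 4 * 1024 * 1024, RING_BUFFER_ALL_CPUS);
 *	ret = ring_buffer_resize(buffer, 4 * 1024 * 1024, 1);
 */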
2230
2231 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
2232 {
2233 mutex_lock(&buffer->mutex);
2234 if (val)
2235 buffer->flags |= RB_FL_OVERWRITE;
2236 else
2237 buffer->flags &= ~RB_FL_OVERWRITE;
2238 mutex_unlock(&buffer->mutex);
2239 }
2240 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
2241
2242 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2243 {
2244 return bpage->page->data + index;
2245 }
2246
2247 static __always_inline struct ring_buffer_event *
2248 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2249 {
2250 return __rb_page_index(cpu_buffer->reader_page,
2251 cpu_buffer->reader_page->read);
2252 }
2253
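/*
 * Return the event the iterator currently points at.  The event is copied
 * into iter->event so that a writer racing onto this page cannot change it
 * under the reader; if the page is found to have been rewritten, the
 * iterator is reset and NULL is returned.
 */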
2254 static struct ring_buffer_event *
2255 rb_iter_head_event(struct ring_buffer_iter *iter)
2256 {
2257 struct ring_buffer_event *event;
2258 struct buffer_page *iter_head_page = iter->head_page;
2259 unsigned long commit;
2260 unsigned length;
2261
2262 if (iter->head != iter->next_event)
2263 return iter->event;
2264
2265 /*
2266 * When the writer goes across pages, it issues a cmpxchg which
2267 * is a mb(), which will synchronize with the rmb here.
2268 * (see rb_tail_page_update() and __rb_reserve_next())
2269 */
2270 commit = rb_page_commit(iter_head_page);
2271 smp_rmb();
2272
2273 /* An event needs to be at least 8 bytes in size */
2274 if (iter->head > commit - 8)
2275 goto reset;
2276
2277 event = __rb_page_index(iter_head_page, iter->head);
2278 length = rb_event_length(event);
2279
2280 /*
2281 * READ_ONCE() doesn't work on functions and we don't want the
2282 * compiler doing any crazy optimizations with length.
2283 */
2284 barrier();
2285
2286 if ((iter->head + length) > commit || length > BUF_PAGE_SIZE)
2287 /* Writer corrupted the read? */
2288 goto reset;
2289
2290 memcpy(iter->event, event, length);
2291 /*
2292 * If the page stamp is still the same after this rmb() then the
2293 * event was safely copied without the writer entering the page.
2294 */
2295 smp_rmb();
2296
2297 /* Make sure the page didn't change since we read this */
2298 if (iter->page_stamp != iter_head_page->page->time_stamp ||
2299 commit > rb_page_commit(iter_head_page))
2300 goto reset;
2301
2302 iter->next_event = iter->head + length;
2303 return iter->event;
2304 reset:
2305 /* Reset to the beginning */
2306 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2307 iter->head = 0;
2308 iter->next_event = 0;
2309 iter->missed_events = 1;
2310 return NULL;
2311 }
2312
2313 /* Size is determined by what has been committed */
2314 static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2315 {
2316 return rb_page_commit(bpage);
2317 }
2318
2319 static __always_inline unsigned
2320 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2321 {
2322 return rb_page_commit(cpu_buffer->commit_page);
2323 }
2324
2325 static __always_inline unsigned
2326 rb_event_index(struct ring_buffer_event *event)
2327 {
2328 unsigned long addr = (unsigned long)event;
2329
2330 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
2331 }
2332
2333 static void rb_inc_iter(struct ring_buffer_iter *iter)
2334 {
2335 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2336
2337 /*
2338 * The iterator could be on the reader page (it starts there).
2339 * But the head could have moved, since the reader was
2340 * found. Check for this case and assign the iterator
2341 * to the head page instead of next.
2342 */
2343 if (iter->head_page == cpu_buffer->reader_page)
2344 iter->head_page = rb_set_head_page(cpu_buffer);
2345 else
2346 rb_inc_page(cpu_buffer, &iter->head_page);
2347
2348 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2349 iter->head = 0;
2350 iter->next_event = 0;
2351 }
2352
2353 /*
2354 * rb_handle_head_page - writer hit the head page
2355 *
2356 * Returns: +1 to retry page
2357 * 0 to continue
2358 * -1 on error
2359 */
2360 static int
2361 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2362 struct buffer_page *tail_page,
2363 struct buffer_page *next_page)
2364 {
2365 struct buffer_page *new_head;
2366 int entries;
2367 int type;
2368 int ret;
2369
2370 entries = rb_page_entries(next_page);
2371
2372 /*
2373 * The hard part is here. We need to move the head
2374 * forward, and protect against both readers on
2375 * other CPUs and writers coming in via interrupts.
2376 */
2377 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2378 RB_PAGE_HEAD);
2379
2380 /*
2381 * type can be one of four:
2382 * NORMAL - an interrupt already moved it for us
2383 * HEAD - we are the first to get here.
2384 * UPDATE - we are the interrupt interrupting
2385 * a current move.
2386 * MOVED - a reader on another CPU moved the next
2387 * pointer to its reader page. Give up
2388 * and try again.
2389 */
2390
2391 switch (type) {
2392 case RB_PAGE_HEAD:
2393 /*
2394 * We changed the head to UPDATE, thus
2395 * it is our responsibility to update
2396 * the counters.
2397 */
2398 local_add(entries, &cpu_buffer->overrun);
2399 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
2400 local_inc(&cpu_buffer->pages_lost);
2401
2402 /*
2403 * The entries will be zeroed out when we move the
2404 * tail page.
2405 */
2406
2407 /* still more to do */
2408 break;
2409
2410 case RB_PAGE_UPDATE:
2411 /*
2412 * This is an interrupt that interrupted the
2413 * previous update. Still more to do.
2414 */
2415 break;
2416 case RB_PAGE_NORMAL:
2417 /*
2418 * An interrupt came in before the update
2419 * and processed this for us.
2420 * Nothing left to do.
2421 */
2422 return 1;
2423 case RB_PAGE_MOVED:
2424 /*
2425 * The reader is on another CPU and just did
2426 * a swap with our next_page.
2427 * Try again.
2428 */
2429 return 1;
2430 default:
2431 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2432 return -1;
2433 }
2434
2435 /*
2436 * Now that we are here, the old head pointer is
2437 * set to UPDATE. This will keep the reader from
2438 * swapping the head page with the reader page.
2439 * The reader (on another CPU) will spin till
2440 * we are finished.
2441 *
2442 * We just need to protect against interrupts
2443 * doing the job. We will set the next pointer
2444 * to HEAD. After that, we set the old pointer
2445 * to NORMAL, but only if it was HEAD before;
2446 * otherwise we are in an interrupt, and only
2447 * want the outermost commit to reset it.
2448 */
2449 new_head = next_page;
2450 rb_inc_page(cpu_buffer, &new_head);
2451
2452 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2453 RB_PAGE_NORMAL);
2454
2455 /*
2456 * Valid returns are:
2457 * HEAD - an interrupt came in and already set it.
2458 * NORMAL - One of two things:
2459 * 1) We really set it.
2460 * 2) A bunch of interrupts came in and moved
2461 * the page forward again.
2462 */
2463 switch (ret) {
2464 case RB_PAGE_HEAD:
2465 case RB_PAGE_NORMAL:
2466 /* OK */
2467 break;
2468 default:
2469 RB_WARN_ON(cpu_buffer, 1);
2470 return -1;
2471 }
2472
2473 /*
2474 * It is possible that an interrupt came in,
2475 * set the head up, then more interrupts came in
2476 * and moved it again. When we get back here,
2477 * the page would have been set to NORMAL but we
2478 * just set it back to HEAD.
2479 *
2480 * How do you detect this? Well, if that happened
2481 * the tail page would have moved.
2482 */
2483 if (ret == RB_PAGE_NORMAL) {
2484 struct buffer_page *buffer_tail_page;
2485
2486 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2487 /*
2488 * If the tail had moved past next, then we need
2489 * to reset the pointer.
2490 */
2491 if (buffer_tail_page != tail_page &&
2492 buffer_tail_page != next_page)
2493 rb_head_page_set_normal(cpu_buffer, new_head,
2494 next_page,
2495 RB_PAGE_HEAD);
2496 }
2497
2498 /*
2499 * If this was the outermost commit (the one that
2500 * changed the original pointer from HEAD to UPDATE),
2501 * then it is up to us to reset it to NORMAL.
2502 */
2503 if (type == RB_PAGE_HEAD) {
2504 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2505 tail_page,
2506 RB_PAGE_UPDATE);
2507 if (RB_WARN_ON(cpu_buffer,
2508 ret != RB_PAGE_UPDATE))
2509 return -1;
2510 }
2511
2512 return 0;
2513 }
2514
2515 static inline void
2516 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2517 unsigned long tail, struct rb_event_info *info)
2518 {
2519 struct buffer_page *tail_page = info->tail_page;
2520 struct ring_buffer_event *event;
2521 unsigned long length = info->length;
2522
2523 /*
2524 * Only the event that crossed the page boundary
2525 * must fill the old tail_page with padding.
2526 */
2527 if (tail >= BUF_PAGE_SIZE) {
2528 /*
2529 * If the page was filled, then we still need
2530 * to update the real_end. Reset it to zero
2531 * and the reader will ignore it.
2532 */
2533 if (tail == BUF_PAGE_SIZE)
2534 tail_page->real_end = 0;
2535
2536 local_sub(length, &tail_page->write);
2537 return;
2538 }
2539
2540 event = __rb_page_index(tail_page, tail);
2541
2542 /*
2543 * Save the original length to the meta data.
2544 * This will be used by the reader to add lost event
2545 * counter.
2546 */
2547 tail_page->real_end = tail;
2548
2549 /*
2550 * If this event is bigger than the minimum size, then
2551 * we need to be careful that we don't subtract the
2552 * write counter enough to allow another writer to slip
2553 * in on this page.
2554 * We put in a discarded commit instead, to make sure
2555 * that this space is not used again, and this space will
2556 * not be accounted into 'entries_bytes'.
2557 *
2558 * If we are less than the minimum size, we don't need to
2559 * worry about it.
2560 */
2561 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2562 /* No room for any events */
2563
2564 /* Mark the rest of the page with padding */
2565 rb_event_set_padding(event);
2566
2567 /* Make sure the padding is visible before the write update */
2568 smp_wmb();
2569
2570 /* Set the write back to the previous setting */
2571 local_sub(length, &tail_page->write);
2572 return;
2573 }
2574
2575 /* Put in a discarded event */
2576 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2577 event->type_len = RINGBUF_TYPE_PADDING;
2578 /* time delta must be non zero */
2579 event->time_delta = 1;
2580
2581 /* account for padding bytes */
2582 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2583
2584 /* Make sure the padding is visible before the tail_page->write update */
2585 smp_wmb();
2586
2587 /* Set write to end of buffer */
2588 length = (tail + length) - BUF_PAGE_SIZE;
2589 local_sub(length, &tail_page->write);
2590 }
2591
2592 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2593
2594 /*
2595 * This is the slow path, force gcc not to inline it.
2596 */
2597 static noinline struct ring_buffer_event *
2598 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2599 unsigned long tail, struct rb_event_info *info)
2600 {
2601 struct buffer_page *tail_page = info->tail_page;
2602 struct buffer_page *commit_page = cpu_buffer->commit_page;
2603 struct trace_buffer *buffer = cpu_buffer->buffer;
2604 struct buffer_page *next_page;
2605 int ret;
2606
2607 next_page = tail_page;
2608
2609 rb_inc_page(cpu_buffer, &next_page);
2610
2611 /*
2612 * If for some reason, we had an interrupt storm that made
2613 * it all the way around the buffer, bail, and warn
2614 * about it.
2615 */
2616 if (unlikely(next_page == commit_page)) {
2617 local_inc(&cpu_buffer->commit_overrun);
2618 goto out_reset;
2619 }
2620
2621 /*
2622 * This is where the fun begins!
2623 *
2624 * We are fighting against races between a reader that
2625 * could be on another CPU trying to swap its reader
2626 * page with the buffer head.
2627 *
2628 * We are also fighting against interrupts coming in and
2629 * moving the head or tail on us as well.
2630 *
2631 * If the next page is the head page then we have filled
2632 * the buffer, unless the commit page is still on the
2633 * reader page.
2634 */
2635 if (rb_is_head_page(cpu_buffer, next_page, &tail_page->list)) {
2636
2637 /*
2638 * If the commit is not on the reader page, then
2639 * move the header page.
2640 */
2641 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2642 /*
2643 * If we are not in overwrite mode,
2644 * this is easy, just stop here.
2645 */
2646 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2647 local_inc(&cpu_buffer->dropped_events);
2648 goto out_reset;
2649 }
2650
2651 ret = rb_handle_head_page(cpu_buffer,
2652 tail_page,
2653 next_page);
2654 if (ret < 0)
2655 goto out_reset;
2656 if (ret)
2657 goto out_again;
2658 } else {
2659 /*
2660 * We need to be careful here too. The
2661 * commit page could still be on the reader
2662 * page. We could have a small buffer, and
2663 * have filled up the buffer with events
2664 * from interrupts and such, and wrapped.
2665 *
2666 * Note, if the tail page is also on the
2667 * reader_page, we let it move out.
2668 */
2669 if (unlikely((cpu_buffer->commit_page !=
2670 cpu_buffer->tail_page) &&
2671 (cpu_buffer->commit_page ==
2672 cpu_buffer->reader_page))) {
2673 local_inc(&cpu_buffer->commit_overrun);
2674 goto out_reset;
2675 }
2676 }
2677 }
2678
2679 rb_tail_page_update(cpu_buffer, tail_page, next_page);
2680
2681 out_again:
2682
2683 rb_reset_tail(cpu_buffer, tail, info);
2684
2685 /* Commit what we have for now. */
2686 rb_end_commit(cpu_buffer);
2687 /* rb_end_commit() decs committing */
2688 local_inc(&cpu_buffer->committing);
2689
2690 /* fail and let the caller try again */
2691 return ERR_PTR(-EAGAIN);
2692
2693 out_reset:
2694 /* reset write */
2695 rb_reset_tail(cpu_buffer, tail, info);
2696
2697 return NULL;
2698 }
2699
2700 /* Slow path */
2701 static struct ring_buffer_event *
2702 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2703 {
2704 if (abs)
2705 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2706 else
2707 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2708
2709 /* Not the first event on the page, or not delta? */
2710 if (abs || rb_event_index(event)) {
2711 event->time_delta = delta & TS_MASK;
2712 event->array[0] = delta >> TS_SHIFT;
2713 } else {
2714 /* nope, just zero it */
2715 event->time_delta = 0;
2716 event->array[0] = 0;
2717 }
2718
2719 return skip_time_extend(event);
2720 }
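/*
 * Example (illustrative): with TS_SHIFT == 27, a delta of ((3ULL << 27) + 5)
 * is stored above as time_delta = 5 and array[0] = 3, and is reassembled by
 * the reader as (array[0] << TS_SHIFT) + time_delta.
 */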
2721
2722 static inline bool rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2723 struct ring_buffer_event *event);
2724
2725 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2726 static inline bool sched_clock_stable(void)
2727 {
2728 return true;
2729 }
2730 #endif
2731
2732 static void
2733 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2734 struct rb_event_info *info)
2735 {
2736 u64 write_stamp;
2737
2738 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
2739 (unsigned long long)info->delta,
2740 (unsigned long long)info->ts,
2741 (unsigned long long)info->before,
2742 (unsigned long long)info->after,
2743 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
2744 sched_clock_stable() ? "" :
2745 "If you just came from a suspend/resume,\n"
2746 "please switch to the trace global clock:\n"
2747 " echo global > /sys/kernel/debug/tracing/trace_clock\n"
2748 "or add trace_clock=global to the kernel command line\n");
2749 }
2750
2751 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2752 struct ring_buffer_event **event,
2753 struct rb_event_info *info,
2754 u64 *delta,
2755 unsigned int *length)
2756 {
2757 bool abs = info->add_timestamp &
2758 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2759
2760 if (unlikely(info->delta > (1ULL << 59))) {
2761 /* did the clock go backwards */
2762 if (info->before == info->after && info->before > info->ts) {
2763 /* not interrupted */
2764 static int once;
2765
2766 /*
2767 * This is possible with a recalibrating of the TSC.
2768 * Do not produce a call stack, but just report it.
2769 */
2770 if (!once) {
2771 once++;
2772 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2773 info->before, info->ts);
2774 }
2775 } else
2776 rb_check_timestamp(cpu_buffer, info);
2777 if (!abs)
2778 info->delta = 0;
2779 }
2780 *event = rb_add_time_stamp(*event, info->delta, abs);
2781 *length -= RB_LEN_TIME_EXTEND;
2782 *delta = 0;
2783 }
2784
2785 /**
2786 * rb_update_event - update event type and data
2787 * @cpu_buffer: The per cpu buffer of the @event
2788 * @event: the event to update
2789 * @info: The info to update the @event with (contains length and delta)
2790 *
2791 * Update the type and data fields of the @event. The length
2792 * is the actual size that is written to the ring buffer,
2793 * and with this, we can determine what to place into the
2794 * data field.
2795 */
2796 static void
2797 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2798 struct ring_buffer_event *event,
2799 struct rb_event_info *info)
2800 {
2801 unsigned length = info->length;
2802 u64 delta = info->delta;
2803
2804 /*
2805 * If we need to add a timestamp, then we
2806 * add it to the start of the reserved space.
2807 */
2808 if (unlikely(info->add_timestamp))
2809 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
2810
2811 event->time_delta = delta;
2812 length -= RB_EVNT_HDR_SIZE;
2813 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2814 event->type_len = 0;
2815 event->array[0] = length;
2816 } else
2817 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2818 }
2819
2820 static unsigned rb_calculate_event_length(unsigned length)
2821 {
2822 struct ring_buffer_event event; /* Used only for sizeof array */
2823
2824 /* zero length can cause confusions */
2825 if (!length)
2826 length++;
2827
2828 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2829 length += sizeof(event.array[0]);
2830
2831 length += RB_EVNT_HDR_SIZE;
2832 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2833
2834 /*
2835 * In case the time delta is larger than the 27 bits for it
2836 * in the header, we need to add a timestamp. If another
2837 * event comes in when trying to discard this one to increase
2838 * the length, then the timestamp will be added in the allocated
2839 * space of this event. If length is bigger than the size needed
2840 * for the TIME_EXTEND, then padding has to be used. The events
2841 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2842 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2843 * As length is a multiple of 4, we only need to worry if it
2844 * is 12 (RB_LEN_TIME_EXTEND + 4).
2845 */
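/*
 * Worked example (assuming 4 byte arch alignment and no forced 8 byte
 * alignment): a 7 byte payload gives 7 + RB_EVNT_HDR_SIZE(4) = 11, aligned
 * up to 12 == RB_LEN_TIME_EXTEND + RB_ALIGNMENT, so it is bumped to 16 by
 * the check below.
 */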
2846 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2847 length += RB_ALIGNMENT;
2848
2849 return length;
2850 }
2851
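/*
 * An event is the "commit" if and only if it lives on the commit page and
 * its index matches the current commit index, i.e. nothing has been
 * reserved after it yet.
 */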
2852 static __always_inline bool
2853 rb_event_is_commit(struct ring_buffer_per_cpu *cpu_buffer,
2854 struct ring_buffer_event *event)
2855 {
2856 unsigned long addr = (unsigned long)event;
2857 unsigned long index;
2858
2859 index = rb_event_index(event);
2860 addr &= PAGE_MASK;
2861
2862 return cpu_buffer->commit_page->page == (void *)addr &&
2863 rb_commit_index(cpu_buffer) == index;
2864 }
2865
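/*
 * Return the time delta carried by @event as seen by the write_stamp
 * accounting.  Used by rb_try_to_discard() below to roll write_stamp back
 * when an event is discarded; padding and absolute TIME_STAMP events
 * contribute nothing.
 */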
2866 static u64 rb_time_delta(struct ring_buffer_event *event)
2867 {
2868 switch (event->type_len) {
2869 case RINGBUF_TYPE_PADDING:
2870 return 0;
2871
2872 case RINGBUF_TYPE_TIME_EXTEND:
2873 return ring_buffer_event_time_stamp(event);
2874
2875 case RINGBUF_TYPE_TIME_STAMP:
2876 return 0;
2877
2878 case RINGBUF_TYPE_DATA:
2879 return event->time_delta;
2880 default:
2881 return 0;
2882 }
2883 }
2884
2885 static inline int
2886 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2887 struct ring_buffer_event *event)
2888 {
2889 unsigned long new_index, old_index;
2890 struct buffer_page *bpage;
2891 unsigned long index;
2892 unsigned long addr;
2893 u64 write_stamp;
2894 u64 delta;
2895
2896 new_index = rb_event_index(event);
2897 old_index = new_index + rb_event_ts_length(event);
2898 addr = (unsigned long)event;
2899 addr &= PAGE_MASK;
2900
2901 bpage = READ_ONCE(cpu_buffer->tail_page);
2902
2903 delta = rb_time_delta(event);
2904
2905 if (!rb_time_read(&cpu_buffer->write_stamp, &write_stamp))
2906 return 0;
2907
2908 /* Make sure the write stamp is read before testing the location */
2909 barrier();
2910
2911 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2912 unsigned long write_mask =
2913 local_read(&bpage->write) & ~RB_WRITE_MASK;
2914 unsigned long event_length = rb_event_length(event);
2915
2916 /*
2917 * Make the before_stamp different from the write_stamp
2918 * so that the next event adds an absolute
2919 * timestamp and does not rely on the saved write stamp, which
2920 * is now going to be bogus.
2921 */
2922 rb_time_set(&cpu_buffer->before_stamp, 0);
2923
2924 /* Something came in, can't discard */
2925 if (!rb_time_cmpxchg(&cpu_buffer->write_stamp,
2926 write_stamp, write_stamp - delta))
2927 return 0;
2928
2929 /*
2930 * If an event were to come in now, it would see that the
2931 * write_stamp and the before_stamp are different, and assume
2932 * that this event just added itself before updating
2933 * the write stamp. The interrupting event will fix the
2934 * write stamp for us, and use the before stamp as its delta.
2935 */
2936
2937 /*
2938 * This is on the tail page. It is possible that
2939 * a write could come in and move the tail page
2940 * and write to the next page. That is fine
2941 * because we just shorten what is on this page.
2942 */
2943 old_index += write_mask;
2944 new_index += write_mask;
2945 index = local_cmpxchg(&bpage->write, old_index, new_index);
2946 if (index == old_index) {
2947 /* update counters */
2948 local_sub(event_length, &cpu_buffer->entries_bytes);
2949 return 1;
2950 }
2951 }
2952
2953 /* could not discard */
2954 return 0;
2955 }
2956
2957 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2958 {
2959 local_inc(&cpu_buffer->committing);
2960 local_inc(&cpu_buffer->commits);
2961 }
2962
2963 static __always_inline void
2964 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2965 {
2966 unsigned long max_count;
2967
2968 /*
2969 * We only race with interrupts and NMIs on this CPU.
2970 * If we own the commit event, then we can commit
2971 * all others that interrupted us, since the interruptions
2972 * are in stack format (they finish before they come
2973 * back to us). This allows us to do a simple loop to
2974 * assign the commit to the tail.
2975 */
2976 again:
2977 max_count = cpu_buffer->nr_pages * 100;
2978
2979 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2980 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2981 return;
2982 if (RB_WARN_ON(cpu_buffer,
2983 rb_is_reader_page(cpu_buffer->tail_page)))
2984 return;
2985 /*
2986 * No need for a memory barrier here, as the update
2987 * of the tail_page did it for this page.
2988 */
2989 local_set(&cpu_buffer->commit_page->page->commit,
2990 rb_page_write(cpu_buffer->commit_page));
2991 rb_inc_page(cpu_buffer, &cpu_buffer->commit_page);
2992 /* add barrier to keep gcc from optimizing too much */
2993 barrier();
2994 }
2995 while (rb_commit_index(cpu_buffer) !=
2996 rb_page_write(cpu_buffer->commit_page)) {
2997
2998 /* Make sure the readers see the content of what is committed. */
2999 smp_wmb();
3000 local_set(&cpu_buffer->commit_page->page->commit,
3001 rb_page_write(cpu_buffer->commit_page));
3002 RB_WARN_ON(cpu_buffer,
3003 local_read(&cpu_buffer->commit_page->page->commit) &
3004 ~RB_WRITE_MASK);
3005 barrier();
3006 }
3007
3008 /* again, keep gcc from optimizing */
3009 barrier();
3010
3011 /*
3012 * If an interrupt came in just after the first while loop
3013 * and pushed the tail page forward, we will be left with
3014 * a dangling commit that will never go forward.
3015 */
3016 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
3017 goto again;
3018 }
3019
3020 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
3021 {
3022 unsigned long commits;
3023
3024 if (RB_WARN_ON(cpu_buffer,
3025 !local_read(&cpu_buffer->committing)))
3026 return;
3027
3028 again:
3029 commits = local_read(&cpu_buffer->commits);
3030 /* synchronize with interrupts */
3031 barrier();
3032 if (local_read(&cpu_buffer->committing) == 1)
3033 rb_set_commit_to_write(cpu_buffer);
3034
3035 local_dec(&cpu_buffer->committing);
3036
3037 /* synchronize with interrupts */
3038 barrier();
3039
3040 /*
3041 * Need to account for interrupts coming in between the
3042 * updating of the commit page and the clearing of the
3043 * committing counter.
3044 */
3045 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3046 !local_read(&cpu_buffer->committing)) {
3047 local_inc(&cpu_buffer->committing);
3048 goto again;
3049 }
3050 }
3051
3052 static inline void rb_event_discard(struct ring_buffer_event *event)
3053 {
3054 if (extended_time(event))
3055 event = skip_time_extend(event);
3056
3057 /* array[0] holds the actual length for the discarded event */
3058 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3059 event->type_len = RINGBUF_TYPE_PADDING;
3060 /* time delta must be non zero */
3061 if (!event->time_delta)
3062 event->time_delta = 1;
3063 }
3064
3065 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
3066 struct ring_buffer_event *event)
3067 {
3068 local_inc(&cpu_buffer->entries);
3069 rb_end_commit(cpu_buffer);
3070 }
3071
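/*
 * Wake up any readers waiting on this buffer.  The actual wakeups are done
 * from irq_work so that this can safely be called from the tracing hot path;
 * "full" waiters are only woken once a new page has been touched and the
 * watermark given by shortest_full is reached.
 */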
3072 static __always_inline void
3073 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3074 {
3075 if (buffer->irq_work.waiters_pending) {
3076 buffer->irq_work.waiters_pending = false;
3077 /* irq_work_queue() supplies its own memory barriers */
3078 irq_work_queue(&buffer->irq_work.work);
3079 }
3080
3081 if (cpu_buffer->irq_work.waiters_pending) {
3082 cpu_buffer->irq_work.waiters_pending = false;
3083 /* irq_work_queue() supplies its own memory barriers */
3084 irq_work_queue(&cpu_buffer->irq_work.work);
3085 }
3086
3087 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3088 return;
3089
3090 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3091 return;
3092
3093 if (!cpu_buffer->irq_work.full_waiters_pending)
3094 return;
3095
3096 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3097
3098 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3099 return;
3100
3101 cpu_buffer->irq_work.wakeup_full = true;
3102 cpu_buffer->irq_work.full_waiters_pending = false;
3103 /* irq_work_queue() supplies its own memory barriers */
3104 irq_work_queue(&cpu_buffer->irq_work.work);
3105 }
3106
3107 /*
3108 * The lock and unlock are done within a preempt disable section.
3109 * The current_context per_cpu variable can only be modified
3110 * by the current task between lock and unlock. But it can
3111 * be modified more than once via an interrupt. To pass this
3112 * information from the lock to the unlock without having to
3113 * access the 'in_interrupt()' functions again (which do show
3114 * a bit of overhead in something as critical as function tracing),
3115 * we use a bitmask trick.
3116 *
3117 * bit 1 = NMI context
3118 * bit 2 = IRQ context
3119 * bit 3 = SoftIRQ context
3120 * bit 4 = normal context.
3121 *
3122 * This works because this is the order of contexts that can
3123 * preempt other contexts. A SoftIRQ never preempts an IRQ
3124 * context.
3125 *
3126 * When the context is determined, the corresponding bit is
3127 * checked and set (if it was set, then a recursion of that context
3128 * happened).
3129 *
3130 * On unlock, we need to clear this bit. To do so, just subtract
3131 * 1 from the current_context and AND it to itself.
3132 *
3133 * (binary)
3134 * 101 - 1 = 100
3135 * 101 & 100 = 100 (clearing bit zero)
3136 *
3137 * 1010 - 1 = 1001
3138 * 1010 & 1001 = 1000 (clearing bit 1)
3139 *
3140 * The least significant bit can be cleared this way, and it
3141 * just so happens that it is the same bit corresponding to
3142 * the current context.
3143 *
3144 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3145 * is set when a recursion is detected at the current context, and if
3146 * the TRANSITION bit is already set, it will fail the recursion.
3147 * This is needed because there's a lag between the changing of
3148 * interrupt context and updating the preempt count. In this case,
3149 * a false positive will be found. To handle this, one extra recursion
3150 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3151 * bit is already set, then it is considered a recursion and the function
3152 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3153 *
3154 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3155 * to be cleared. Even if it wasn't the context that set it. That is,
3156 * if an interrupt comes in while NORMAL bit is set and the ring buffer
3157 * is called before preempt_count() is updated, since the check will
3158 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3159 * NMI then comes in, it will set the NMI bit, but when the NMI code
3160 * does the trace_recursive_unlock() it will clear the TRANSITION bit
3161 * and leave the NMI bit set. But this is fine, because the interrupt
3162 * code that set the TRANSITION bit will then clear the NMI bit when it
3163 * calls trace_recursive_unlock(). If another NMI comes in, it will
3164 * set the TRANSITION bit and continue.
3165 *
3166 * Note: The TRANSITION bit only handles a single transition between contexts.
3167 */
3168
3169 static __always_inline int
3170 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3171 {
3172 unsigned int val = cpu_buffer->current_context;
3173 unsigned long pc = preempt_count();
3174 int bit;
3175
3176 if (!(pc & (NMI_MASK | HARDIRQ_MASK | SOFTIRQ_OFFSET)))
3177 bit = RB_CTX_NORMAL;
3178 else
3179 bit = pc & NMI_MASK ? RB_CTX_NMI :
3180 pc & HARDIRQ_MASK ? RB_CTX_IRQ : RB_CTX_SOFTIRQ;
3181
3182 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3183 /*
3184 * It is possible that this was called by transitioning
3185 * between interrupt context, and preempt_count() has not
3186 * been updated yet. In this case, use the TRANSITION bit.
3187 */
3188 bit = RB_CTX_TRANSITION;
3189 if (val & (1 << (bit + cpu_buffer->nest)))
3190 return 1;
3191 }
3192
3193 val |= (1 << (bit + cpu_buffer->nest));
3194 cpu_buffer->current_context = val;
3195
3196 return 0;
3197 }
3198
3199 static __always_inline void
3200 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3201 {
3202 cpu_buffer->current_context &=
3203 cpu_buffer->current_context - (1 << cpu_buffer->nest);
3204 }
3205
3206 /* The recursive locking above uses 5 bits */
3207 #define NESTED_BITS 5
3208
3209 /**
3210 * ring_buffer_nest_start - Allow to trace while nested
3211 * @buffer: The ring buffer to modify
3212 *
3213 * The ring buffer has a safety mechanism to prevent recursion.
3214 * But there may be a case where a trace needs to be done while
3215 * tracing something else. In this case, calling this function
3216 * will allow this function to nest within a currently active
3217 * ring_buffer_lock_reserve().
3218 *
3219 * Call this function before calling another ring_buffer_lock_reserve() and
3220 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
3221 */
3222 void ring_buffer_nest_start(struct trace_buffer *buffer)
3223 {
3224 struct ring_buffer_per_cpu *cpu_buffer;
3225 int cpu;
3226
3227 /* Enabled by ring_buffer_nest_end() */
3228 preempt_disable_notrace();
3229 cpu = raw_smp_processor_id();
3230 cpu_buffer = buffer->buffers[cpu];
3231 /* This is the shift value for the above recursive locking */
3232 cpu_buffer->nest += NESTED_BITS;
3233 }
3234
3235 /**
3236 * ring_buffer_nest_end - Allow to trace while nested
3237 * @buffer: The ring buffer to modify
3238 *
3239 * Must be called after ring_buffer_nest_start() and after the
3240 * ring_buffer_unlock_commit().
3241 */
3242 void ring_buffer_nest_end(struct trace_buffer *buffer)
3243 {
3244 struct ring_buffer_per_cpu *cpu_buffer;
3245 int cpu;
3246
3247 /* disabled by ring_buffer_nest_start() */
3248 cpu = raw_smp_processor_id();
3249 cpu_buffer = buffer->buffers[cpu];
3250 /* This is the shift value for the above recursive locking */
3251 cpu_buffer->nest -= NESTED_BITS;
3252 preempt_enable_notrace();
3253 }
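/*
 * Illustrative pairing (sketch, not taken from this file):
 *
 *	ring_buffer_nest_start(buffer);
 *	event = ring_buffer_lock_reserve(buffer, sizeof(data));
 *	if (event) {
 *		memcpy(ring_buffer_event_data(event), &data, sizeof(data));
 *		ring_buffer_unlock_commit(buffer, event);
 *	}
 *	ring_buffer_nest_end(buffer);
 */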
3254
3255 /**
3256 * ring_buffer_unlock_commit - commit a reserved event
3257 * @buffer: The buffer to commit to
3258 * @event: The event pointer to commit.
3259 *
3260 * This commits the data to the ring buffer, and releases any locks held.
3261 *
3262 * Must be paired with ring_buffer_lock_reserve.
3263 */
3264 int ring_buffer_unlock_commit(struct trace_buffer *buffer,
3265 struct ring_buffer_event *event)
3266 {
3267 struct ring_buffer_per_cpu *cpu_buffer;
3268 int cpu = raw_smp_processor_id();
3269
3270 cpu_buffer = buffer->buffers[cpu];
3271
3272 rb_commit(cpu_buffer, event);
3273
3274 rb_wakeups(buffer, cpu_buffer);
3275
3276 trace_recursive_unlock(cpu_buffer);
3277
3278 preempt_enable_notrace();
3279
3280 return 0;
3281 }
3282 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
3283
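/*
 * Reserve space on the tail page for one event.  The A to E markers in the
 * body below order the accesses to before_stamp and write_stamp so that an
 * event which interrupts us between any two of those points can be detected
 * afterwards, in which case the time delta is either recomputed or replaced
 * by a forced absolute timestamp.
 */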
3284 static struct ring_buffer_event *
3285 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
3286 struct rb_event_info *info)
3287 {
3288 struct ring_buffer_event *event;
3289 struct buffer_page *tail_page;
3290 unsigned long tail, write, w;
3291 bool a_ok;
3292 bool b_ok;
3293
3294 /* Don't let the compiler play games with cpu_buffer->tail_page */
3295 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
3296
3297 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
3298 barrier();
3299 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3300 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3301 barrier();
3302 info->ts = rb_time_stamp(cpu_buffer->buffer);
3303
3304 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
3305 info->delta = info->ts;
3306 } else {
3307 /*
3308 * If interrupting an event time update, we may need an
3309 * absolute timestamp.
3310 * Don't bother if this is the start of a new page (w == 0).
3311 */
3312 if (!w) {
3313 /* Use the sub-buffer timestamp */
3314 info->delta = 0;
3315 } else if (unlikely(!a_ok || !b_ok || info->before != info->after)) {
3316 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3317 info->length += RB_LEN_TIME_EXTEND;
3318 } else {
3319 info->delta = info->ts - info->after;
3320 if (unlikely(test_time_stamp(info->delta))) {
3321 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3322 info->length += RB_LEN_TIME_EXTEND;
3323 }
3324 }
3325 }
3326
3327 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
3328
3329 /*C*/ write = local_add_return(info->length, &tail_page->write);
3330
3331 /* set write to only the index of the write */
3332 write &= RB_WRITE_MASK;
3333
3334 tail = write - info->length;
3335
3336 /* See if we shot past the end of this buffer page */
3337 if (unlikely(write > BUF_PAGE_SIZE)) {
3338 /* before and after may now be different, fix it up */
3339 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3340 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3341 if (a_ok && b_ok && info->before != info->after)
3342 (void)rb_time_cmpxchg(&cpu_buffer->before_stamp,
3343 info->before, info->after);
3344 return rb_move_tail(cpu_buffer, tail, info);
3345 }
3346
3347 if (likely(tail == w)) {
3348 u64 save_before;
3349 bool s_ok;
3350
3351 /* Nothing interrupted us between A and C */
3352 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
3353 barrier();
3354 /*E*/ s_ok = rb_time_read(&cpu_buffer->before_stamp, &save_before);
3355 RB_WARN_ON(cpu_buffer, !s_ok);
3356 if (likely(!(info->add_timestamp &
3357 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3358 /* This did not interrupt any time update */
3359 info->delta = info->ts - info->after;
3360 else
3361 /* Just use the full timestamp for the interrupting event */
3362 info->delta = info->ts;
3363 barrier();
3364 if (unlikely(info->ts != save_before)) {
3365 /* SLOW PATH - Interrupted between C and E */
3366
3367 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3368 RB_WARN_ON(cpu_buffer, !a_ok);
3369
3370 /* Write stamp must only go forward */
3371 if (save_before > info->after) {
3372 /*
3373 * We do not care about the result, only that
3374 * it gets updated atomically.
3375 */
3376 (void)rb_time_cmpxchg(&cpu_buffer->write_stamp,
3377 info->after, save_before);
3378 }
3379 }
3380 } else {
3381 u64 ts;
3382 /* SLOW PATH - Interrupted between A and C */
3383 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3384 /* Was interrupted before here, write_stamp must be valid */
3385 RB_WARN_ON(cpu_buffer, !a_ok);
3386 ts = rb_time_stamp(cpu_buffer->buffer);
3387 barrier();
3388 /*E*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3389 info->after < ts &&
3390 rb_time_cmpxchg(&cpu_buffer->write_stamp,
3391 info->after, ts)) {
3392 /* Nothing came after this event between C and E */
3393 info->delta = ts - info->after;
3394 info->ts = ts;
3395 } else {
3396 /*
3397 * Interrupted between C and E:
3398 * Lost the previous event's time stamp. Just set the
3399 * delta to zero, and this will be the same time as
3400 * the event this event interrupted. And the events that
3401 * came after this will still be correct (as they would
3402 * have built their delta on the previous event).
3403 */
3404 info->delta = 0;
3405 }
3406 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
3407 }
3408
3409 /*
3410 * If this is the first commit on the page, then it has the same
3411 * timestamp as the page itself.
3412 */
3413 if (unlikely(!tail && !(info->add_timestamp &
3414 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3415 info->delta = 0;
3416
3417 /* We reserved something on the buffer */
3418
3419 event = __rb_page_index(tail_page, tail);
3420 rb_update_event(cpu_buffer, event, info);
3421
3422 local_inc(&tail_page->entries);
3423
3424 /*
3425 * If this is the first commit on the page, then update
3426 * its timestamp.
3427 */
3428 if (unlikely(!tail))
3429 tail_page->page->time_stamp = info->ts;
3430
3431 /* account for these added bytes */
3432 local_add(info->length, &cpu_buffer->entries_bytes);
3433
3434 return event;
3435 }
3436
3437 static __always_inline struct ring_buffer_event *
3438 rb_reserve_next_event(struct trace_buffer *buffer,
3439 struct ring_buffer_per_cpu *cpu_buffer,
3440 unsigned long length)
3441 {
3442 struct ring_buffer_event *event;
3443 struct rb_event_info info;
3444 int nr_loops = 0;
3445 int add_ts_default;
3446
3447 /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3448 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3449 (unlikely(in_nmi()))) {
3450 return NULL;
3451 }
3452
3453 rb_start_commit(cpu_buffer);
3454 /* The commit page can not change after this */
3455
3456 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3457 /*
3458 * Due to the ability to swap a cpu buffer from a buffer
3459 * it is possible it was swapped before we committed.
3460 * (committing stops a swap). We check for it here and
3461 * if it happened, we have to fail the write.
3462 */
3463 barrier();
3464 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3465 local_dec(&cpu_buffer->committing);
3466 local_dec(&cpu_buffer->commits);
3467 return NULL;
3468 }
3469 #endif
3470
3471 info.length = rb_calculate_event_length(length);
3472
3473 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3474 add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3475 info.length += RB_LEN_TIME_EXTEND;
3476 if (info.length > BUF_MAX_DATA_SIZE)
3477 goto out_fail;
3478 } else {
3479 add_ts_default = RB_ADD_STAMP_NONE;
3480 }
3481
3482 again:
3483 info.add_timestamp = add_ts_default;
3484 info.delta = 0;
3485
3486 /*
3487 * We allow for interrupts to reenter here and do a trace.
3488 * If one does, it will cause this original code to loop
3489 * back here. Even with heavy interrupts happening, this
3490 * should only happen a few times in a row. If this happens
3491 * 1000 times in a row, there must be either an interrupt
3492 * storm or we have something buggy.
3493 * Bail!
3494 */
3495 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3496 goto out_fail;
3497
3498 event = __rb_reserve_next(cpu_buffer, &info);
3499
3500 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3501 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
3502 info.length -= RB_LEN_TIME_EXTEND;
3503 goto again;
3504 }
3505
3506 if (likely(event))
3507 return event;
3508 out_fail:
3509 rb_end_commit(cpu_buffer);
3510 return NULL;
3511 }
3512
3513 /**
3514 * ring_buffer_lock_reserve - reserve a part of the buffer
3515 * @buffer: the ring buffer to reserve from
3516 * @length: the length of the data to reserve (excluding event header)
3517 *
3518 * Returns a reserved event on the ring buffer to copy directly to.
3519 * The user of this interface will need to get the body to write into
3520 * and can use the ring_buffer_event_data() interface.
3521 *
3522 * The length is the length of the data needed, not the event length
3523 * which also includes the event header.
3524 *
3525 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3526 * If NULL is returned, then nothing has been allocated or locked.
3527 */
3528 struct ring_buffer_event *
3529 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
3530 {
3531 struct ring_buffer_per_cpu *cpu_buffer;
3532 struct ring_buffer_event *event;
3533 int cpu;
3534
3535 /* If we are tracing schedule, we don't want to recurse */
3536 preempt_disable_notrace();
3537
3538 if (unlikely(atomic_read(&buffer->record_disabled)))
3539 goto out;
3540
3541 cpu = raw_smp_processor_id();
3542
3543 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3544 goto out;
3545
3546 cpu_buffer = buffer->buffers[cpu];
3547
3548 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3549 goto out;
3550
3551 if (unlikely(length > BUF_MAX_DATA_SIZE))
3552 goto out;
3553
3554 if (unlikely(trace_recursive_lock(cpu_buffer)))
3555 goto out;
3556
3557 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3558 if (!event)
3559 goto out_unlock;
3560
3561 return event;
3562
3563 out_unlock:
3564 trace_recursive_unlock(cpu_buffer);
3565 out:
3566 preempt_enable_notrace();
3567 return NULL;
3568 }
3569 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
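/*
 * Usage sketch (illustrative only, not part of this file): the typical
 * reserve/commit cycle. 'buffer' is assumed to come from
 * ring_buffer_alloc() and 'struct my_entry' is a hypothetical,
 * caller-defined payload.
 *
 *	struct ring_buffer_event *event;
 *	struct my_entry *entry;
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return -EBUSY;
 *	entry = ring_buffer_event_data(event);
 *	entry->value = 42;
 *	ring_buffer_unlock_commit(buffer, event);
 */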
3570
3571 /*
3572 * Decrement the entries to the page that an event is on.
3573 * The event does not even need to exist, only the pointer
3574 * to the page it is on. This may only be called before the commit
3575 * takes place.
3576 */
3577 static inline void
3578 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3579 struct ring_buffer_event *event)
3580 {
3581 unsigned long addr = (unsigned long)event;
3582 struct buffer_page *bpage = cpu_buffer->commit_page;
3583 struct buffer_page *start;
3584
3585 addr &= PAGE_MASK;
3586
3587 /* Do the likely case first */
3588 if (likely(bpage->page == (void *)addr)) {
3589 local_dec(&bpage->entries);
3590 return;
3591 }
3592
3593 /*
3594 * Because the commit page may be on the reader page, we
3595 * start with the next page and check for the end of the loop there.
3596 */
3597 rb_inc_page(cpu_buffer, &bpage);
3598 start = bpage;
3599 do {
3600 if (bpage->page == (void *)addr) {
3601 local_dec(&bpage->entries);
3602 return;
3603 }
3604 rb_inc_page(cpu_buffer, &bpage);
3605 } while (bpage != start);
3606
3607 /* commit not part of this buffer?? */
3608 RB_WARN_ON(cpu_buffer, 1);
3609 }
3610
3611 /**
3612 * ring_buffer_commit_discard - discard an event that has not been committed
3613 * @buffer: the ring buffer
3614 * @event: non committed event to discard
3615 *
3616 * Sometimes an event that is in the ring buffer needs to be ignored.
3617 * This function lets the user discard an event in the ring buffer
3618 * and then that event will not be read later.
3619 *
3620 * This function only works if it is called before the item has been
3621 * committed. It will try to free the event from the ring buffer
3622 * if another event has not been added behind it.
3623 *
3624 * If another event has been added behind it, it will set the event
3625 * up as discarded, and perform the commit.
3626 *
3627 * If this function is called, do not call ring_buffer_unlock_commit on
3628 * the event.
3629 */
3630 void ring_buffer_discard_commit(struct trace_buffer *buffer,
3631 struct ring_buffer_event *event)
3632 {
3633 struct ring_buffer_per_cpu *cpu_buffer;
3634 int cpu;
3635
3636 /* The event is discarded regardless */
3637 rb_event_discard(event);
3638
3639 cpu = smp_processor_id();
3640 cpu_buffer = buffer->buffers[cpu];
3641
3642 /*
3643 * This must only be called if the event has not been
3644 * committed yet. Thus we can assume that preemption
3645 * is still disabled.
3646 */
3647 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3648
3649 rb_decrement_entry(cpu_buffer, event);
3650 if (rb_try_to_discard(cpu_buffer, event))
3651 goto out;
3652
3653 out:
3654 rb_end_commit(cpu_buffer);
3655
3656 trace_recursive_unlock(cpu_buffer);
3657
3658 preempt_enable_notrace();
3659
3660 }
3661 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
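/*
 * Usage sketch (illustrative only): reserving an event and discarding it
 * instead of committing, e.g. when a filter rejects the data after the
 * reservation was made. 'fill_entry()' and the other non-API names are
 * hypothetical.
 *
 *	event = ring_buffer_lock_reserve(buffer, sizeof(*entry));
 *	if (!event)
 *		return;
 *	entry = ring_buffer_event_data(event);
 *	if (fill_entry(entry) < 0)
 *		ring_buffer_discard_commit(buffer, event);
 *	else
 *		ring_buffer_unlock_commit(buffer, event);
 */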
3662
3663 /**
3664 * ring_buffer_write - write data to the buffer without reserving
3665 * @buffer: The ring buffer to write to.
3666 * @length: The length of the data being written (excluding the event header)
3667 * @data: The data to write to the buffer.
3668 *
3669 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3670 * one function. If you already have the data to write to the buffer, it
3671 * may be easier to simply call this function.
3672 *
3673 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3674 * and not the length of the event which would hold the header.
3675 */
3676 int ring_buffer_write(struct trace_buffer *buffer,
3677 unsigned long length,
3678 void *data)
3679 {
3680 struct ring_buffer_per_cpu *cpu_buffer;
3681 struct ring_buffer_event *event;
3682 void *body;
3683 int ret = -EBUSY;
3684 int cpu;
3685
3686 preempt_disable_notrace();
3687
3688 if (atomic_read(&buffer->record_disabled))
3689 goto out;
3690
3691 cpu = raw_smp_processor_id();
3692
3693 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3694 goto out;
3695
3696 cpu_buffer = buffer->buffers[cpu];
3697
3698 if (atomic_read(&cpu_buffer->record_disabled))
3699 goto out;
3700
3701 if (length > BUF_MAX_DATA_SIZE)
3702 goto out;
3703
3704 if (unlikely(trace_recursive_lock(cpu_buffer)))
3705 goto out;
3706
3707 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3708 if (!event)
3709 goto out_unlock;
3710
3711 body = rb_event_data(event);
3712
3713 memcpy(body, data, length);
3714
3715 rb_commit(cpu_buffer, event);
3716
3717 rb_wakeups(buffer, cpu_buffer);
3718
3719 ret = 0;
3720
3721 out_unlock:
3722 trace_recursive_unlock(cpu_buffer);
3723
3724 out:
3725 preempt_enable_notrace();
3726
3727 return ret;
3728 }
3729 EXPORT_SYMBOL_GPL(ring_buffer_write);
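/*
 * Usage sketch (illustrative only): when the data already exists,
 * ring_buffer_write() replaces the reserve/commit pair above.
 * 'buffer' and 'struct my_sample' are hypothetical.
 *
 *	struct my_sample sample = { .value = 42 };
 *
 *	if (ring_buffer_write(buffer, sizeof(sample), &sample))
 *		pr_debug("ring buffer write failed\n");
 */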
3730
3731 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3732 {
3733 struct buffer_page *reader = cpu_buffer->reader_page;
3734 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3735 struct buffer_page *commit = cpu_buffer->commit_page;
3736
3737 /* In case of error, head will be NULL */
3738 if (unlikely(!head))
3739 return true;
3740
3741 /* Reader should exhaust content in reader page */
3742 if (reader->read != rb_page_commit(reader))
3743 return false;
3744
3745 /*
3746 * If writers are committing on the reader page, knowing all
3747 * committed content has been read, the ring buffer is empty.
3748 */
3749 if (commit == reader)
3750 return true;
3751
3752 /*
3753 * If writers are committing on a page other than reader page
3754 * and head page, there should always be content to read.
3755 */
3756 if (commit != head)
3757 return false;
3758
3759 /*
3760 * Writers are committing on the head page. We just need
3761 * to check whether any data has been committed; the reader will
3762 * swap the reader page with the head page when it wants to read data.
3763 */
3764 return rb_page_commit(commit) == 0;
3765 }
3766
3767 /**
3768 * ring_buffer_record_disable - stop all writes into the buffer
3769 * @buffer: The ring buffer to stop writes to.
3770 *
3771 * This prevents all writes to the buffer. Any attempt to write
3772 * to the buffer after this will fail and return NULL.
3773 *
3774 * The caller should call synchronize_rcu() after this.
3775 */
3776 void ring_buffer_record_disable(struct trace_buffer *buffer)
3777 {
3778 atomic_inc(&buffer->record_disabled);
3779 }
3780 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3781
3782 /**
3783 * ring_buffer_record_enable - enable writes to the buffer
3784 * @buffer: The ring buffer to enable writes
3785 *
3786 * Note, multiple disables will need the same number of enables
3787 * to truly enable the writing (much like preempt_disable).
3788 */
3789 void ring_buffer_record_enable(struct trace_buffer *buffer)
3790 {
3791 atomic_dec(&buffer->record_disabled);
3792 }
3793 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3794
3795 /**
3796 * ring_buffer_record_off - stop all writes into the buffer
3797 * @buffer: The ring buffer to stop writes to.
3798 *
3799 * This prevents all writes to the buffer. Any attempt to write
3800 * to the buffer after this will fail and return NULL.
3801 *
3802 * This is different from ring_buffer_record_disable() as
3803 * it works like an on/off switch, whereas the disable() version
3804 * must be paired with an enable().
3805 */
3806 void ring_buffer_record_off(struct trace_buffer *buffer)
3807 {
3808 unsigned int rd;
3809 unsigned int new_rd;
3810
3811 do {
3812 rd = atomic_read(&buffer->record_disabled);
3813 new_rd = rd | RB_BUFFER_OFF;
3814 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3815 }
3816 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3817
3818 /**
3819 * ring_buffer_record_on - restart writes into the buffer
3820 * @buffer: The ring buffer to start writes to.
3821 *
3822 * This enables all writes to the buffer that was disabled by
3823 * ring_buffer_record_off().
3824 *
3825 * This is different from ring_buffer_record_enable() as
3826 * it works like an on/off switch, whereas the enable() version
3827 * must be paired with a disable().
3828 */
3829 void ring_buffer_record_on(struct trace_buffer *buffer)
3830 {
3831 unsigned int rd;
3832 unsigned int new_rd;
3833
3834 do {
3835 rd = atomic_read(&buffer->record_disabled);
3836 new_rd = rd & ~RB_BUFFER_OFF;
3837 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3838 }
3839 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
3840
3841 /**
3842 * ring_buffer_record_is_on - return true if the ring buffer can write
3843 * @buffer: The ring buffer to see if write is enabled
3844 *
3845 * Returns true if the ring buffer is in a state that it accepts writes.
3846 */
3847 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
3848 {
3849 return !atomic_read(&buffer->record_disabled);
3850 }
3851
3852 /**
3853 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
3854 * @buffer: The ring buffer to see if write is set enabled
3855 *
3856 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
3857 * Note that this does NOT mean it is in a writable state.
3858 *
3859 * It may return true when the ring buffer has been disabled by
3860 * ring_buffer_record_disable(), as that is a temporary disabling of
3861 * the ring buffer.
3862 */
3863 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
3864 {
3865 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
3866 }
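/*
 * Usage sketch (illustrative only) contrasting the two control styles:
 * record_disable()/record_enable() nest like a counter, while
 * record_off()/record_on() act as a single switch that is unaffected
 * by the nesting count. 'buffer' is hypothetical.
 *
 *	ring_buffer_record_off(buffer);
 *	ring_buffer_record_disable(buffer);
 *	ring_buffer_record_enable(buffer);
 *
 * At this point both ring_buffer_record_is_on() and
 * ring_buffer_record_is_set_on() return false: the OFF switch is still
 * set even though the disable count is back to zero.
 *
 *	ring_buffer_record_on(buffer);
 *
 * Now both return true again (assuming no other disables are pending).
 */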
3867
3868 /**
3869 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
3870 * @buffer: The ring buffer to stop writes to.
3871 * @cpu: The CPU buffer to stop
3872 *
3873 * This prevents all writes to the buffer. Any attempt to write
3874 * to the buffer after this will fail and return NULL.
3875 *
3876 * The caller should call synchronize_rcu() after this.
3877 */
3878 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
3879 {
3880 struct ring_buffer_per_cpu *cpu_buffer;
3881
3882 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3883 return;
3884
3885 cpu_buffer = buffer->buffers[cpu];
3886 atomic_inc(&cpu_buffer->record_disabled);
3887 }
3888 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
3889
3890 /**
3891 * ring_buffer_record_enable_cpu - enable writes to the buffer
3892 * @buffer: The ring buffer to enable writes
3893 * @cpu: The CPU to enable.
3894 *
3895 * Note, multiple disables will need the same number of enables
3896 * to truly enable the writing (much like preempt_disable).
3897 */
3898 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
3899 {
3900 struct ring_buffer_per_cpu *cpu_buffer;
3901
3902 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3903 return;
3904
3905 cpu_buffer = buffer->buffers[cpu];
3906 atomic_dec(&cpu_buffer->record_disabled);
3907 }
3908 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
3909
3910 /*
3911 * The total entries in the ring buffer is the running counter
3912 * of entries entered into the ring buffer, minus the sum of
3913 * the entries read from the ring buffer and the number of
3914 * entries that were overwritten.
3915 */
3916 static inline unsigned long
3917 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
3918 {
3919 return local_read(&cpu_buffer->entries) -
3920 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
3921 }
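/*
 * Worked example with hypothetical numbers: if 10000 entries were
 * written, 300 were overwritten and 1500 have already been read, then
 * 10000 - (300 + 1500) = 8200 entries remain to be read.
 */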
3922
3923 /**
3924 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
3925 * @buffer: The ring buffer
3926 * @cpu: The per CPU buffer to read from.
3927 */
3928 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
3929 {
3930 unsigned long flags;
3931 struct ring_buffer_per_cpu *cpu_buffer;
3932 struct buffer_page *bpage;
3933 u64 ret = 0;
3934
3935 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3936 return 0;
3937
3938 cpu_buffer = buffer->buffers[cpu];
3939 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
3940 /*
3941 * if the tail is on reader_page, oldest time stamp is on the reader
3942 * page
3943 */
3944 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
3945 bpage = cpu_buffer->reader_page;
3946 else
3947 bpage = rb_set_head_page(cpu_buffer);
3948 if (bpage)
3949 ret = bpage->page->time_stamp;
3950 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
3951
3952 return ret;
3953 }
3954 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
3955
3956 /**
3957 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
3958 * @buffer: The ring buffer
3959 * @cpu: The per CPU buffer to read from.
3960 */
3961 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
3962 {
3963 struct ring_buffer_per_cpu *cpu_buffer;
3964 unsigned long ret;
3965
3966 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3967 return 0;
3968
3969 cpu_buffer = buffer->buffers[cpu];
3970 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
3971
3972 return ret;
3973 }
3974 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
3975
3976 /**
3977 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
3978 * @buffer: The ring buffer
3979 * @cpu: The per CPU buffer to get the entries from.
3980 */
3981 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
3982 {
3983 struct ring_buffer_per_cpu *cpu_buffer;
3984
3985 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3986 return 0;
3987
3988 cpu_buffer = buffer->buffers[cpu];
3989
3990 return rb_num_of_entries(cpu_buffer);
3991 }
3992 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
3993
3994 /**
3995 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
3996 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
3997 * @buffer: The ring buffer
3998 * @cpu: The per CPU buffer to get the number of overruns from
3999 */
4000 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
4001 {
4002 struct ring_buffer_per_cpu *cpu_buffer;
4003 unsigned long ret;
4004
4005 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4006 return 0;
4007
4008 cpu_buffer = buffer->buffers[cpu];
4009 ret = local_read(&cpu_buffer->overrun);
4010
4011 return ret;
4012 }
4013 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
4014
4015 /**
4016 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4017 * commits failing due to the buffer wrapping around while there are uncommitted
4018 * events, such as during an interrupt storm.
4019 * @buffer: The ring buffer
4020 * @cpu: The per CPU buffer to get the number of overruns from
4021 */
4022 unsigned long
4023 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
4024 {
4025 struct ring_buffer_per_cpu *cpu_buffer;
4026 unsigned long ret;
4027
4028 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4029 return 0;
4030
4031 cpu_buffer = buffer->buffers[cpu];
4032 ret = local_read(&cpu_buffer->commit_overrun);
4033
4034 return ret;
4035 }
4036 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4037
4038 /**
4039 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4040 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4041 * @buffer: The ring buffer
4042 * @cpu: The per CPU buffer to get the number of overruns from
4043 */
4044 unsigned long
4045 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
4046 {
4047 struct ring_buffer_per_cpu *cpu_buffer;
4048 unsigned long ret;
4049
4050 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4051 return 0;
4052
4053 cpu_buffer = buffer->buffers[cpu];
4054 ret = local_read(&cpu_buffer->dropped_events);
4055
4056 return ret;
4057 }
4058 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4059
4060 /**
4061 * ring_buffer_read_events_cpu - get the number of events successfully read
4062 * @buffer: The ring buffer
4063 * @cpu: The per CPU buffer to get the number of events read
4064 */
4065 unsigned long
4066 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
4067 {
4068 struct ring_buffer_per_cpu *cpu_buffer;
4069
4070 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4071 return 0;
4072
4073 cpu_buffer = buffer->buffers[cpu];
4074 return cpu_buffer->read;
4075 }
4076 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
4077
4078 /**
4079 * ring_buffer_entries - get the number of entries in a buffer
4080 * @buffer: The ring buffer
4081 *
4082 * Returns the total number of entries in the ring buffer
4083 * (all CPU entries)
4084 */
4085 unsigned long ring_buffer_entries(struct trace_buffer *buffer)
4086 {
4087 struct ring_buffer_per_cpu *cpu_buffer;
4088 unsigned long entries = 0;
4089 int cpu;
4090
4091 /* if you care about this being correct, lock the buffer */
4092 for_each_buffer_cpu(buffer, cpu) {
4093 cpu_buffer = buffer->buffers[cpu];
4094 entries += rb_num_of_entries(cpu_buffer);
4095 }
4096
4097 return entries;
4098 }
4099 EXPORT_SYMBOL_GPL(ring_buffer_entries);
4100
4101 /**
4102 * ring_buffer_overruns - get the number of overruns in buffer
4103 * @buffer: The ring buffer
4104 *
4105 * Returns the total number of overruns in the ring buffer
4106 * (all CPU entries)
4107 */
4108 unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
4109 {
4110 struct ring_buffer_per_cpu *cpu_buffer;
4111 unsigned long overruns = 0;
4112 int cpu;
4113
4114 /* if you care about this being correct, lock the buffer */
4115 for_each_buffer_cpu(buffer, cpu) {
4116 cpu_buffer = buffer->buffers[cpu];
4117 overruns += local_read(&cpu_buffer->overrun);
4118 }
4119
4120 return overruns;
4121 }
4122 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
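/*
 * Usage sketch (illustrative only): dumping a few of the per-CPU
 * statistics above for a hypothetical 'buffer'.
 *
 *	int cpu;
 *
 *	for_each_online_cpu(cpu) {
 *		if (!ring_buffer_entries_cpu(buffer, cpu))
 *			continue;
 *		pr_info("cpu %d: %lu entries, %lu bytes, %lu overruns\n",
 *			cpu,
 *			ring_buffer_entries_cpu(buffer, cpu),
 *			ring_buffer_bytes_cpu(buffer, cpu),
 *			ring_buffer_overrun_cpu(buffer, cpu));
 *	}
 */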
4123
4124 static void rb_iter_reset(struct ring_buffer_iter *iter)
4125 {
4126 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4127
4128 /* Iterator usage is expected to have record disabled */
4129 iter->head_page = cpu_buffer->reader_page;
4130 iter->head = cpu_buffer->reader_page->read;
4131 iter->next_event = iter->head;
4132
4133 iter->cache_reader_page = iter->head_page;
4134 iter->cache_read = cpu_buffer->read;
4135 iter->cache_pages_removed = cpu_buffer->pages_removed;
4136
4137 if (iter->head) {
4138 iter->read_stamp = cpu_buffer->read_stamp;
4139 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4140 } else {
4141 iter->read_stamp = iter->head_page->page->time_stamp;
4142 iter->page_stamp = iter->read_stamp;
4143 }
4144 }
4145
4146 /**
4147 * ring_buffer_iter_reset - reset an iterator
4148 * @iter: The iterator to reset
4149 *
4150 * Resets the iterator, so that it will start from the beginning
4151 * again.
4152 */
4153 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4154 {
4155 struct ring_buffer_per_cpu *cpu_buffer;
4156 unsigned long flags;
4157
4158 if (!iter)
4159 return;
4160
4161 cpu_buffer = iter->cpu_buffer;
4162
4163 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4164 rb_iter_reset(iter);
4165 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4166 }
4167 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
4168
4169 /**
4170 * ring_buffer_iter_empty - check if an iterator has no more to read
4171 * @iter: The iterator to check
4172 */
4173 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4174 {
4175 struct ring_buffer_per_cpu *cpu_buffer;
4176 struct buffer_page *reader;
4177 struct buffer_page *head_page;
4178 struct buffer_page *commit_page;
4179 struct buffer_page *curr_commit_page;
4180 unsigned commit;
4181 u64 curr_commit_ts;
4182 u64 commit_ts;
4183
4184 cpu_buffer = iter->cpu_buffer;
4185 reader = cpu_buffer->reader_page;
4186 head_page = cpu_buffer->head_page;
4187 commit_page = cpu_buffer->commit_page;
4188 commit_ts = commit_page->page->time_stamp;
4189
4190 /*
4191 * When the writer goes across pages, it issues a cmpxchg which
4192 * is a mb(), which will synchronize with the rmb here.
4193 * (see rb_tail_page_update())
4194 */
4195 smp_rmb();
4196 commit = rb_page_commit(commit_page);
4197 /* We want to make sure that the commit page doesn't change */
4198 smp_rmb();
4199
4200 /* Make sure commit page didn't change */
4201 curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4202 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4203
4204 /* If the commit page changed, then there's more data */
4205 if (curr_commit_page != commit_page ||
4206 curr_commit_ts != commit_ts)
4207 return 0;
4208
4209 /* Still racy, as it may return a false positive, but that's OK */
4210 return ((iter->head_page == commit_page && iter->head >= commit) ||
4211 (iter->head_page == reader && commit_page == head_page &&
4212 head_page->read == commit &&
4213 iter->head == rb_page_commit(cpu_buffer->reader_page)));
4214 }
4215 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
4216
4217 static void
4218 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4219 struct ring_buffer_event *event)
4220 {
4221 u64 delta;
4222
4223 switch (event->type_len) {
4224 case RINGBUF_TYPE_PADDING:
4225 return;
4226
4227 case RINGBUF_TYPE_TIME_EXTEND:
4228 delta = ring_buffer_event_time_stamp(event);
4229 cpu_buffer->read_stamp += delta;
4230 return;
4231
4232 case RINGBUF_TYPE_TIME_STAMP:
4233 delta = ring_buffer_event_time_stamp(event);
4234 cpu_buffer->read_stamp = delta;
4235 return;
4236
4237 case RINGBUF_TYPE_DATA:
4238 cpu_buffer->read_stamp += event->time_delta;
4239 return;
4240
4241 default:
4242 RB_WARN_ON(cpu_buffer, 1);
4243 }
4244 return;
4245 }
4246
4247 static void
4248 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4249 struct ring_buffer_event *event)
4250 {
4251 u64 delta;
4252
4253 switch (event->type_len) {
4254 case RINGBUF_TYPE_PADDING:
4255 return;
4256
4257 case RINGBUF_TYPE_TIME_EXTEND:
4258 delta = ring_buffer_event_time_stamp(event);
4259 iter->read_stamp += delta;
4260 return;
4261
4262 case RINGBUF_TYPE_TIME_STAMP:
4263 delta = ring_buffer_event_time_stamp(event);
4264 iter->read_stamp = delta;
4265 return;
4266
4267 case RINGBUF_TYPE_DATA:
4268 iter->read_stamp += event->time_delta;
4269 return;
4270
4271 default:
4272 RB_WARN_ON(iter->cpu_buffer, 1);
4273 }
4274 return;
4275 }
4276
4277 static struct buffer_page *
4278 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4279 {
4280 struct buffer_page *reader = NULL;
4281 unsigned long overwrite;
4282 unsigned long flags;
4283 int nr_loops = 0;
4284 int ret;
4285
4286 local_irq_save(flags);
4287 arch_spin_lock(&cpu_buffer->lock);
4288
4289 again:
4290 /*
4291 * This should normally only loop twice. But because the
4292 * start of the reader inserts an empty page, it causes
4293 * a case where we will loop three times. There should be no
4294 * reason to loop four times (that I know of).
4295 */
4296 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4297 reader = NULL;
4298 goto out;
4299 }
4300
4301 reader = cpu_buffer->reader_page;
4302
4303 /* If there's more to read, return this page */
4304 if (cpu_buffer->reader_page->read < rb_page_size(reader))
4305 goto out;
4306
4307 /* Never should we have an index greater than the size */
4308 if (RB_WARN_ON(cpu_buffer,
4309 cpu_buffer->reader_page->read > rb_page_size(reader)))
4310 goto out;
4311
4312 /* check if we caught up to the tail */
4313 reader = NULL;
4314 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4315 goto out;
4316
4317 /* Don't bother swapping if the ring buffer is empty */
4318 if (rb_num_of_entries(cpu_buffer) == 0)
4319 goto out;
4320
4321 /*
4322 * Reset the reader page to size zero.
4323 */
4324 local_set(&cpu_buffer->reader_page->write, 0);
4325 local_set(&cpu_buffer->reader_page->entries, 0);
4326 local_set(&cpu_buffer->reader_page->page->commit, 0);
4327 cpu_buffer->reader_page->real_end = 0;
4328
4329 spin:
4330 /*
4331 * Splice the empty reader page into the list around the head.
4332 */
4333 reader = rb_set_head_page(cpu_buffer);
4334 if (!reader)
4335 goto out;
4336 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4337 cpu_buffer->reader_page->list.prev = reader->list.prev;
4338
4339 /*
4340 * cpu_buffer->pages just needs to point to the buffer, it
4341 * has no specific buffer page to point to. Let's move it out
4342 * of our way so we don't accidentally swap it.
4343 */
4344 cpu_buffer->pages = reader->list.prev;
4345
4346 /* The reader page will be pointing to the new head */
4347 rb_set_list_to_head(cpu_buffer, &cpu_buffer->reader_page->list);
4348
4349 /*
4350 * We want to make sure we read the overruns after we set up our
4351 * pointers to the next object. The writer side does a
4352 * cmpxchg to cross pages which acts as the mb on the writer
4353 * side. Note, the reader will constantly fail the swap
4354 * while the writer is updating the pointers, so this
4355 * guarantees that the overwrite recorded here is the one we
4356 * want to compare with the last_overrun.
4357 */
4358 smp_mb();
4359 overwrite = local_read(&(cpu_buffer->overrun));
4360
4361 /*
4362 * Here's the tricky part.
4363 *
4364 * We need to move the pointer past the header page.
4365 * But we can only do that if a writer is not currently
4366 * moving it. The page before the header page has the
4367 * flag bit '1' set if it is pointing to the page we want.
4368 * But if the writer is in the process of moving it,
4369 * then it will be '2', or '0' if it has already been moved.
4370 */
4371
4372 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4373
4374 /*
4375 * If we did not convert it, then we must try again.
4376 */
4377 if (!ret)
4378 goto spin;
4379
4380 /*
4381 * Yay! We succeeded in replacing the page.
4382 *
4383 * Now make the new head point back to the reader page.
4384 */
4385 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
4386 rb_inc_page(cpu_buffer, &cpu_buffer->head_page);
4387
4388 local_inc(&cpu_buffer->pages_read);
4389
4390 /* Finally update the reader page to the new head */
4391 cpu_buffer->reader_page = reader;
4392 cpu_buffer->reader_page->read = 0;
4393
4394 if (overwrite != cpu_buffer->last_overrun) {
4395 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4396 cpu_buffer->last_overrun = overwrite;
4397 }
4398
4399 goto again;
4400
4401 out:
4402 /* Update the read_stamp on the first event */
4403 if (reader && reader->read == 0)
4404 cpu_buffer->read_stamp = reader->page->time_stamp;
4405
4406 arch_spin_unlock(&cpu_buffer->lock);
4407 local_irq_restore(flags);
4408
4409 /*
4410 * The writer has preemption disabled; wait for it, but not forever.
4411 * Although, 1 second is pretty much "forever".
4412 */
4413 #define USECS_WAIT 1000000
4414 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4415 /* If the write is past the end of page, a writer is still updating it */
4416 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
4417 break;
4418
4419 udelay(1);
4420
4421 /* Get the latest version of the reader write value */
4422 smp_rmb();
4423 }
4424
4425 /* The writer is not moving forward? Something is wrong */
4426 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4427 reader = NULL;
4428
4429 /*
4430 * Make sure we see any padding after the write update
4431 * (see rb_reset_tail()).
4432 *
4433 * In addition, a writer may be writing on the reader page
4434 * if the page has not been fully filled, so the read barrier
4435 * is also needed to make sure we see the content of what is
4436 * committed by the writer (see rb_set_commit_to_write()).
4437 */
4438 smp_rmb();
4439
4440
4441 return reader;
4442 }
4443
4444 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4445 {
4446 struct ring_buffer_event *event;
4447 struct buffer_page *reader;
4448 unsigned length;
4449
4450 reader = rb_get_reader_page(cpu_buffer);
4451
4452 /* This function should not be called when buffer is empty */
4453 if (RB_WARN_ON(cpu_buffer, !reader))
4454 return;
4455
4456 event = rb_reader_event(cpu_buffer);
4457
4458 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4459 cpu_buffer->read++;
4460
4461 rb_update_read_stamp(cpu_buffer, event);
4462
4463 length = rb_event_length(event);
4464 cpu_buffer->reader_page->read += length;
4465 cpu_buffer->read_bytes += length;
4466 }
4467
4468 static void rb_advance_iter(struct ring_buffer_iter *iter)
4469 {
4470 struct ring_buffer_per_cpu *cpu_buffer;
4471
4472 cpu_buffer = iter->cpu_buffer;
4473
4474 /* If head == next_event then we need to jump to the next event */
4475 if (iter->head == iter->next_event) {
4476 /* If the event gets overwritten again, there's nothing to do */
4477 if (rb_iter_head_event(iter) == NULL)
4478 return;
4479 }
4480
4481 iter->head = iter->next_event;
4482
4483 /*
4484 * Check if we are at the end of the buffer.
4485 */
4486 if (iter->next_event >= rb_page_size(iter->head_page)) {
4487 /* discarded commits can make the page empty */
4488 if (iter->head_page == cpu_buffer->commit_page)
4489 return;
4490 rb_inc_iter(iter);
4491 return;
4492 }
4493
4494 rb_update_iter_read_stamp(iter, iter->event);
4495 }
4496
4497 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4498 {
4499 return cpu_buffer->lost_events;
4500 }
4501
4502 static struct ring_buffer_event *
4503 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4504 unsigned long *lost_events)
4505 {
4506 struct ring_buffer_event *event;
4507 struct buffer_page *reader;
4508 int nr_loops = 0;
4509
4510 if (ts)
4511 *ts = 0;
4512 again:
4513 /*
4514 * We repeat when a time extend is encountered.
4515 * Since the time extend is always attached to a data event,
4516 * we should never loop more than once.
4517 * (We never hit the following condition more than twice).
4518 */
4519 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4520 return NULL;
4521
4522 reader = rb_get_reader_page(cpu_buffer);
4523 if (!reader)
4524 return NULL;
4525
4526 event = rb_reader_event(cpu_buffer);
4527
4528 switch (event->type_len) {
4529 case RINGBUF_TYPE_PADDING:
4530 if (rb_null_event(event))
4531 RB_WARN_ON(cpu_buffer, 1);
4532 /*
4533 * Because the writer could be discarding every
4534 * event it creates (which would probably be bad),
4535 * if we were to go back to "again" then we may never
4536 * catch up, and will trigger the warn on, or lock
4537 * the box. Return the padding, and we will release
4538 * the current locks, and try again.
4539 */
4540 return event;
4541
4542 case RINGBUF_TYPE_TIME_EXTEND:
4543 /* Internal data, OK to advance */
4544 rb_advance_reader(cpu_buffer);
4545 goto again;
4546
4547 case RINGBUF_TYPE_TIME_STAMP:
4548 if (ts) {
4549 *ts = ring_buffer_event_time_stamp(event);
4550 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4551 cpu_buffer->cpu, ts);
4552 }
4553 /* Internal data, OK to advance */
4554 rb_advance_reader(cpu_buffer);
4555 goto again;
4556
4557 case RINGBUF_TYPE_DATA:
4558 if (ts && !(*ts)) {
4559 *ts = cpu_buffer->read_stamp + event->time_delta;
4560 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4561 cpu_buffer->cpu, ts);
4562 }
4563 if (lost_events)
4564 *lost_events = rb_lost_events(cpu_buffer);
4565 return event;
4566
4567 default:
4568 RB_WARN_ON(cpu_buffer, 1);
4569 }
4570
4571 return NULL;
4572 }
4573 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4574
4575 static struct ring_buffer_event *
4576 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4577 {
4578 struct trace_buffer *buffer;
4579 struct ring_buffer_per_cpu *cpu_buffer;
4580 struct ring_buffer_event *event;
4581 int nr_loops = 0;
4582
4583 if (ts)
4584 *ts = 0;
4585
4586 cpu_buffer = iter->cpu_buffer;
4587 buffer = cpu_buffer->buffer;
4588
4589 /*
4590 * Check if someone performed a consuming read to the buffer
4591 * or removed some pages from the buffer. In these cases,
4592 * iterator was invalidated and we need to reset it.
4593 */
4594 if (unlikely(iter->cache_read != cpu_buffer->read ||
4595 iter->cache_reader_page != cpu_buffer->reader_page ||
4596 iter->cache_pages_removed != cpu_buffer->pages_removed))
4597 rb_iter_reset(iter);
4598
4599 again:
4600 if (ring_buffer_iter_empty(iter))
4601 return NULL;
4602
4603 /*
4604 * As the writer can mess with what the iterator is trying
4605 * to read, just give up if we fail to get an event after
4606 * three tries. The iterator is not as reliable when reading
4607 * the ring buffer with an active write as the consumer is.
4608 * Do not warn when three failures are reached.
4609 */
4610 if (++nr_loops > 3)
4611 return NULL;
4612
4613 if (rb_per_cpu_empty(cpu_buffer))
4614 return NULL;
4615
4616 if (iter->head >= rb_page_size(iter->head_page)) {
4617 rb_inc_iter(iter);
4618 goto again;
4619 }
4620
4621 event = rb_iter_head_event(iter);
4622 if (!event)
4623 goto again;
4624
4625 switch (event->type_len) {
4626 case RINGBUF_TYPE_PADDING:
4627 if (rb_null_event(event)) {
4628 rb_inc_iter(iter);
4629 goto again;
4630 }
4631 rb_advance_iter(iter);
4632 return event;
4633
4634 case RINGBUF_TYPE_TIME_EXTEND:
4635 /* Internal data, OK to advance */
4636 rb_advance_iter(iter);
4637 goto again;
4638
4639 case RINGBUF_TYPE_TIME_STAMP:
4640 if (ts) {
4641 *ts = ring_buffer_event_time_stamp(event);
4642 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4643 cpu_buffer->cpu, ts);
4644 }
4645 /* Internal data, OK to advance */
4646 rb_advance_iter(iter);
4647 goto again;
4648
4649 case RINGBUF_TYPE_DATA:
4650 if (ts && !(*ts)) {
4651 *ts = iter->read_stamp + event->time_delta;
4652 ring_buffer_normalize_time_stamp(buffer,
4653 cpu_buffer->cpu, ts);
4654 }
4655 return event;
4656
4657 default:
4658 RB_WARN_ON(cpu_buffer, 1);
4659 }
4660
4661 return NULL;
4662 }
4663 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4664
4665 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4666 {
4667 if (likely(!in_nmi())) {
4668 raw_spin_lock(&cpu_buffer->reader_lock);
4669 return true;
4670 }
4671
4672 /*
4673 * If an NMI die dumps out the content of the ring buffer, a
4674 * trylock must be used to prevent a deadlock if the NMI
4675 * preempted a task that holds the ring buffer locks. If
4676 * we get the lock then all is fine, if not, then continue
4677 * to do the read, but this can corrupt the ring buffer,
4678 * so it must be permanently disabled from future writes.
4679 * Reading from NMI is a one-shot deal.
4680 */
4681 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4682 return true;
4683
4684 /* Continue without locking, but disable the ring buffer */
4685 atomic_inc(&cpu_buffer->record_disabled);
4686 return false;
4687 }
4688
4689 static inline void
4690 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4691 {
4692 if (likely(locked))
4693 raw_spin_unlock(&cpu_buffer->reader_lock);
4694 return;
4695 }
4696
4697 /**
4698 * ring_buffer_peek - peek at the next event to be read
4699 * @buffer: The ring buffer to read
4700 * @cpu: The cpu to peek at
4701 * @ts: The timestamp counter of this event.
4702 * @lost_events: a variable to store if events were lost (may be NULL)
4703 *
4704 * This will return the event that will be read next, but does
4705 * not consume the data.
4706 */
4707 struct ring_buffer_event *
4708 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4709 unsigned long *lost_events)
4710 {
4711 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
4712 struct ring_buffer_event *event;
4713 unsigned long flags;
4714 bool dolock;
4715
4716 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4717 return NULL;
4718
4719 again:
4720 local_irq_save(flags);
4721 dolock = rb_reader_lock(cpu_buffer);
4722 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4723 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4724 rb_advance_reader(cpu_buffer);
4725 rb_reader_unlock(cpu_buffer, dolock);
4726 local_irq_restore(flags);
4727
4728 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4729 goto again;
4730
4731 return event;
4732 }
4733
4734 /** ring_buffer_iter_dropped - report if there are dropped events
4735 * @iter: The ring buffer iterator
4736 *
4737 * Returns true if there were dropped events since the last peek.
4738 */
4739 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
4740 {
4741 bool ret = iter->missed_events != 0;
4742
4743 iter->missed_events = 0;
4744 return ret;
4745 }
4746 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
4747
4748 /**
4749 * ring_buffer_iter_peek - peek at the next event to be read
4750 * @iter: The ring buffer iterator
4751 * @ts: The timestamp counter of this event.
4752 *
4753 * This will return the event that will be read next, but does
4754 * not increment the iterator.
4755 */
4756 struct ring_buffer_event *
4757 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4758 {
4759 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4760 struct ring_buffer_event *event;
4761 unsigned long flags;
4762
4763 again:
4764 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4765 event = rb_iter_peek(iter, ts);
4766 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4767
4768 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4769 goto again;
4770
4771 return event;
4772 }
4773
4774 /**
4775 * ring_buffer_consume - return an event and consume it
4776 * @buffer: The ring buffer to get the next event from
4777 * @cpu: the cpu to read the buffer from
4778 * @ts: a variable to store the timestamp (may be NULL)
4779 * @lost_events: a variable to store if events were lost (may be NULL)
4780 *
4781 * Returns the next event in the ring buffer, and that event is consumed.
4782 * Meaning that sequential reads will keep returning a different event,
4783 * and eventually empty the ring buffer if the producer is slower.
4784 */
4785 struct ring_buffer_event *
4786 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
4787 unsigned long *lost_events)
4788 {
4789 struct ring_buffer_per_cpu *cpu_buffer;
4790 struct ring_buffer_event *event = NULL;
4791 unsigned long flags;
4792 bool dolock;
4793
4794 again:
4795 /* might be called in atomic */
4796 preempt_disable();
4797
4798 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4799 goto out;
4800
4801 cpu_buffer = buffer->buffers[cpu];
4802 local_irq_save(flags);
4803 dolock = rb_reader_lock(cpu_buffer);
4804
4805 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
4806 if (event) {
4807 cpu_buffer->lost_events = 0;
4808 rb_advance_reader(cpu_buffer);
4809 }
4810
4811 rb_reader_unlock(cpu_buffer, dolock);
4812 local_irq_restore(flags);
4813
4814 out:
4815 preempt_enable();
4816
4817 if (event && event->type_len == RINGBUF_TYPE_PADDING)
4818 goto again;
4819
4820 return event;
4821 }
4822 EXPORT_SYMBOL_GPL(ring_buffer_consume);
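/*
 * Usage sketch (illustrative only): draining one CPU buffer with the
 * consuming read interface. 'buffer', 'cpu' and 'handle_entry()' are
 * hypothetical; padding events are skipped internally.
 *
 *	struct ring_buffer_event *event;
 *	unsigned long lost;
 *	u64 ts;
 *
 *	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
 *		if (lost)
 *			pr_warn("lost %lu events\n", lost);
 *		handle_entry(ring_buffer_event_data(event), ts);
 *	}
 */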
4823
4824 /**
4825 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
4826 * @buffer: The ring buffer to read from
4827 * @cpu: The cpu buffer to iterate over
4828 * @flags: gfp flags to use for memory allocation
4829 *
4830 * This performs the initial preparations necessary to iterate
4831 * through the buffer. Memory is allocated, buffer recording
4832 * is disabled, and the iterator pointer is returned to the caller.
4833 *
4834 * Disabling buffer recording prevents the reading from being
4835 * corrupted. This is not a consuming read, so a producer is not
4836 * expected.
4837 *
4838 * After a sequence of ring_buffer_read_prepare calls, the user is
4839 * expected to make at least one call to ring_buffer_read_prepare_sync.
4840 * Afterwards, ring_buffer_read_start is invoked to get things going
4841 * for real.
4842 *
4843 * This overall must be paired with ring_buffer_read_finish.
4844 */
4845 struct ring_buffer_iter *
4846 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
4847 {
4848 struct ring_buffer_per_cpu *cpu_buffer;
4849 struct ring_buffer_iter *iter;
4850
4851 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4852 return NULL;
4853
4854 iter = kzalloc(sizeof(*iter), flags);
4855 if (!iter)
4856 return NULL;
4857
4858 /* Holds the entire event: data and meta data */
4859 iter->event = kmalloc(BUF_PAGE_SIZE, flags);
4860 if (!iter->event) {
4861 kfree(iter);
4862 return NULL;
4863 }
4864
4865 cpu_buffer = buffer->buffers[cpu];
4866
4867 iter->cpu_buffer = cpu_buffer;
4868
4869 atomic_inc(&cpu_buffer->resize_disabled);
4870
4871 return iter;
4872 }
4873 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
4874
4875 /**
4876 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
4877 *
4878 * All previously invoked ring_buffer_read_prepare calls to prepare
4879 * iterators will be synchronized. Afterwards, ring_buffer_read_start
4880 * calls on those iterators are allowed.
4881 */
4882 void
4883 ring_buffer_read_prepare_sync(void)
4884 {
4885 synchronize_rcu();
4886 }
4887 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
4888
4889 /**
4890 * ring_buffer_read_start - start a non consuming read of the buffer
4891 * @iter: The iterator returned by ring_buffer_read_prepare
4892 *
4893 * This finalizes the startup of an iteration through the buffer.
4894 * The iterator comes from a call to ring_buffer_read_prepare and
4895 * an intervening ring_buffer_read_prepare_sync must have been
4896 * performed.
4897 *
4898 * Must be paired with ring_buffer_read_finish.
4899 */
4900 void
4901 ring_buffer_read_start(struct ring_buffer_iter *iter)
4902 {
4903 struct ring_buffer_per_cpu *cpu_buffer;
4904 unsigned long flags;
4905
4906 if (!iter)
4907 return;
4908
4909 cpu_buffer = iter->cpu_buffer;
4910
4911 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4912 arch_spin_lock(&cpu_buffer->lock);
4913 rb_iter_reset(iter);
4914 arch_spin_unlock(&cpu_buffer->lock);
4915 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4916 }
4917 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
4918
4919 /**
4920 * ring_buffer_read_finish - finish reading the iterator of the buffer
4921 * @iter: The iterator retrieved by ring_buffer_read_prepare
4922 *
4923 * This re-enables the recording to the buffer, and frees the
4924 * iterator.
4925 */
4926 void
4927 ring_buffer_read_finish(struct ring_buffer_iter *iter)
4928 {
4929 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4930 unsigned long flags;
4931
4932 /*
4933 * Ring buffer is disabled from recording, here's a good place
4934 * to check the integrity of the ring buffer.
4935 * Must prevent readers from trying to read, as the check
4936 * clears the HEAD page and readers require it.
4937 */
4938 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4939 rb_check_pages(cpu_buffer);
4940 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4941
4942 atomic_dec(&cpu_buffer->resize_disabled);
4943 kfree(iter->event);
4944 kfree(iter);
4945 }
4946 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
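/*
 * Usage sketch (illustrative only): the full non-consuming read
 * sequence for a single CPU, tying together the prepare/sync/start/
 * finish calls above. 'buffer', 'cpu' and 'handle_entry()' are
 * hypothetical.
 *
 *	struct ring_buffer_iter *iter;
 *	struct ring_buffer_event *event;
 *
 *	iter = ring_buffer_read_prepare(buffer, cpu, GFP_KERNEL);
 *	if (!iter)
 *		return;
 *	ring_buffer_read_prepare_sync();
 *	ring_buffer_read_start(iter);
 *
 *	while ((event = ring_buffer_iter_peek(iter, NULL))) {
 *		handle_entry(ring_buffer_event_data(event));
 *		ring_buffer_iter_advance(iter);
 *	}
 *
 *	ring_buffer_read_finish(iter);
 */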
4947
4948 /**
4949 * ring_buffer_iter_advance - advance the iterator to the next location
4950 * @iter: The ring buffer iterator
4951 *
4952 * Move the location of the iterator such that the next read will
4953 * be the next location of the iterator.
4954 */
4955 void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
4956 {
4957 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4958 unsigned long flags;
4959
4960 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4961
4962 rb_advance_iter(iter);
4963
4964 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4965 }
4966 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
4967
4968 /**
4969 * ring_buffer_size - return the size of the ring buffer (in bytes)
4970 * @buffer: The ring buffer.
4971 * @cpu: The CPU to get ring buffer size from.
4972 */
4973 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
4974 {
4975 /*
4976 * Earlier, this method returned
4977 * BUF_PAGE_SIZE * buffer->nr_pages
4978 * Since the nr_pages field is now removed, we have converted this to
4979 * return the per cpu buffer value.
4980 */
4981 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4982 return 0;
4983
4984 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
4985 }
4986 EXPORT_SYMBOL_GPL(ring_buffer_size);
4987
4988 static void rb_clear_buffer_page(struct buffer_page *page)
4989 {
4990 local_set(&page->write, 0);
4991 local_set(&page->entries, 0);
4992 rb_init_page(page->page);
4993 page->read = 0;
4994 }
4995
4996 static void
4997 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
4998 {
4999 struct buffer_page *page;
5000
5001 rb_head_page_deactivate(cpu_buffer);
5002
5003 cpu_buffer->head_page
5004 = list_entry(cpu_buffer->pages, struct buffer_page, list);
5005 rb_clear_buffer_page(cpu_buffer->head_page);
5006 list_for_each_entry(page, cpu_buffer->pages, list) {
5007 rb_clear_buffer_page(page);
5008 }
5009
5010 cpu_buffer->tail_page = cpu_buffer->head_page;
5011 cpu_buffer->commit_page = cpu_buffer->head_page;
5012
5013 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5014 INIT_LIST_HEAD(&cpu_buffer->new_pages);
5015 rb_clear_buffer_page(cpu_buffer->reader_page);
5016
5017 local_set(&cpu_buffer->entries_bytes, 0);
5018 local_set(&cpu_buffer->overrun, 0);
5019 local_set(&cpu_buffer->commit_overrun, 0);
5020 local_set(&cpu_buffer->dropped_events, 0);
5021 local_set(&cpu_buffer->entries, 0);
5022 local_set(&cpu_buffer->committing, 0);
5023 local_set(&cpu_buffer->commits, 0);
5024 local_set(&cpu_buffer->pages_touched, 0);
5025 local_set(&cpu_buffer->pages_lost, 0);
5026 local_set(&cpu_buffer->pages_read, 0);
5027 cpu_buffer->last_pages_touch = 0;
5028 cpu_buffer->shortest_full = 0;
5029 cpu_buffer->read = 0;
5030 cpu_buffer->read_bytes = 0;
5031
5032 rb_time_set(&cpu_buffer->write_stamp, 0);
5033 rb_time_set(&cpu_buffer->before_stamp, 0);
5034
5035 cpu_buffer->lost_events = 0;
5036 cpu_buffer->last_overrun = 0;
5037
5038 rb_head_page_activate(cpu_buffer);
5039 cpu_buffer->pages_removed = 0;
5040 }
5041
5042 /* Must have disabled the cpu buffer then done a synchronize_rcu */
5043 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5044 {
5045 unsigned long flags;
5046
5047 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5048
5049 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5050 goto out;
5051
5052 arch_spin_lock(&cpu_buffer->lock);
5053
5054 rb_reset_cpu(cpu_buffer);
5055
5056 arch_spin_unlock(&cpu_buffer->lock);
5057
5058 out:
5059 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5060 }
5061
5062 /**
5063 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5064 * @buffer: The ring buffer to reset a per cpu buffer of
5065 * @cpu: The CPU buffer to be reset
5066 */
5067 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
5068 {
5069 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5070
5071 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5072 return;
5073
5074 /* prevent another thread from changing buffer sizes */
5075 mutex_lock(&buffer->mutex);
5076
5077 atomic_inc(&cpu_buffer->resize_disabled);
5078 atomic_inc(&cpu_buffer->record_disabled);
5079
5080 /* Make sure all commits have finished */
5081 synchronize_rcu();
5082
5083 reset_disabled_cpu_buffer(cpu_buffer);
5084
5085 atomic_dec(&cpu_buffer->record_disabled);
5086 atomic_dec(&cpu_buffer->resize_disabled);
5087
5088 mutex_unlock(&buffer->mutex);
5089 }
5090 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
5091
5092 /* Flag to ensure proper resetting of atomic variables */
5093 #define RESET_BIT (1 << 30)
5094
5095 /**
5096 * ring_buffer_reset_online_cpus - reset the online per-CPU buffers of a ring buffer
5097 * @buffer: The ring buffer whose online per-CPU buffers are to be reset
5099 */
5100 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5101 {
5102 struct ring_buffer_per_cpu *cpu_buffer;
5103 int cpu;
5104
5105 /* prevent another thread from changing buffer sizes */
5106 mutex_lock(&buffer->mutex);
5107
5108 for_each_online_buffer_cpu(buffer, cpu) {
5109 cpu_buffer = buffer->buffers[cpu];
5110
5111 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
5112 atomic_inc(&cpu_buffer->record_disabled);
5113 }
5114
5115 /* Make sure all commits have finished */
5116 synchronize_rcu();
5117
5118 for_each_buffer_cpu(buffer, cpu) {
5119 cpu_buffer = buffer->buffers[cpu];
5120
5121 /*
5122 * If a CPU came online during the synchronize_rcu(), then
5123 * ignore it.
5124 */
5125 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5126 continue;
5127
5128 reset_disabled_cpu_buffer(cpu_buffer);
5129
5130 atomic_dec(&cpu_buffer->record_disabled);
5131 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
5132 }
5133
5134 mutex_unlock(&buffer->mutex);
5135 }
5136
5137 /**
5138 * ring_buffer_reset - reset a ring buffer
5139 * @buffer: The ring buffer to reset all cpu buffers
5140 */
5141 void ring_buffer_reset(struct trace_buffer *buffer)
5142 {
5143 struct ring_buffer_per_cpu *cpu_buffer;
5144 int cpu;
5145
5146 /* prevent another thread from changing buffer sizes */
5147 mutex_lock(&buffer->mutex);
5148
5149 for_each_buffer_cpu(buffer, cpu) {
5150 cpu_buffer = buffer->buffers[cpu];
5151
5152 atomic_inc(&cpu_buffer->resize_disabled);
5153 atomic_inc(&cpu_buffer->record_disabled);
5154 }
5155
5156 /* Make sure all commits have finished */
5157 synchronize_rcu();
5158
5159 for_each_buffer_cpu(buffer, cpu) {
5160 cpu_buffer = buffer->buffers[cpu];
5161
5162 reset_disabled_cpu_buffer(cpu_buffer);
5163
5164 atomic_dec(&cpu_buffer->record_disabled);
5165 atomic_dec(&cpu_buffer->resize_disabled);
5166 }
5167
5168 mutex_unlock(&buffer->mutex);
5169 }
5170 EXPORT_SYMBOL_GPL(ring_buffer_reset);
5171
5172 /**
5173 * ring_buffer_empty - is the ring buffer empty?
5174 * @buffer: The ring buffer to test
5175 */
5176 bool ring_buffer_empty(struct trace_buffer *buffer)
5177 {
5178 struct ring_buffer_per_cpu *cpu_buffer;
5179 unsigned long flags;
5180 bool dolock;
5181 int cpu;
5182 int ret;
5183
5184 /* yes this is racy, but if you don't like the race, lock the buffer */
5185 for_each_buffer_cpu(buffer, cpu) {
5186 cpu_buffer = buffer->buffers[cpu];
5187 local_irq_save(flags);
5188 dolock = rb_reader_lock(cpu_buffer);
5189 ret = rb_per_cpu_empty(cpu_buffer);
5190 rb_reader_unlock(cpu_buffer, dolock);
5191 local_irq_restore(flags);
5192
5193 if (!ret)
5194 return false;
5195 }
5196
5197 return true;
5198 }
5199 EXPORT_SYMBOL_GPL(ring_buffer_empty);
5200
5201 /**
5202 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5203 * @buffer: The ring buffer
5204 * @cpu: The CPU buffer to test
5205 */
5206 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
5207 {
5208 struct ring_buffer_per_cpu *cpu_buffer;
5209 unsigned long flags;
5210 bool dolock;
5211 int ret;
5212
5213 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5214 return true;
5215
5216 cpu_buffer = buffer->buffers[cpu];
5217 local_irq_save(flags);
5218 dolock = rb_reader_lock(cpu_buffer);
5219 ret = rb_per_cpu_empty(cpu_buffer);
5220 rb_reader_unlock(cpu_buffer, dolock);
5221 local_irq_restore(flags);
5222
5223 return ret;
5224 }
5225 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
5226
5227 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
5228 /**
5229 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5230 * @buffer_a: One buffer to swap with
5231 * @buffer_b: The other buffer to swap with
5232 * @cpu: the CPU of the buffers to swap
5233 *
5234 * This function is useful for tracers that want to take a "snapshot"
5235 * of a CPU buffer and have another backup buffer lying around.
5236 * It is expected that the tracer handles the cpu buffer not being
5237 * used at the moment.
5238 */
5239 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5240 struct trace_buffer *buffer_b, int cpu)
5241 {
5242 struct ring_buffer_per_cpu *cpu_buffer_a;
5243 struct ring_buffer_per_cpu *cpu_buffer_b;
5244 int ret = -EINVAL;
5245
5246 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5247 !cpumask_test_cpu(cpu, buffer_b->cpumask))
5248 goto out;
5249
5250 cpu_buffer_a = buffer_a->buffers[cpu];
5251 cpu_buffer_b = buffer_b->buffers[cpu];
5252
5253 /* At least make sure the two buffers are somewhat the same */
5254 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
5255 goto out;
5256
5257 ret = -EAGAIN;
5258
5259 if (atomic_read(&buffer_a->record_disabled))
5260 goto out;
5261
5262 if (atomic_read(&buffer_b->record_disabled))
5263 goto out;
5264
5265 if (atomic_read(&cpu_buffer_a->record_disabled))
5266 goto out;
5267
5268 if (atomic_read(&cpu_buffer_b->record_disabled))
5269 goto out;
5270
5271 /*
5272 * We can't do a synchronize_rcu here because this
5273 * function can be called in atomic context.
5274 * Normally this will be called from the same CPU as cpu.
5275 * If not it's up to the caller to protect this.
5276 */
5277 atomic_inc(&cpu_buffer_a->record_disabled);
5278 atomic_inc(&cpu_buffer_b->record_disabled);
5279
5280 ret = -EBUSY;
5281 if (local_read(&cpu_buffer_a->committing))
5282 goto out_dec;
5283 if (local_read(&cpu_buffer_b->committing))
5284 goto out_dec;
5285
5286 /*
5287 * When resize is in progress, we cannot swap it because
5288 * it will mess the state of the cpu buffer.
5289 */
5290 if (atomic_read(&buffer_a->resizing))
5291 goto out_dec;
5292 if (atomic_read(&buffer_b->resizing))
5293 goto out_dec;
5294
5295 buffer_a->buffers[cpu] = cpu_buffer_b;
5296 buffer_b->buffers[cpu] = cpu_buffer_a;
5297
5298 cpu_buffer_b->buffer = buffer_a;
5299 cpu_buffer_a->buffer = buffer_b;
5300
5301 ret = 0;
5302
5303 out_dec:
5304 atomic_dec(&cpu_buffer_a->record_disabled);
5305 atomic_dec(&cpu_buffer_b->record_disabled);
5306 out:
5307 return ret;
5308 }
5309 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
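
/*
 * For example, a tracer with a spare buffer of the same size can take a
 * per-CPU "snapshot" like this (the buffer names are illustrative only):
 *
 *	ret = ring_buffer_swap_cpu(spare_buffer, live_buffer, cpu);
 *
 * -EINVAL means the two per-CPU buffers differ in size, while -EAGAIN and
 * -EBUSY are transient (recording disabled, a commit still in flight, or
 * a resize in progress) and the caller may retry.
 */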
5310 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
5311
5312 /**
5313 * ring_buffer_alloc_read_page - allocate a page to read from buffer
5314 * @buffer: the buffer to allocate for.
5315 * @cpu: the cpu buffer to allocate.
5316 *
5317 * This function is used in conjunction with ring_buffer_read_page.
5318 * When reading a full page from the ring buffer, these functions
5319 * can be used to speed up the process. The calling function should
5320 * allocate a few pages first with this function. Then when it
5321 * needs to get pages from the ring buffer, it passes the result
5322 * of this function into ring_buffer_read_page, which will swap
5323 * the page that was allocated with the read page of the buffer.
5324 *
5325 * Returns:
5326 * The page allocated, or ERR_PTR
5327 */
5328 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
5329 {
5330 struct ring_buffer_per_cpu *cpu_buffer;
5331 struct buffer_data_page *bpage = NULL;
5332 unsigned long flags;
5333 struct page *page;
5334
5335 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5336 return ERR_PTR(-ENODEV);
5337
5338 cpu_buffer = buffer->buffers[cpu];
5339 local_irq_save(flags);
5340 arch_spin_lock(&cpu_buffer->lock);
5341
5342 if (cpu_buffer->free_page) {
5343 bpage = cpu_buffer->free_page;
5344 cpu_buffer->free_page = NULL;
5345 }
5346
5347 arch_spin_unlock(&cpu_buffer->lock);
5348 local_irq_restore(flags);
5349
5350 if (bpage)
5351 goto out;
5352
5353 page = alloc_pages_node(cpu_to_node(cpu),
5354 GFP_KERNEL | __GFP_NORETRY, 0);
5355 if (!page)
5356 return ERR_PTR(-ENOMEM);
5357
5358 bpage = page_address(page);
5359
5360 out:
5361 rb_init_page(bpage);
5362
5363 return bpage;
5364 }
5365 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
5366
5367 /**
5368 * ring_buffer_free_read_page - free an allocated read page
5369 * @buffer: the buffer the page was allocated for
5370 * @cpu: the cpu buffer the page came from
5371 * @data: the page to free
5372 *
5373 * Free a page allocated from ring_buffer_alloc_read_page.
5374 */
5375 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data)
5376 {
5377 struct ring_buffer_per_cpu *cpu_buffer;
5378 struct buffer_data_page *bpage = data;
5379 struct page *page = virt_to_page(bpage);
5380 unsigned long flags;
5381
5382 if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5383 return;
5384
5385 cpu_buffer = buffer->buffers[cpu];
5386
5387 /* If the page is still in use someplace else, we can't reuse it */
5388 if (page_ref_count(page) > 1)
5389 goto out;
5390
5391 local_irq_save(flags);
5392 arch_spin_lock(&cpu_buffer->lock);
5393
5394 if (!cpu_buffer->free_page) {
5395 cpu_buffer->free_page = bpage;
5396 bpage = NULL;
5397 }
5398
5399 arch_spin_unlock(&cpu_buffer->lock);
5400 local_irq_restore(flags);
5401
5402 out:
5403 free_page((unsigned long)bpage);
5404 }
5405 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
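
/*
 * Note that only a single spare page is cached per CPU
 * (cpu_buffer->free_page). If that slot is already occupied, or the page
 * is still referenced somewhere else, the page is simply handed back to
 * free_page(). Callers that keep several read pages in flight should
 * therefore expect real page allocations in ring_buffer_alloc_read_page().
 */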
5406
5407 /**
5408 * ring_buffer_read_page - extract a page from the ring buffer
5409 * @buffer: buffer to extract from
5410 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
5411 * @len: amount to extract
5412 * @cpu: the cpu of the buffer to extract
5413 * @full: should the extraction only happen when the page is full.
5414 *
5415 * This function will pull out a page from the ring buffer and consume it.
5416 * @data_page must be the address of the variable that was returned
5417 * from ring_buffer_alloc_read_page. This is because the page might be used
5418 * to swap with a page in the ring buffer.
5419 *
5420 * for example:
5421 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
5422 * if (IS_ERR(rpage))
5423 * return PTR_ERR(rpage);
5424 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
5425 * if (ret >= 0)
5426 * process_page(rpage, ret);
5427 *
5428 * When @full is set, the function will not return data unless
5429 * the writer is off the reader page.
5430 *
5431 * Note: it is up to the calling functions to handle sleeps and wakeups.
5432 * The ring buffer can be used anywhere in the kernel and can not
5433 * blindly call wake_up. The layer that uses the ring buffer must be
5434 * responsible for that.
5435 *
5436 * Returns:
5437 * >=0 if data has been transferred, returns the offset of consumed data.
5438 * <0 if no data has been transferred.
5439 */
5440 int ring_buffer_read_page(struct trace_buffer *buffer,
5441 void **data_page, size_t len, int cpu, int full)
5442 {
5443 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5444 struct ring_buffer_event *event;
5445 struct buffer_data_page *bpage;
5446 struct buffer_page *reader;
5447 unsigned long missed_events;
5448 unsigned long flags;
5449 unsigned int commit;
5450 unsigned int read;
5451 u64 save_timestamp;
5452 int ret = -1;
5453
5454 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5455 goto out;
5456
5457 /*
5458 * If len is not big enough to hold the page header, then
5459 * we can not copy anything.
5460 */
5461 if (len <= BUF_PAGE_HDR_SIZE)
5462 goto out;
5463
5464 len -= BUF_PAGE_HDR_SIZE;
5465
5466 if (!data_page)
5467 goto out;
5468
5469 bpage = *data_page;
5470 if (!bpage)
5471 goto out;
5472
5473 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5474
5475 reader = rb_get_reader_page(cpu_buffer);
5476 if (!reader)
5477 goto out_unlock;
5478
5479 event = rb_reader_event(cpu_buffer);
5480
5481 read = reader->read;
5482 commit = rb_page_commit(reader);
5483
5484 /* Check if any events were dropped */
5485 missed_events = cpu_buffer->lost_events;
5486
5487 /*
5488 * If this page has been partially read or
5489 * if len is not big enough to read the rest of the page or
5490 * a writer is still on the page, then
5491 * we must copy the data from the page to the buffer.
5492 * Otherwise, we can simply swap the page with the one passed in.
5493 */
5494 if (read || (len < (commit - read)) ||
5495 cpu_buffer->reader_page == cpu_buffer->commit_page) {
5496 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
5497 unsigned int rpos = read;
5498 unsigned int pos = 0;
5499 unsigned int size;
5500
5501 /*
5502 * If a full page is expected, this can still be returned
5503 * if there's been a previous partial read and the
5504 * rest of the page can be read and the commit page is off
5505 * the reader page.
5506 */
5507 if (full &&
5508 (!read || (len < (commit - read)) ||
5509 cpu_buffer->reader_page == cpu_buffer->commit_page))
5510 goto out_unlock;
5511
5512 if (len > (commit - read))
5513 len = (commit - read);
5514
5515 /* Always keep the time extend and data together */
5516 size = rb_event_ts_length(event);
5517
5518 if (len < size)
5519 goto out_unlock;
5520
5521 /* save the current timestamp, since the user will need it */
5522 save_timestamp = cpu_buffer->read_stamp;
5523
5524 /* Need to copy one event at a time */
5525 do {
5526 /* We need the size of one event, because
5527 * rb_advance_reader only advances by one event,
5528 * whereas rb_event_ts_length may include the size of
5529 * one or two events.
5530 * We have already ensured there's enough space if this
5531 * is a time extend. */
5532 size = rb_event_length(event);
5533 memcpy(bpage->data + pos, rpage->data + rpos, size);
5534
5535 len -= size;
5536
5537 rb_advance_reader(cpu_buffer);
5538 rpos = reader->read;
5539 pos += size;
5540
5541 if (rpos >= commit)
5542 break;
5543
5544 event = rb_reader_event(cpu_buffer);
5545 /* Always keep the time extend and data together */
5546 size = rb_event_ts_length(event);
5547 } while (len >= size);
5548
5549 /* update bpage */
5550 local_set(&bpage->commit, pos);
5551 bpage->time_stamp = save_timestamp;
5552
5553 /* we copied everything to the beginning */
5554 read = 0;
5555 } else {
5556 /* update the entry counter */
5557 cpu_buffer->read += rb_page_entries(reader);
5558 cpu_buffer->read_bytes += rb_page_commit(reader);
5559
5560 /* swap the pages */
5561 rb_init_page(bpage);
5562 bpage = reader->page;
5563 reader->page = *data_page;
5564 local_set(&reader->write, 0);
5565 local_set(&reader->entries, 0);
5566 reader->read = 0;
5567 *data_page = bpage;
5568
5569 /*
5570 * Use the real_end for the data size,
5571 * This gives us a chance to store the lost events
5572 * on the page.
5573 */
5574 if (reader->real_end)
5575 local_set(&bpage->commit, reader->real_end);
5576 }
5577 ret = read;
5578
5579 cpu_buffer->lost_events = 0;
5580
5581 commit = local_read(&bpage->commit);
5582 /*
5583 * Set a flag in the commit field if we lost events
5584 */
5585 if (missed_events) {
5586 /* If there is room at the end of the page to save the
5587 * missed events, then record it there.
5588 */
5589 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
5590 memcpy(&bpage->data[commit], &missed_events,
5591 sizeof(missed_events));
5592 local_add(RB_MISSED_STORED, &bpage->commit);
5593 commit += sizeof(missed_events);
5594 }
5595 local_add(RB_MISSED_EVENTS, &bpage->commit);
5596 }
5597
5598 /*
5599 * This page may be off to user land. Zero it out here.
5600 */
5601 if (commit < BUF_PAGE_SIZE)
5602 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
5603
5604 out_unlock:
5605 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5606
5607 out:
5608 return ret;
5609 }
5610 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
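
/*
 * Putting the three read-page helpers together, a consumer that drains
 * one CPU might look roughly like the sketch below. process_page() and
 * the use of PAGE_SIZE for @len are illustrative assumptions only:
 *
 *	void *rpage;
 *	int ret;
 *
 *	rpage = ring_buffer_alloc_read_page(buffer, cpu);
 *	if (IS_ERR(rpage))
 *		return PTR_ERR(rpage);
 *
 *	while ((ret = ring_buffer_read_page(buffer, &rpage,
 *					    PAGE_SIZE, cpu, 0)) >= 0)
 *		process_page(rpage, ret);
 *
 *	ring_buffer_free_read_page(buffer, cpu, rpage);
 */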
5611
5612 /*
5613 * We only allocate new buffers, never free them if the CPU goes down.
5614 * If we were to free the buffer, then the user would lose any trace that was in
5615 * the buffer.
5616 */
5617 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
5618 {
5619 struct trace_buffer *buffer;
5620 long nr_pages_same;
5621 int cpu_i;
5622 unsigned long nr_pages;
5623
5624 buffer = container_of(node, struct trace_buffer, node);
5625 if (cpumask_test_cpu(cpu, buffer->cpumask))
5626 return 0;
5627
5628 nr_pages = 0;
5629 nr_pages_same = 1;
5630 /* check if all cpu sizes are same */
5631 for_each_buffer_cpu(buffer, cpu_i) {
5632 /* fill in the size from first enabled cpu */
5633 if (nr_pages == 0)
5634 nr_pages = buffer->buffers[cpu_i]->nr_pages;
5635 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
5636 nr_pages_same = 0;
5637 break;
5638 }
5639 }
5640 /* allocate minimum pages, user can later expand it */
5641 if (!nr_pages_same)
5642 nr_pages = 2;
5643 buffer->buffers[cpu] =
5644 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
5645 if (!buffer->buffers[cpu]) {
5646 WARN(1, "failed to allocate ring buffer on CPU %u\n",
5647 cpu);
5648 return -ENOMEM;
5649 }
5650 smp_wmb();
5651 cpumask_set_cpu(cpu, buffer->cpumask);
5652 return 0;
5653 }
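
/*
 * trace_rb_cpu_prepare() runs as a CPU-hotplug "prepare" callback: the
 * tracing core registers it as a multi-instance cpuhp state and each
 * ring buffer adds itself as an instance through its buffer->node, so a
 * CPU coming online gets a per-CPU buffer before it can start tracing.
 * The exact registration call lives outside this file.
 */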
5654
5655 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
5656 /*
5657 * This is a basic integrity check of the ring buffer.
5658 * Late in the boot cycle this test will run when configured in.
5659 * It will kick off a thread per CPU that will go into a loop
5660 * writing to the per cpu ring buffer various sizes of data.
5661 * Some of the data will be large items, some small.
5662 *
5663 * Another thread is created that goes into a spin, sending out
5664 * IPIs to the other CPUs to also write into the ring buffer.
5665 * This is to test the nesting ability of the buffer.
5666 *
5667 * Basic stats are recorded and reported. If something in the
5668 * ring buffer should happen that's not expected, a big warning
5669 * is displayed and all ring buffers are disabled.
5670 */
5671 static struct task_struct *rb_threads[NR_CPUS] __initdata;
5672
5673 struct rb_test_data {
5674 struct trace_buffer *buffer;
5675 unsigned long events;
5676 unsigned long bytes_written;
5677 unsigned long bytes_alloc;
5678 unsigned long bytes_dropped;
5679 unsigned long events_nested;
5680 unsigned long bytes_written_nested;
5681 unsigned long bytes_alloc_nested;
5682 unsigned long bytes_dropped_nested;
5683 int min_size_nested;
5684 int max_size_nested;
5685 int max_size;
5686 int min_size;
5687 int cpu;
5688 int cnt;
5689 };
5690
5691 static struct rb_test_data rb_data[NR_CPUS] __initdata;
5692
5693 /* 1 meg per cpu */
5694 #define RB_TEST_BUFFER_SIZE 1048576
5695
5696 static char rb_string[] __initdata =
5697 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
5698 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
5699 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
5700
5701 static bool rb_test_started __initdata;
5702
5703 struct rb_item {
5704 int size;
5705 char str[];
5706 };
5707
5708 static __init int rb_write_something(struct rb_test_data *data, bool nested)
5709 {
5710 struct ring_buffer_event *event;
5711 struct rb_item *item;
5712 bool started;
5713 int event_len;
5714 int size;
5715 int len;
5716 int cnt;
5717
5718 /* Have nested writes different than what is written */
5719 cnt = data->cnt + (nested ? 27 : 0);
5720
5721 /* Multiply cnt by ~e, to make some unique increment */
5722 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
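/* 68/25 == 2.72, which is roughly e */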
5723
5724 len = size + sizeof(struct rb_item);
5725
5726 started = rb_test_started;
5727 /* read rb_test_started before checking buffer enabled */
5728 smp_rmb();
5729
5730 event = ring_buffer_lock_reserve(data->buffer, len);
5731 if (!event) {
5732 /* Ignore dropped events before test starts. */
5733 if (started) {
5734 if (nested)
5735 data->bytes_dropped_nested += len;
5736 else
5737 data->bytes_dropped += len;
5738 }
5739 return len;
5740 }
5741
5742 event_len = ring_buffer_event_length(event);
5743
5744 if (RB_WARN_ON(data->buffer, event_len < len))
5745 goto out;
5746
5747 item = ring_buffer_event_data(event);
5748 item->size = size;
5749 memcpy(item->str, rb_string, size);
5750
5751 if (nested) {
5752 data->bytes_alloc_nested += event_len;
5753 data->bytes_written_nested += len;
5754 data->events_nested++;
5755 if (!data->min_size_nested || len < data->min_size_nested)
5756 data->min_size_nested = len;
5757 if (len > data->max_size_nested)
5758 data->max_size_nested = len;
5759 } else {
5760 data->bytes_alloc += event_len;
5761 data->bytes_written += len;
5762 data->events++;
5763 if (!data->min_size || len < data->min_size)
5764 data->min_size = len;
5765 if (len > data->max_size)
5766 data->max_size = len;
5767 }
5768
5769 out:
5770 ring_buffer_unlock_commit(data->buffer, event);
5771
5772 return 0;
5773 }
5774
5775 static __init int rb_test(void *arg)
5776 {
5777 struct rb_test_data *data = arg;
5778
5779 while (!kthread_should_stop()) {
5780 rb_write_something(data, false);
5781 data->cnt++;
5782
5783 set_current_state(TASK_INTERRUPTIBLE);
5784 /* Now sleep between a min of 100-300us and a max of 1ms */
5785 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
5786 }
5787
5788 return 0;
5789 }
5790
5791 static __init void rb_ipi(void *ignore)
5792 {
5793 struct rb_test_data *data;
5794 int cpu = smp_processor_id();
5795
5796 data = &rb_data[cpu];
5797 rb_write_something(data, true);
5798 }
5799
5800 static __init int rb_hammer_test(void *arg)
5801 {
5802 while (!kthread_should_stop()) {
5803
5804 /* Send an IPI to all cpus to write data! */
5805 smp_call_function(rb_ipi, NULL, 1);
5806 /* No sleep, but for non preempt, let others run */
5807 schedule();
5808 }
5809
5810 return 0;
5811 }
5812
5813 static __init int test_ringbuffer(void)
5814 {
5815 struct task_struct *rb_hammer;
5816 struct trace_buffer *buffer;
5817 int cpu;
5818 int ret = 0;
5819
5820 if (security_locked_down(LOCKDOWN_TRACEFS)) {
5821 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
5822 return 0;
5823 }
5824
5825 pr_info("Running ring buffer tests...\n");
5826
5827 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
5828 if (WARN_ON(!buffer))
5829 return 0;
5830
5831 /* Disable buffer so that threads can't write to it yet */
5832 ring_buffer_record_off(buffer);
5833
5834 for_each_online_cpu(cpu) {
5835 rb_data[cpu].buffer = buffer;
5836 rb_data[cpu].cpu = cpu;
5837 rb_data[cpu].cnt = cpu;
5838 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
5839 "rbtester/%d", cpu);
5840 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
5841 pr_cont("FAILED\n");
5842 ret = PTR_ERR(rb_threads[cpu]);
5843 goto out_free;
5844 }
5845
5846 kthread_bind(rb_threads[cpu], cpu);
5847 wake_up_process(rb_threads[cpu]);
5848 }
5849
5850 /* Now create the rb hammer! */
5851 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
5852 if (WARN_ON(IS_ERR(rb_hammer))) {
5853 pr_cont("FAILED\n");
5854 ret = PTR_ERR(rb_hammer);
5855 goto out_free;
5856 }
5857
5858 ring_buffer_record_on(buffer);
5859 /*
5860 * Show buffer is enabled before setting rb_test_started.
5861 * Yes there's a small race window where events could be
5862 * dropped and the thread won't catch it. But when a ring
5863 * buffer gets enabled, there will always be some kind of
5864 * delay before other CPUs see it. Thus, we don't care about
5865 * those dropped events. We care about events dropped after
5866 * the threads see that the buffer is active.
5867 */
5868 smp_wmb();
5869 rb_test_started = true;
5870
5871 set_current_state(TASK_INTERRUPTIBLE);
5872 /* Just run for 10 seconds */
5873 schedule_timeout(10 * HZ);
5874
5875 kthread_stop(rb_hammer);
5876
5877 out_free:
5878 for_each_online_cpu(cpu) {
5879 if (!rb_threads[cpu])
5880 break;
5881 kthread_stop(rb_threads[cpu]);
5882 }
5883 if (ret) {
5884 ring_buffer_free(buffer);
5885 return ret;
5886 }
5887
5888 /* Report! */
5889 pr_info("finished\n");
5890 for_each_online_cpu(cpu) {
5891 struct ring_buffer_event *event;
5892 struct rb_test_data *data = &rb_data[cpu];
5893 struct rb_item *item;
5894 unsigned long total_events;
5895 unsigned long total_dropped;
5896 unsigned long total_written;
5897 unsigned long total_alloc;
5898 unsigned long total_read = 0;
5899 unsigned long total_size = 0;
5900 unsigned long total_len = 0;
5901 unsigned long total_lost = 0;
5902 unsigned long lost;
5903 int big_event_size;
5904 int small_event_size;
5905
5906 ret = -1;
5907
5908 total_events = data->events + data->events_nested;
5909 total_written = data->bytes_written + data->bytes_written_nested;
5910 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
5911 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
5912
5913 big_event_size = data->max_size + data->max_size_nested;
5914 small_event_size = data->min_size + data->min_size_nested;
5915
5916 pr_info("CPU %d:\n", cpu);
5917 pr_info(" events: %ld\n", total_events);
5918 pr_info(" dropped bytes: %ld\n", total_dropped);
5919 pr_info(" alloced bytes: %ld\n", total_alloc);
5920 pr_info(" written bytes: %ld\n", total_written);
5921 pr_info(" biggest event: %d\n", big_event_size);
5922 pr_info(" smallest event: %d\n", small_event_size);
5923
5924 if (RB_WARN_ON(buffer, total_dropped))
5925 break;
5926
5927 ret = 0;
5928
5929 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
5930 total_lost += lost;
5931 item = ring_buffer_event_data(event);
5932 total_len += ring_buffer_event_length(event);
5933 total_size += item->size + sizeof(struct rb_item);
5934 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
5935 pr_info("FAILED!\n");
5936 pr_info("buffer had: %.*s\n", item->size, item->str);
5937 pr_info("expected: %.*s\n", item->size, rb_string);
5938 RB_WARN_ON(buffer, 1);
5939 ret = -1;
5940 break;
5941 }
5942 total_read++;
5943 }
5944 if (ret)
5945 break;
5946
5947 ret = -1;
5948
5949 pr_info(" read events: %ld\n", total_read);
5950 pr_info(" lost events: %ld\n", total_lost);
5951 pr_info(" total events: %ld\n", total_lost + total_read);
5952 pr_info(" recorded len bytes: %ld\n", total_len);
5953 pr_info(" recorded size bytes: %ld\n", total_size);
5954 if (total_lost)
5955 pr_info(" With dropped events, record len and size may not match\n"
5956 " alloced and written from above\n");
5957 if (!total_lost) {
5958 if (RB_WARN_ON(buffer, total_len != total_alloc ||
5959 total_size != total_written))
5960 break;
5961 }
5962 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
5963 break;
5964
5965 ret = 0;
5966 }
5967 if (!ret)
5968 pr_info("Ring buffer PASSED!\n");
5969
5970 ring_buffer_free(buffer);
5971 return 0;
5972 }
5973
5974 late_initcall(test_ringbuffer);
5975 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
5976