1 // SPDX-License-Identifier: GPL-2.0
2 /*
3 * Generic ring buffer
4 *
5 * Copyright (C) 2008 Steven Rostedt <srostedt@redhat.com>
6 */
7 #include <linux/trace_recursion.h>
8 #include <linux/ring_buffer_ext.h>
9 #include <linux/trace_events.h>
10 #include <linux/ring_buffer.h>
11 #include <linux/trace_clock.h>
12 #include <linux/sched/clock.h>
13 #include <linux/trace_seq.h>
14 #include <linux/spinlock.h>
15 #include <linux/irq_work.h>
16 #include <linux/security.h>
17 #include <linux/uaccess.h>
18 #include <linux/hardirq.h>
19 #include <linux/kthread.h> /* for self test */
20 #include <linux/module.h>
21 #include <linux/percpu.h>
22 #include <linux/mutex.h>
23 #include <linux/delay.h>
24 #include <linux/slab.h>
25 #include <linux/init.h>
26 #include <linux/hash.h>
27 #include <linux/list.h>
28 #include <linux/cpu.h>
29 #include <linux/oom.h>
30
31 #include <asm/local.h>
32
33 static void update_pages_handler(struct work_struct *work);
34
35 /*
36 * The ring buffer header is special. We must keep it up to date manually.
37 */
38 int ring_buffer_print_entry_header(struct trace_seq *s)
39 {
40 trace_seq_puts(s, "# compressed entry header\n");
41 trace_seq_puts(s, "\ttype_len : 5 bits\n");
42 trace_seq_puts(s, "\ttime_delta : 27 bits\n");
43 trace_seq_puts(s, "\tarray : 32 bits\n");
44 trace_seq_putc(s, '\n');
45 trace_seq_printf(s, "\tpadding : type == %d\n",
46 RINGBUF_TYPE_PADDING);
47 trace_seq_printf(s, "\ttime_extend : type == %d\n",
48 RINGBUF_TYPE_TIME_EXTEND);
49 trace_seq_printf(s, "\ttime_stamp : type == %d\n",
50 RINGBUF_TYPE_TIME_STAMP);
51 trace_seq_printf(s, "\tdata max type_len == %d\n",
52 RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
53
54 return !trace_seq_has_overflowed(s);
55 }
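/*
 * Illustrative sketch (not used by the ring buffer itself): the compressed
 * header printed above maps onto the bit fields of struct ring_buffer_event
 * from <linux/ring_buffer.h>. The helper below simply pulls those fields
 * apart for a reserved event.
 */
static inline void rb_example_decode_entry_header(struct ring_buffer_event *event,
						  unsigned int *type_len,
						  unsigned int *time_delta)
{
	*type_len = event->type_len;	 /* 5 bits: padding/time extend/time stamp/data */
	*time_delta = event->time_delta; /* 27 bits: delta from the previous event */
}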
56
57 /*
58 * The ring buffer is made up of a list of pages. A separate list of pages is
59 * allocated for each CPU. A writer may only write to a buffer that is
60 * associated with the CPU it is currently executing on. A reader may read
61 * from any per cpu buffer.
62 *
63 * The reader is special. For each per cpu buffer, the reader has its own
64 * reader page. When a reader has read the entire reader page, this reader
65 * page is swapped with another page in the ring buffer.
66 *
67 * Now, as long as the writer is off the reader page, the reader can do
68 * whatever it wants with that page. The writer will never write to that page
69 * again (as long as it is out of the ring buffer).
70 *
71 * Here's some silly ASCII art.
72 *
73 * +------+
74 * |reader| RING BUFFER
75 * |page |
76 * +------+ +---+ +---+ +---+
77 * | |-->| |-->| |
78 * +---+ +---+ +---+
79 * ^ |
80 * | |
81 * +---------------+
82 *
83 *
84 * +------+
85 * |reader| RING BUFFER
86 * |page |------------------v
87 * +------+ +---+ +---+ +---+
88 * | |-->| |-->| |
89 * +---+ +---+ +---+
90 * ^ |
91 * | |
92 * +---------------+
93 *
94 *
95 * +------+
96 * |reader| RING BUFFER
97 * |page |------------------v
98 * +------+ +---+ +---+ +---+
99 * ^ | |-->| |-->| |
100 * | +---+ +---+ +---+
101 * | |
102 * | |
103 * +------------------------------+
104 *
105 *
106 * +------+
107 * |buffer| RING BUFFER
108 * |page |------------------v
109 * +------+ +---+ +---+ +---+
110 * ^ | | | |-->| |
111 * | New +---+ +---+ +---+
112 * | Reader------^ |
113 * | page |
114 * +------------------------------+
115 *
116 *
117 * After we make this swap, the reader can hand this page off to the splice
118 * code and be done with it. It can even allocate a new page if it needs to
119 * and swap that into the ring buffer.
120 *
121 * We will be using cmpxchg soon to make all this lockless.
122 *
123 */
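/*
 * Minimal usage sketch of the swap described above as seen from the read
 * side, assuming the read-page helpers declared in <linux/ring_buffer.h>
 * for this tree (ring_buffer_alloc_read_page(), ring_buffer_read_page(),
 * ring_buffer_free_read_page()): the caller hands in a spare page and the
 * ring buffer swaps it for the current reader page.
 */
static inline int rb_example_consume_reader_page(struct trace_buffer *buffer, int cpu)
{
	void *spare;
	int ret;

	spare = ring_buffer_alloc_read_page(buffer, cpu);
	if (IS_ERR(spare))
		return PTR_ERR(spare);

	/* On success, @spare now holds the data of the old reader page */
	ret = ring_buffer_read_page(buffer, &spare, PAGE_SIZE, cpu, 0);

	ring_buffer_free_read_page(buffer, cpu, spare);
	return ret < 0 ? ret : 0;
}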
124
125 /* Used for individual buffers (after the counter) */
126 #define RB_BUFFER_OFF (1 << 20)
127
128 /* define RINGBUF_TYPE_DATA for 'case RINGBUF_TYPE_DATA:' */
129 #define RINGBUF_TYPE_DATA 0 ... RINGBUF_TYPE_DATA_TYPE_LEN_MAX
130
131 enum {
132 RB_LEN_TIME_EXTEND = 8,
133 RB_LEN_TIME_STAMP = 8,
134 };
135
136 #define skip_time_extend(event) \
137 ((struct ring_buffer_event *)((char *)event + RB_LEN_TIME_EXTEND))
138
139 #define extended_time(event) \
140 (event->type_len >= RINGBUF_TYPE_TIME_EXTEND)
141
142 static inline int rb_null_event(struct ring_buffer_event *event)
143 {
144 return event->type_len == RINGBUF_TYPE_PADDING && !event->time_delta;
145 }
146
147 static void rb_event_set_padding(struct ring_buffer_event *event)
148 {
149 /* padding has a NULL time_delta */
150 event->type_len = RINGBUF_TYPE_PADDING;
151 event->time_delta = 0;
152 }
153
154 static unsigned
155 rb_event_data_length(struct ring_buffer_event *event)
156 {
157 unsigned length;
158
159 if (event->type_len)
160 length = event->type_len * RB_ALIGNMENT;
161 else
162 length = event->array[0];
163 return length + RB_EVNT_HDR_SIZE;
164 }
165
166 /*
167 * Return the length of the given event. Will return
168 * the length of the time extend if the event is a
169 * time extend.
170 */
171 static inline unsigned
172 rb_event_length(struct ring_buffer_event *event)
173 {
174 switch (event->type_len) {
175 case RINGBUF_TYPE_PADDING:
176 if (rb_null_event(event))
177 /* undefined */
178 return -1;
179 return event->array[0] + RB_EVNT_HDR_SIZE;
180
181 case RINGBUF_TYPE_TIME_EXTEND:
182 return RB_LEN_TIME_EXTEND;
183
184 case RINGBUF_TYPE_TIME_STAMP:
185 return RB_LEN_TIME_STAMP;
186
187 case RINGBUF_TYPE_DATA:
188 return rb_event_data_length(event);
189 default:
190 WARN_ON_ONCE(1);
191 }
192 /* not hit */
193 return 0;
194 }
195
196 /*
197 * Return total length of time extend and data,
198 * or just the event length for all other events.
199 */
200 static inline unsigned
201 rb_event_ts_length(struct ring_buffer_event *event)
202 {
203 unsigned len = 0;
204
205 if (extended_time(event)) {
206 /* time extends include the data event after it */
207 len = RB_LEN_TIME_EXTEND;
208 event = skip_time_extend(event);
209 }
210 return len + rb_event_length(event);
211 }
212
213 /**
214 * ring_buffer_event_length - return the length of the event
215 * @event: the event to get the length of
216 *
217 * Returns the size of the data load of a data event.
218 * If the event is something other than a data event, it
219 * returns the size of the event itself. With the exception
220 * of a TIME EXTEND, where it still returns the size of the
221 * data load of the data event after it.
222 */
223 unsigned ring_buffer_event_length(struct ring_buffer_event *event)
224 {
225 unsigned length;
226
227 if (extended_time(event))
228 event = skip_time_extend(event);
229
230 length = rb_event_length(event);
231 if (event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
232 return length;
233 length -= RB_EVNT_HDR_SIZE;
234 if (length > RB_MAX_SMALL_DATA + sizeof(event->array[0]))
235 length -= sizeof(event->array[0]);
236 return length;
237 }
238 EXPORT_SYMBOL_GPL(ring_buffer_event_length);
239
240 /* inline for ring buffer fast paths */
241 static __always_inline void *
242 rb_event_data(struct ring_buffer_event *event)
243 {
244 if (extended_time(event))
245 event = skip_time_extend(event);
246 WARN_ON_ONCE(event->type_len > RINGBUF_TYPE_DATA_TYPE_LEN_MAX);
247 /* If length is in len field, then array[0] has the data */
248 if (event->type_len)
249 return (void *)&event->array[0];
250 /* Otherwise length is in array[0] and array[1] has the data */
251 return (void *)&event->array[1];
252 }
253
254 /**
255 * ring_buffer_event_data - return the data of the event
256 * @event: the event to get the data from
257 */
258 void *ring_buffer_event_data(struct ring_buffer_event *event)
259 {
260 return rb_event_data(event);
261 }
262 EXPORT_SYMBOL_GPL(ring_buffer_event_data);
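/*
 * Illustrative sketch tying the two accessors above together, assuming the
 * public ring_buffer_consume() declared in <linux/ring_buffer.h>: consume
 * the next event from @cpu and report how many payload bytes it carried.
 */
static inline int rb_example_consume_one(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost_events;
	void *payload;
	u64 ts;

	event = ring_buffer_consume(buffer, cpu, &ts, &lost_events);
	if (!event)
		return -EAGAIN;	/* nothing to read (illustrative error code) */

	payload = ring_buffer_event_data(event);	/* start of the data load */
	(void)payload;

	return ring_buffer_event_length(event);		/* size of the data load */
}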
263
264 #define for_each_buffer_cpu(buffer, cpu) \
265 for_each_cpu(cpu, buffer->cpumask)
266
267 #define for_each_online_buffer_cpu(buffer, cpu) \
268 for_each_cpu_and(cpu, buffer->cpumask, cpu_online_mask)
269
270 static u64 rb_event_time_stamp(struct ring_buffer_event *event)
271 {
272 u64 ts;
273
274 ts = event->array[0];
275 ts <<= TS_SHIFT;
276 ts += event->time_delta;
277
278 return ts;
279 }
280
281 /* Flag when events were overwritten */
282 #define RB_MISSED_EVENTS (1 << 31)
283 /* Missed count stored at end */
284 #define RB_MISSED_STORED (1 << 30)
285
286 /*
287 * Note, the buffer_page list must be first. The buffer pages
288 * are allocated in cache lines, which means that each buffer
289 * page will be at the beginning of a cache line, and thus
290 * the least significant bits will be zero. We use this to
291 * add flags in the list struct pointers, to make the ring buffer
292 * lockless.
293 */
294 struct buffer_page {
295 struct list_head list; /* list of buffer pages */
296 local_t write; /* index for next write */
297 unsigned read; /* index for next read */
298 local_t entries; /* entries on this page */
299 unsigned long real_end; /* real end of data */
300 struct buffer_data_page *page; /* Actual data page */
301 };
302
303 /*
304 * The buffer page counters, write and entries, must be reset
305 * atomically when crossing page boundaries. To synchronize this
306 * update, two counters are inserted into the number. One is
307 * the actual counter for the write position or count on the page.
308 *
309 * The other is a counter of updaters. Before an update happens
310 * the update partition of the counter is incremented. This will
311 * allow the updater to update the counter atomically.
312 *
313 * The counter is 20 bits, and the state data is 12.
314 */
315 #define RB_WRITE_MASK 0xfffff
316 #define RB_WRITE_INTCNT (1 << 20)
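/*
 * Illustrative helpers for the packing described above (the real users are
 * rb_page_write()/rb_page_entries() further down): the low 20 bits are the
 * write index or count on the page, everything above is the nested-updater
 * count.
 */
static inline unsigned long rb_example_write_index(unsigned long write)
{
	return write & RB_WRITE_MASK;		/* position/count on the page */
}

static inline unsigned long rb_example_updater_count(unsigned long write)
{
	return write / RB_WRITE_INTCNT;		/* how many updaters stacked up */
}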
317
318 static void rb_init_page(struct buffer_data_page *bpage)
319 {
320 local_set(&bpage->commit, 0);
321 }
322
323 static __always_inline unsigned int rb_page_commit(struct buffer_page *bpage)
324 {
325 return local_read(&bpage->page->commit);
326 }
327
328 static void free_buffer_page(struct buffer_page *bpage)
329 {
330 free_page((unsigned long)bpage->page);
331 kfree(bpage);
332 }
333
334 /* Max payload is BUF_PAGE_SIZE - header (8bytes) */
335 #define BUF_MAX_DATA_SIZE (BUF_PAGE_SIZE - (sizeof(u32) * 2))
336
337 int ring_buffer_print_page_header(struct trace_seq *s)
338 {
339 struct buffer_data_page field;
340
341 trace_seq_printf(s, "\tfield: u64 timestamp;\t"
342 "offset:0;\tsize:%u;\tsigned:%u;\n",
343 (unsigned int)sizeof(field.time_stamp),
344 (unsigned int)is_signed_type(u64));
345
346 trace_seq_printf(s, "\tfield: local_t commit;\t"
347 "offset:%u;\tsize:%u;\tsigned:%u;\n",
348 (unsigned int)offsetof(typeof(field), commit),
349 (unsigned int)sizeof(field.commit),
350 (unsigned int)is_signed_type(long));
351
352 trace_seq_printf(s, "\tfield: int overwrite;\t"
353 "offset:%u;\tsize:%u;\tsigned:%u;\n",
354 (unsigned int)offsetof(typeof(field), commit),
355 1,
356 (unsigned int)is_signed_type(long));
357
358 trace_seq_printf(s, "\tfield: char data;\t"
359 "offset:%u;\tsize:%u;\tsigned:%u;\n",
360 (unsigned int)offsetof(typeof(field), data),
361 (unsigned int)BUF_PAGE_SIZE,
362 (unsigned int)is_signed_type(char));
363
364 return !trace_seq_has_overflowed(s);
365 }
366
367 struct rb_irq_work {
368 struct irq_work work;
369 wait_queue_head_t waiters;
370 wait_queue_head_t full_waiters;
371 long wait_index;
372 bool waiters_pending;
373 bool full_waiters_pending;
374 bool wakeup_full;
375 };
376
377 /*
378 * Structure to hold event state and handle nested events.
379 */
380 struct rb_event_info {
381 u64 ts;
382 u64 delta;
383 u64 before;
384 u64 after;
385 unsigned long length;
386 struct buffer_page *tail_page;
387 int add_timestamp;
388 };
389
390 /*
391 * Used for the add_timestamp
392 * NONE
393 * EXTEND - wants a time extend
394 * ABSOLUTE - the buffer requests all events to have absolute time stamps
395 * FORCE - force a full time stamp.
396 */
397 enum {
398 RB_ADD_STAMP_NONE = 0,
399 RB_ADD_STAMP_EXTEND = BIT(1),
400 RB_ADD_STAMP_ABSOLUTE = BIT(2),
401 RB_ADD_STAMP_FORCE = BIT(3)
402 };
403 /*
404 * Used for which event context the event is in.
405 * TRANSITION = 0
406 * NMI = 1
407 * IRQ = 2
408 * SOFTIRQ = 3
409 * NORMAL = 4
410 *
411 * See trace_recursive_lock() comment below for more details.
412 */
413 enum {
414 RB_CTX_TRANSITION,
415 RB_CTX_NMI,
416 RB_CTX_IRQ,
417 RB_CTX_SOFTIRQ,
418 RB_CTX_NORMAL,
419 RB_CTX_MAX
420 };
421
422 #if BITS_PER_LONG == 32
423 #define RB_TIME_32
424 #endif
425
426 /* To test on 64 bit machines */
427 //#define RB_TIME_32
428
429 #ifdef RB_TIME_32
430
431 struct rb_time_struct {
432 local_t cnt;
433 local_t top;
434 local_t bottom;
435 };
436 #else
437 #include <asm/local64.h>
438 struct rb_time_struct {
439 local64_t time;
440 };
441 #endif
442 typedef struct rb_time_struct rb_time_t;
443
444 #define MAX_NEST 5
445
446 /*
447 * head_page == tail_page && head == tail then buffer is empty.
448 */
449 struct ring_buffer_per_cpu {
450 int cpu;
451 atomic_t record_disabled;
452 atomic_t resize_disabled;
453 struct trace_buffer *buffer;
454 raw_spinlock_t reader_lock; /* serialize readers */
455 arch_spinlock_t lock;
456 struct lock_class_key lock_key;
457 struct buffer_data_page *free_page;
458 unsigned long nr_pages;
459 unsigned int current_context;
460 struct list_head *pages;
461 struct buffer_page *head_page; /* read from head */
462 struct buffer_page *tail_page; /* write to tail */
463 struct buffer_page *commit_page; /* committed pages */
464 struct buffer_page *reader_page;
465 unsigned long lost_events;
466 unsigned long last_overrun;
467 unsigned long nest;
468 local_t entries_bytes;
469 local_t entries;
470 local_t overrun;
471 local_t commit_overrun;
472 local_t dropped_events;
473 local_t committing;
474 local_t commits;
475 local_t pages_touched;
476 local_t pages_lost;
477 local_t pages_read;
478 long last_pages_touch;
479 size_t shortest_full;
480 unsigned long read;
481 unsigned long read_bytes;
482 rb_time_t write_stamp;
483 rb_time_t before_stamp;
484 u64 event_stamp[MAX_NEST];
485 u64 read_stamp;
486 /* ring buffer pages to update, > 0 to add, < 0 to remove */
487 long nr_pages_to_update;
488 struct list_head new_pages; /* new pages to add */
489 struct work_struct update_pages_work;
490 struct completion update_done;
491
492 struct rb_irq_work irq_work;
493 };
494
495 struct trace_buffer {
496 unsigned flags;
497 int cpus;
498 atomic_t record_disabled;
499 atomic_t resizing;
500 cpumask_var_t cpumask;
501
502 struct lock_class_key *reader_lock_key;
503
504 struct mutex mutex;
505
506 struct ring_buffer_per_cpu **buffers;
507
508 struct hlist_node node;
509 u64 (*clock)(void);
510
511 struct rb_irq_work irq_work;
512 bool time_stamp_abs;
513
514 struct ring_buffer_ext_cb *ext_cb;
515 };
516
517 struct ring_buffer_iter {
518 struct ring_buffer_per_cpu *cpu_buffer;
519 unsigned long head;
520 unsigned long next_event;
521 struct buffer_page *head_page;
522 struct buffer_page *cache_reader_page;
523 unsigned long cache_read;
524 u64 read_stamp;
525 u64 page_stamp;
526 struct ring_buffer_event *event;
527 int missed_events;
528 };
529
530 #ifdef RB_TIME_32
531
532 /*
533 * On 32 bit machines, local64_t is very expensive. As the ring
534 * buffer doesn't need all the features of a true 64 bit atomic,
535 * on 32 bit, it uses these functions (64 still uses local64_t).
536 *
537 * For the ring buffer, 64 bit required operations for the time is
538 * the following:
539 *
540 * - Only need 59 bits (uses 60 to make it even).
541 * - Reads may fail if it interrupted a modification of the time stamp.
542 * It will succeed if it did not interrupt another write even if
543 * the read itself is interrupted by a write.
544 * It returns whether it was successful or not.
545 *
546 * - Writes always succeed and will overwrite other writes and writes
547 * that were done by events interrupting the current write.
548 *
549 * - A write followed by a read of the same time stamp will always succeed,
550 * but may not contain the same value.
551 *
552 * - A cmpxchg will fail if it interrupted another write or cmpxchg.
553 * Other than that, it acts like a normal cmpxchg.
554 *
555 * The 60 bit time stamp is broken up by 30 bits in a top and bottom half
556 * (bottom being the least significant 30 bits of the 60 bit time stamp).
557 *
558 * The two most significant bits of each half holds a 2 bit counter (0-3).
559 * Each update will increment this counter by one.
560 * When reading the top and bottom, if the two counter bits match then the
561 * top and bottom together make a valid 60 bit number.
562 */
563 #define RB_TIME_SHIFT 30
564 #define RB_TIME_VAL_MASK ((1 << RB_TIME_SHIFT) - 1)
565
566 static inline int rb_time_cnt(unsigned long val)
567 {
568 return (val >> RB_TIME_SHIFT) & 3;
569 }
570
571 static inline u64 rb_time_val(unsigned long top, unsigned long bottom)
572 {
573 u64 val;
574
575 val = top & RB_TIME_VAL_MASK;
576 val <<= RB_TIME_SHIFT;
577 val |= bottom & RB_TIME_VAL_MASK;
578
579 return val;
580 }
581
582 static inline bool __rb_time_read(rb_time_t *t, u64 *ret, unsigned long *cnt)
583 {
584 unsigned long top, bottom;
585 unsigned long c;
586
587 /*
588 * If the read is interrupted by a write, then the cnt will
589 * be different. Loop until both top and bottom have been read
590 * without interruption.
591 */
592 do {
593 c = local_read(&t->cnt);
594 top = local_read(&t->top);
595 bottom = local_read(&t->bottom);
596 } while (c != local_read(&t->cnt));
597
598 *cnt = rb_time_cnt(top);
599
600 /* If top and bottom counts don't match, this interrupted a write */
601 if (*cnt != rb_time_cnt(bottom))
602 return false;
603
604 *ret = rb_time_val(top, bottom);
605 return true;
606 }
607
608 static bool rb_time_read(rb_time_t *t, u64 *ret)
609 {
610 unsigned long cnt;
611
612 return __rb_time_read(t, ret, &cnt);
613 }
614
615 static inline unsigned long rb_time_val_cnt(unsigned long val, unsigned long cnt)
616 {
617 return (val & RB_TIME_VAL_MASK) | ((cnt & 3) << RB_TIME_SHIFT);
618 }
619
620 static inline void rb_time_split(u64 val, unsigned long *top, unsigned long *bottom)
621 {
622 *top = (unsigned long)((val >> RB_TIME_SHIFT) & RB_TIME_VAL_MASK);
623 *bottom = (unsigned long)(val & RB_TIME_VAL_MASK);
624 }
625
626 static inline void rb_time_val_set(local_t *t, unsigned long val, unsigned long cnt)
627 {
628 val = rb_time_val_cnt(val, cnt);
629 local_set(t, val);
630 }
631
632 static void rb_time_set(rb_time_t *t, u64 val)
633 {
634 unsigned long cnt, top, bottom;
635
636 rb_time_split(val, &top, &bottom);
637
638 /* Writes always succeed with a valid number even if it gets interrupted. */
639 do {
640 cnt = local_inc_return(&t->cnt);
641 rb_time_val_set(&t->top, top, cnt);
642 rb_time_val_set(&t->bottom, bottom, cnt);
643 } while (cnt != local_read(&t->cnt));
644 }
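/*
 * Illustrative sketch of the guarantees listed above (32-bit builds only):
 * a value stored with rb_time_set() reads back unchanged, modulo the 60-bit
 * width, as long as the read does not interrupt another writer.
 */
static inline bool rb_example_time_roundtrip(rb_time_t *t, u64 val)
{
	u64 out = 0;

	rb_time_set(t, val);
	return rb_time_read(t, &out) && out == (val & ((1ULL << 60) - 1));
}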
645
646 static inline bool
647 rb_time_read_cmpxchg(local_t *l, unsigned long expect, unsigned long set)
648 {
649 unsigned long ret;
650
651 ret = local_cmpxchg(l, expect, set);
652 return ret == expect;
653 }
654
655 #else /* 64 bits */
656
657 /* local64_t always succeeds */
658
659 static inline bool rb_time_read(rb_time_t *t, u64 *ret)
660 {
661 *ret = local64_read(&t->time);
662 return true;
663 }
664 static void rb_time_set(rb_time_t *t, u64 val)
665 {
666 local64_set(&t->time, val);
667 }
668 #endif
669
670 static inline bool has_ext_writer(struct trace_buffer *buffer)
671 {
672 return !!buffer->ext_cb;
673 }
674
675 static inline bool rb_has_ext_writer(struct ring_buffer_per_cpu *cpu_buffer)
676 {
677 return has_ext_writer(cpu_buffer->buffer);
678 }
679
680 /*
681 * Enable this to make sure that the event passed to
682 * ring_buffer_event_time_stamp() is not committed and also
683 * is on the buffer that it passed in.
684 */
685 //#define RB_VERIFY_EVENT
686 #ifdef RB_VERIFY_EVENT
687 static struct list_head *rb_list_head(struct list_head *list);
688 static void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
689 void *event)
690 {
691 struct buffer_page *page = cpu_buffer->commit_page;
692 struct buffer_page *tail_page = READ_ONCE(cpu_buffer->tail_page);
693 struct list_head *next;
694 long commit, write;
695 unsigned long addr = (unsigned long)event;
696 bool done = false;
697 int stop = 0;
698
699 /* Make sure the event exists and is not committed yet */
700 do {
701 if (page == tail_page || WARN_ON_ONCE(stop++ > 100))
702 done = true;
703 commit = local_read(&page->page->commit);
704 write = local_read(&page->write);
705 if (addr >= (unsigned long)&page->page->data[commit] &&
706 addr < (unsigned long)&page->page->data[write])
707 return;
708
709 next = rb_list_head(page->list.next);
710 page = list_entry(next, struct buffer_page, list);
711 } while (!done);
712 WARN_ON_ONCE(1);
713 }
714 #else
715 static inline void verify_event(struct ring_buffer_per_cpu *cpu_buffer,
716 void *event)
717 {
718 }
719 #endif
720
721
722 static inline u64 rb_time_stamp(struct trace_buffer *buffer);
723
724 /**
725 * ring_buffer_event_time_stamp - return the event's current time stamp
726 * @buffer: The buffer that the event is on
727 * @event: the event to get the time stamp of
728 *
729 * Note, this must be called after @event is reserved, and before it is
730 * committed to the ring buffer. And must be called from the same
731 * context where the event was reserved (normal, softirq, irq, etc).
732 *
733 * Returns the time stamp associated with the current event.
734 * If the event has an extended time stamp, then that is used as
735 * the time stamp to return.
736 * In the highly unlikely case that the event was nested more than
737 * the max nesting, then the write_stamp of the buffer is returned,
738 * otherwise the current time is returned; neither of the last two
739 * cases should ever happen.
740 */
741 u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
742 struct ring_buffer_event *event)
743 {
744 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[smp_processor_id()];
745 unsigned int nest;
746 u64 ts;
747
748 /* If the event includes an absolute time, then just use that */
749 if (event->type_len == RINGBUF_TYPE_TIME_STAMP)
750 return rb_event_time_stamp(event);
751
752 nest = local_read(&cpu_buffer->committing);
753 verify_event(cpu_buffer, event);
754 if (WARN_ON_ONCE(!nest))
755 goto fail;
756
757 /* Read the current saved nesting level time stamp */
758 if (likely(--nest < MAX_NEST))
759 return cpu_buffer->event_stamp[nest];
760
761 /* Shouldn't happen, warn if it does */
762 WARN_ONCE(1, "nest (%d) greater than max", nest);
763
764 fail:
765 /* Can only fail on 32 bit */
766 if (!rb_time_read(&cpu_buffer->write_stamp, &ts))
767 /* Screw it, just read the current time */
768 ts = rb_time_stamp(cpu_buffer->buffer);
769
770 return ts;
771 }
772
773 /**
774 * ring_buffer_nr_pages - get the number of buffer pages in the ring buffer
775 * @buffer: The ring_buffer to get the number of pages from
776 * @cpu: The cpu of the ring_buffer to get the number of pages from
777 *
778 * Returns the number of pages used by a per_cpu buffer of the ring buffer.
779 */
780 size_t ring_buffer_nr_pages(struct trace_buffer *buffer, int cpu)
781 {
782 return buffer->buffers[cpu]->nr_pages;
783 }
784
785 /**
786 * ring_buffer_nr_pages_dirty - get the number of used pages in the ring buffer
787 * @buffer: The ring_buffer to get the number of pages from
788 * @cpu: The cpu of the ring_buffer to get the number of pages from
789 *
790 * Returns the number of pages that have content in the ring buffer.
791 */
792 size_t ring_buffer_nr_dirty_pages(struct trace_buffer *buffer, int cpu)
793 {
794 size_t read;
795 size_t lost;
796 size_t cnt;
797
798 read = local_read(&buffer->buffers[cpu]->pages_read);
799 lost = local_read(&buffer->buffers[cpu]->pages_lost);
800 cnt = local_read(&buffer->buffers[cpu]->pages_touched);
801
802 if (WARN_ON_ONCE(cnt < lost))
803 return 0;
804
805 cnt -= lost;
806
807 /* The reader can read an empty page, but not more than that */
808 if (cnt < read) {
809 WARN_ON_ONCE(read > cnt + 1);
810 return 0;
811 }
812
813 return cnt - read;
814 }
815
816 static __always_inline bool full_hit(struct trace_buffer *buffer, int cpu, int full)
817 {
818 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
819 size_t nr_pages;
820 size_t dirty;
821
822 nr_pages = cpu_buffer->nr_pages;
823 if (!nr_pages || !full)
824 return true;
825
826 /*
827 * Add one as dirty will never equal nr_pages, as the sub-buffer
828 * that the writer is on is not counted as dirty.
829 * This is needed if "buffer_percent" is set to 100.
830 */
831 dirty = ring_buffer_nr_dirty_pages(buffer, cpu) + 1;
832
833 return (dirty * 100) >= (full * nr_pages);
834 }
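/*
 * Worked example of the check above: with nr_pages = 10 and full = 25
 * (wake readers at 25% dirty), two dirty sub-buffers plus the writer's
 * page give dirty = 3, and (3 * 100) >= (25 * 10) holds, so full_hit()
 * returns true.
 */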
835
836 /*
837 * rb_wake_up_waiters - wake up tasks waiting for ring buffer input
838 *
839 * Irq_work callback that wakes up any task that is blocked on the
840 * ring buffer waiters queue.
841 */
842 static void rb_wake_up_waiters(struct irq_work *work)
843 {
844 struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
845
846 wake_up_all(&rbwork->waiters);
847 if (rbwork->full_waiters_pending || rbwork->wakeup_full) {
848 rbwork->wakeup_full = false;
849 rbwork->full_waiters_pending = false;
850 wake_up_all(&rbwork->full_waiters);
851 }
852 }
853
854 /**
855 * ring_buffer_wake_waiters - wake up any waiters on this ring buffer
856 * @buffer: The ring buffer to wake waiters on
857 *
858 * When a file that represents a ring buffer is being closed, it is
859 * prudent to wake up any waiters that are blocked on it.
860 */
861 void ring_buffer_wake_waiters(struct trace_buffer *buffer, int cpu)
862 {
863 struct ring_buffer_per_cpu *cpu_buffer;
864 struct rb_irq_work *rbwork;
865
866 if (!buffer)
867 return;
868
869 if (cpu == RING_BUFFER_ALL_CPUS) {
870
871 /* Wake up individual ones too. One level recursion */
872 for_each_buffer_cpu(buffer, cpu)
873 ring_buffer_wake_waiters(buffer, cpu);
874
875 rbwork = &buffer->irq_work;
876 } else {
877 if (WARN_ON_ONCE(!buffer->buffers))
878 return;
879 if (WARN_ON_ONCE(cpu >= nr_cpu_ids))
880 return;
881
882 cpu_buffer = buffer->buffers[cpu];
883 /* The CPU buffer may not have been initialized yet */
884 if (!cpu_buffer)
885 return;
886 rbwork = &cpu_buffer->irq_work;
887 }
888
889 rbwork->wait_index++;
890 /* make sure the waiters see the new index */
891 smp_wmb();
892
893 /* This can be called in any context */
894 irq_work_queue(&rbwork->work);
895 }
896
897 /**
898 * ring_buffer_wait - wait for input to the ring buffer
899 * @buffer: buffer to wait on
900 * @cpu: the cpu buffer to wait on
901 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
902 *
903 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
904 * as data is added to any of the @buffer's cpu buffers. Otherwise
905 * it will wait for data to be added to a specific cpu buffer.
906 */
907 int ring_buffer_wait(struct trace_buffer *buffer, int cpu, int full)
908 {
909 struct ring_buffer_per_cpu *cpu_buffer;
910 DEFINE_WAIT(wait);
911 struct rb_irq_work *work;
912 long wait_index;
913 int ret = 0;
914
915 /*
916 * Depending on what the caller is waiting for, either any
917 * data in any cpu buffer, or a specific buffer, put the
918 * caller on the appropriate wait queue.
919 */
920 if (cpu == RING_BUFFER_ALL_CPUS) {
921 work = &buffer->irq_work;
922 /* Full only makes sense on per cpu reads */
923 full = 0;
924 } else {
925 if (!cpumask_test_cpu(cpu, buffer->cpumask))
926 return -ENODEV;
927 cpu_buffer = buffer->buffers[cpu];
928 work = &cpu_buffer->irq_work;
929 }
930
931 wait_index = READ_ONCE(work->wait_index);
932
933 while (true) {
934 if (full)
935 prepare_to_wait(&work->full_waiters, &wait, TASK_INTERRUPTIBLE);
936 else
937 prepare_to_wait(&work->waiters, &wait, TASK_INTERRUPTIBLE);
938
939 /*
940 * The events can happen in critical sections where
941 * checking a work queue can cause deadlocks.
942 * After adding a task to the queue, this flag is set
943 * only to notify events to try to wake up the queue
944 * using irq_work.
945 *
946 * We don't clear it even if the buffer is no longer
947 * empty. The flag only causes the next event to run
948 * irq_work to do the work queue wake up. The worst
949 * that can happen if we race with !trace_empty() is that
950 * an event will cause an irq_work to try to wake up
951 * an empty queue.
952 *
953 * There's no reason to protect this flag either, as
954 * the work queue and irq_work logic will do the necessary
955 * synchronization for the wake ups. The only thing
956 * that is necessary is that the wake up happens after
957 * a task has been queued. It's OK for spurious wake ups.
958 */
959 if (full)
960 work->full_waiters_pending = true;
961 else
962 work->waiters_pending = true;
963
964 if (signal_pending(current)) {
965 ret = -EINTR;
966 break;
967 }
968
969 if (cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer))
970 break;
971
972 if (cpu != RING_BUFFER_ALL_CPUS &&
973 !ring_buffer_empty_cpu(buffer, cpu)) {
974 unsigned long flags;
975 bool pagebusy;
976 bool done;
977
978 if (!full)
979 break;
980
981 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
982 pagebusy = cpu_buffer->reader_page == cpu_buffer->commit_page;
983 done = !pagebusy && full_hit(buffer, cpu, full);
984
985 if (!cpu_buffer->shortest_full ||
986 cpu_buffer->shortest_full > full)
987 cpu_buffer->shortest_full = full;
988 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
989 if (done)
990 break;
991 }
992
993 schedule();
994
995 /* Make sure to see the new wait index */
996 smp_rmb();
997 if (wait_index != work->wait_index)
998 break;
999 }
1000
1001 if (full)
1002 finish_wait(&work->full_waiters, &wait);
1003 else
1004 finish_wait(&work->waiters, &wait);
1005
1006 return ret;
1007 }
1008
1009 /**
1010 * ring_buffer_poll_wait - poll on buffer input
1011 * @buffer: buffer to wait on
1012 * @cpu: the cpu buffer to wait on
1013 * @filp: the file descriptor
1014 * @poll_table: The poll descriptor
1015 * @full: wait until the percentage of pages are available, if @cpu != RING_BUFFER_ALL_CPUS
1016 *
1017 * If @cpu == RING_BUFFER_ALL_CPUS then the task will wake up as soon
1018 * as data is added to any of the @buffer's cpu buffers. Otherwise
1019 * it will wait for data to be added to a specific cpu buffer.
1020 *
1021 * Returns EPOLLIN | EPOLLRDNORM if data exists in the buffers,
1022 * zero otherwise.
1023 */
1024 __poll_t ring_buffer_poll_wait(struct trace_buffer *buffer, int cpu,
1025 struct file *filp, poll_table *poll_table, int full)
1026 {
1027 struct ring_buffer_per_cpu *cpu_buffer;
1028 struct rb_irq_work *work;
1029
1030 if (cpu == RING_BUFFER_ALL_CPUS) {
1031 work = &buffer->irq_work;
1032 full = 0;
1033 } else {
1034 if (!cpumask_test_cpu(cpu, buffer->cpumask))
1035 return EPOLLERR;
1036
1037 cpu_buffer = buffer->buffers[cpu];
1038 work = &cpu_buffer->irq_work;
1039 }
1040
1041 if (full) {
1042 poll_wait(filp, &work->full_waiters, poll_table);
1043 work->full_waiters_pending = true;
1044 if (!cpu_buffer->shortest_full ||
1045 cpu_buffer->shortest_full > full)
1046 cpu_buffer->shortest_full = full;
1047 } else {
1048 poll_wait(filp, &work->waiters, poll_table);
1049 work->waiters_pending = true;
1050 }
1051
1052 /*
1053 * There's a tight race between setting the waiters_pending and
1054 * checking if the ring buffer is empty. Once the waiters_pending bit
1055 * is set, the next event will wake the task up, but we can get stuck
1056 * if there's only a single event in.
1057 *
1058 * FIXME: Ideally, we need a memory barrier on the writer side as well,
1059 * but adding a memory barrier to all events will cause too much of a
1060 * performance hit in the fast path. We only need a memory barrier when
1061 * the buffer goes from empty to having content. But as this race is
1062 * extremely small, and it's not a problem if another event comes in, we
1063 * will fix it later.
1064 */
1065 smp_mb();
1066
1067 if (full)
1068 return full_hit(buffer, cpu, full) ? EPOLLIN | EPOLLRDNORM : 0;
1069
1070 if ((cpu == RING_BUFFER_ALL_CPUS && !ring_buffer_empty(buffer)) ||
1071 (cpu != RING_BUFFER_ALL_CPUS && !ring_buffer_empty_cpu(buffer, cpu)))
1072 return EPOLLIN | EPOLLRDNORM;
1073 return 0;
1074 }
1075
1076 /* buffer may be either ring_buffer or ring_buffer_per_cpu */
1077 #define RB_WARN_ON(b, cond) \
1078 ({ \
1079 int _____ret = unlikely(cond); \
1080 if (_____ret) { \
1081 if (__same_type(*(b), struct ring_buffer_per_cpu)) { \
1082 struct ring_buffer_per_cpu *__b = \
1083 (void *)b; \
1084 atomic_inc(&__b->buffer->record_disabled); \
1085 } else \
1086 atomic_inc(&b->record_disabled); \
1087 WARN_ON(1); \
1088 } \
1089 _____ret; \
1090 })
1091
1092 /* Up this if you want to test the TIME_EXTENTS and normalization */
1093 #define DEBUG_SHIFT 0
1094
1095 static inline u64 rb_time_stamp(struct trace_buffer *buffer)
1096 {
1097 u64 ts;
1098
1099 /* Skip retpolines :-( */
1100 if (IS_ENABLED(CONFIG_RETPOLINE) && likely(buffer->clock == trace_clock_local))
1101 ts = trace_clock_local();
1102 else
1103 ts = buffer->clock();
1104
1105 /* shift to debug/test normalization and TIME_EXTENTS */
1106 return ts << DEBUG_SHIFT;
1107 }
1108
1109 u64 ring_buffer_time_stamp(struct trace_buffer *buffer)
1110 {
1111 u64 time;
1112
1113 preempt_disable_notrace();
1114 time = rb_time_stamp(buffer);
1115 preempt_enable_notrace();
1116
1117 return time;
1118 }
1119 EXPORT_SYMBOL_GPL(ring_buffer_time_stamp);
1120
1121 void ring_buffer_normalize_time_stamp(struct trace_buffer *buffer,
1122 int cpu, u64 *ts)
1123 {
1124 /* Just stupid testing the normalize function and deltas */
1125 *ts >>= DEBUG_SHIFT;
1126 }
1127 EXPORT_SYMBOL_GPL(ring_buffer_normalize_time_stamp);
1128
1129 /*
1130 * Making the ring buffer lockless makes things tricky.
1131 * Writes only happen on the CPU that they are on, so they
1132 * only need to worry about interrupts. Reads can
1133 * happen on any CPU.
1134 *
1135 * The reader page is always off the ring buffer, but when the
1136 * reader finishes with a page, it needs to swap its page with
1137 * a new one from the buffer. The reader needs to take from
1138 * the head (writes go to the tail). But if a writer is in overwrite
1139 * mode and wraps, it must push the head page forward.
1140 *
1141 * Here lies the problem.
1142 *
1143 * The reader must be careful to replace only the head page, and
1144 * not another one. As described at the top of the file in the
1145 * ASCII art, the reader sets its old page to point to the next
1146 * page after head. It then sets the page after head to point to
1147 * the old reader page. But if the writer moves the head page
1148 * during this operation, the reader could end up with the tail.
1149 *
1150 * We use cmpxchg to help prevent this race. We also do something
1151 * special with the page before head. We set the LSB to 1.
1152 *
1153 * When the writer must push the page forward, it will clear the
1154 * bit that points to the head page, move the head, and then set
1155 * the bit that points to the new head page.
1156 *
1157 * We also don't want an interrupt coming in and moving the head
1158 * page on another writer. Thus we use the second LSB to catch
1159 * that too. Thus:
1160 *
1161 * head->list->prev->next bit 1 bit 0
1162 * ------- -------
1163 * Normal page 0 0
1164 * Points to head page 0 1
1165 * New head page 1 0
1166 *
1167 * Note we can not trust the prev pointer of the head page, because:
1168 *
1169 * +----+ +-----+ +-----+
1170 * | |------>| T |---X--->| N |
1171 * | |<------| | | |
1172 * +----+ +-----+ +-----+
1173 * ^ ^ |
1174 * | +-----+ | |
1175 * +----------| R |----------+ |
1176 * | |<-----------+
1177 * +-----+
1178 *
1179 * Key: ---X--> HEAD flag set in pointer
1180 * T Tail page
1181 * R Reader page
1182 * N Next page
1183 *
1184 * (see __rb_reserve_next() to see where this happens)
1185 *
1186 * What the above shows is that the reader just swapped out
1187 * the reader page with a page in the buffer, but before it
1188 * could make the new header point back to the new page added
1189 * it was preempted by a writer. The writer moved forward onto
1190 * the new page added by the reader and is about to move forward
1191 * again.
1192 *
1193 * You can see, it is legitimate for the previous pointer of
1194 * the head (or any page) not to point back to itself. But only
1195 * temporarily.
1196 */
1197
1198 #define RB_PAGE_NORMAL 0UL
1199 #define RB_PAGE_HEAD 1UL
1200 #define RB_PAGE_UPDATE 2UL
1201
1202
1203 #define RB_FLAG_MASK 3UL
1204
1205 /* PAGE_MOVED is not part of the mask */
1206 #define RB_PAGE_MOVED 4UL
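/*
 * Worked example of the tagging above: a ->next value that reads as
 * (addr | RB_PAGE_HEAD) means the buffer_page list at addr is the head
 * page. rb_list_head() below strips the tag bits to recover the pointer,
 * and rb_is_head_page() returns the tag bits themselves.
 */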
1207
1208 /*
1209 * rb_list_head - remove any bit
1210 */
1211 static struct list_head *rb_list_head(struct list_head *list)
1212 {
1213 unsigned long val = (unsigned long)list;
1214
1215 return (struct list_head *)(val & ~RB_FLAG_MASK);
1216 }
1217
1218 /*
1219 * rb_is_head_page - test if the given page is the head page
1220 *
1221 * Because the reader may move the head_page pointer, we can
1222 * not trust what the head page is (it may be pointing to
1223 * the reader page). But if the next page is a header page,
1224 * its flags will be non zero.
1225 */
1226 static inline int
1227 rb_is_head_page(struct buffer_page *page, struct list_head *list)
1228 {
1229 unsigned long val;
1230
1231 val = (unsigned long)list->next;
1232
1233 if ((val & ~RB_FLAG_MASK) != (unsigned long)&page->list)
1234 return RB_PAGE_MOVED;
1235
1236 return val & RB_FLAG_MASK;
1237 }
1238
1239 /*
1240 * rb_is_reader_page
1241 *
1242 * The unique thing about the reader page, is that, if the
1243 * writer is ever on it, the previous pointer never points
1244 * back to the reader page.
1245 */
1246 static bool rb_is_reader_page(struct buffer_page *page)
1247 {
1248 struct list_head *list = page->list.prev;
1249
1250 return rb_list_head(list->next) != &page->list;
1251 }
1252
1253 /*
1254 * rb_set_list_to_head - set a list_head to be pointing to head.
1255 */
1256 static void rb_set_list_to_head(struct list_head *list)
1257 {
1258 unsigned long *ptr;
1259
1260 ptr = (unsigned long *)&list->next;
1261 *ptr |= RB_PAGE_HEAD;
1262 *ptr &= ~RB_PAGE_UPDATE;
1263 }
1264
1265 /*
1266 * rb_head_page_activate - sets up head page
1267 */
1268 static void rb_head_page_activate(struct ring_buffer_per_cpu *cpu_buffer)
1269 {
1270 struct buffer_page *head;
1271
1272 head = cpu_buffer->head_page;
1273 if (!head)
1274 return;
1275
1276 /*
1277 * Set the previous list pointer to have the HEAD flag.
1278 */
1279 rb_set_list_to_head(head->list.prev);
1280 }
1281
1282 static void rb_list_head_clear(struct list_head *list)
1283 {
1284 unsigned long *ptr = (unsigned long *)&list->next;
1285
1286 *ptr &= ~RB_FLAG_MASK;
1287 }
1288
1289 /*
1290 * rb_head_page_deactivate - clears head page ptr (for free list)
1291 */
1292 static void
1293 rb_head_page_deactivate(struct ring_buffer_per_cpu *cpu_buffer)
1294 {
1295 struct list_head *hd;
1296
1297 /* Go through the whole list and clear any pointers found. */
1298 rb_list_head_clear(cpu_buffer->pages);
1299
1300 list_for_each(hd, cpu_buffer->pages)
1301 rb_list_head_clear(hd);
1302 }
1303
1304 static int rb_head_page_set(struct ring_buffer_per_cpu *cpu_buffer,
1305 struct buffer_page *head,
1306 struct buffer_page *prev,
1307 int old_flag, int new_flag)
1308 {
1309 struct list_head *list;
1310 unsigned long val = (unsigned long)&head->list;
1311 unsigned long ret;
1312
1313 list = &prev->list;
1314
1315 val &= ~RB_FLAG_MASK;
1316
1317 ret = cmpxchg((unsigned long *)&list->next,
1318 val | old_flag, val | new_flag);
1319
1320 /* check if the reader took the page */
1321 if ((ret & ~RB_FLAG_MASK) != val)
1322 return RB_PAGE_MOVED;
1323
1324 return ret & RB_FLAG_MASK;
1325 }
1326
1327 static int rb_head_page_set_update(struct ring_buffer_per_cpu *cpu_buffer,
1328 struct buffer_page *head,
1329 struct buffer_page *prev,
1330 int old_flag)
1331 {
1332 return rb_head_page_set(cpu_buffer, head, prev,
1333 old_flag, RB_PAGE_UPDATE);
1334 }
1335
1336 static int rb_head_page_set_head(struct ring_buffer_per_cpu *cpu_buffer,
1337 struct buffer_page *head,
1338 struct buffer_page *prev,
1339 int old_flag)
1340 {
1341 return rb_head_page_set(cpu_buffer, head, prev,
1342 old_flag, RB_PAGE_HEAD);
1343 }
1344
1345 static int rb_head_page_set_normal(struct ring_buffer_per_cpu *cpu_buffer,
1346 struct buffer_page *head,
1347 struct buffer_page *prev,
1348 int old_flag)
1349 {
1350 return rb_head_page_set(cpu_buffer, head, prev,
1351 old_flag, RB_PAGE_NORMAL);
1352 }
1353
1354 static inline void rb_inc_page(struct buffer_page **bpage)
1355 {
1356 struct list_head *p = rb_list_head((*bpage)->list.next);
1357
1358 *bpage = list_entry(p, struct buffer_page, list);
1359 }
1360
1361 static struct buffer_page *
1362 rb_set_head_page(struct ring_buffer_per_cpu *cpu_buffer)
1363 {
1364 struct buffer_page *head;
1365 struct buffer_page *page;
1366 struct list_head *list;
1367 int i;
1368
1369 if (RB_WARN_ON(cpu_buffer, !cpu_buffer->head_page))
1370 return NULL;
1371
1372 /* sanity check */
1373 list = cpu_buffer->pages;
1374 if (RB_WARN_ON(cpu_buffer, rb_list_head(list->prev->next) != list))
1375 return NULL;
1376
1377 page = head = cpu_buffer->head_page;
1378 /*
1379 * It is possible that the writer moves the header behind
1380 * where we started, and we miss in one loop.
1381 * A second loop should grab the header, but we'll do
1382 * three loops just because I'm paranoid.
1383 */
1384 for (i = 0; i < 3; i++) {
1385 do {
1386 if (rb_is_head_page(page, page->list.prev)) {
1387 cpu_buffer->head_page = page;
1388 return page;
1389 }
1390 rb_inc_page(&page);
1391 } while (page != head);
1392 }
1393
1394 RB_WARN_ON(cpu_buffer, 1);
1395
1396 return NULL;
1397 }
1398
1399 static int rb_head_page_replace(struct buffer_page *old,
1400 struct buffer_page *new)
1401 {
1402 unsigned long *ptr = (unsigned long *)&old->list.prev->next;
1403 unsigned long val;
1404 unsigned long ret;
1405
1406 val = *ptr & ~RB_FLAG_MASK;
1407 val |= RB_PAGE_HEAD;
1408
1409 ret = cmpxchg(ptr, val, (unsigned long)&new->list);
1410
1411 return ret == val;
1412 }
1413
1414 /*
1415 * rb_tail_page_update - move the tail page forward
1416 */
1417 static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
1418 struct buffer_page *tail_page,
1419 struct buffer_page *next_page)
1420 {
1421 unsigned long old_entries;
1422 unsigned long old_write;
1423
1424 /*
1425 * The tail page now needs to be moved forward.
1426 *
1427 * We need to reset the tail page, but without messing
1428 * with possible erasing of data brought in by interrupts
1429 * that have moved the tail page and are currently on it.
1430 *
1431 * We add a counter to the write field to denote this.
1432 */
1433 old_write = local_add_return(RB_WRITE_INTCNT, &next_page->write);
1434 old_entries = local_add_return(RB_WRITE_INTCNT, &next_page->entries);
1435
1436 local_inc(&cpu_buffer->pages_touched);
1437 /*
1438 * Just make sure we have seen our old_write and synchronize
1439 * with any interrupts that come in.
1440 */
1441 barrier();
1442
1443 /*
1444 * If the tail page is still the same as what we think
1445 * it is, then it is up to us to update the tail
1446 * pointer.
1447 */
1448 if (tail_page == READ_ONCE(cpu_buffer->tail_page)) {
1449 /* Zero the write counter */
1450 unsigned long val = old_write & ~RB_WRITE_MASK;
1451 unsigned long eval = old_entries & ~RB_WRITE_MASK;
1452
1453 /*
1454 * This will only succeed if an interrupt did
1455 * not come in and change it. In which case, we
1456 * do not want to modify it.
1457 *
1458 * We add (void) to let the compiler know that we do not care
1459 * about the return value of these functions. We use the
1460 * cmpxchg to only update if an interrupt did not already
1461 * do it for us. If the cmpxchg fails, we don't care.
1462 */
1463 (void)local_cmpxchg(&next_page->write, old_write, val);
1464 (void)local_cmpxchg(&next_page->entries, old_entries, eval);
1465
1466 /*
1467 * No need to worry about races with clearing out the commit.
1468 * it only can increment when a commit takes place. But that
1469 * only happens in the outer most nested commit.
1470 */
1471 local_set(&next_page->page->commit, 0);
1472
1473 /* Again, either we update tail_page or an interrupt does */
1474 (void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
1475 }
1476 }
1477
1478 static int rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
1479 struct buffer_page *bpage)
1480 {
1481 unsigned long val = (unsigned long)bpage;
1482
1483 if (RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK))
1484 return 1;
1485
1486 return 0;
1487 }
1488
1489 /**
1490 * rb_check_pages - integrity check of buffer pages
1491 * @cpu_buffer: CPU buffer with pages to test
1492 *
1493 * As a safety measure we check to make sure the data pages have not
1494 * been corrupted.
1495 */
1496 static int rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
1497 {
1498 struct list_head *head = rb_list_head(cpu_buffer->pages);
1499 struct list_head *tmp;
1500
1501 if (RB_WARN_ON(cpu_buffer,
1502 rb_list_head(rb_list_head(head->next)->prev) != head))
1503 return -1;
1504
1505 if (RB_WARN_ON(cpu_buffer,
1506 rb_list_head(rb_list_head(head->prev)->next) != head))
1507 return -1;
1508
1509 for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
1510 if (RB_WARN_ON(cpu_buffer,
1511 rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
1512 return -1;
1513
1514 if (RB_WARN_ON(cpu_buffer,
1515 rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
1516 return -1;
1517 }
1518
1519 return 0;
1520 }
1521
1522 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1523 long nr_pages, struct list_head *pages)
1524 {
1525 struct buffer_page *bpage, *tmp;
1526 bool user_thread = current->mm != NULL;
1527 gfp_t mflags;
1528 long i;
1529
1530 /*
1531 * Check if the available memory is there first.
1532 * Note, si_mem_available() only gives us a rough estimate of available
1533 * memory. It may not be accurate. But we don't care, we just want
1534 * to prevent doing any allocation when it is obvious that it is
1535 * not going to succeed.
1536 */
1537 i = si_mem_available();
1538 if (i < nr_pages)
1539 return -ENOMEM;
1540
1541 /*
1542 * __GFP_RETRY_MAYFAIL flag makes sure that the allocation fails
1543 * gracefully without invoking oom-killer and the system is not
1544 * destabilized.
1545 */
1546 mflags = GFP_KERNEL | __GFP_RETRY_MAYFAIL;
1547
1548 /*
1549 * If a user thread allocates too much and si_mem_available()
1550 * reports there's enough memory even though there is not,
1551 * make sure the OOM killer kills this thread. This can happen
1552 * even with RETRY_MAYFAIL because another task may be doing
1553 * an allocation after this task has taken all memory.
1554 * This is the task the OOM killer needs to take out during this
1555 * loop, even if it was triggered by an allocation somewhere else.
1556 */
1557 if (user_thread)
1558 set_current_oom_origin();
1559 for (i = 0; i < nr_pages; i++) {
1560 struct page *page;
1561
1562 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1563 mflags, cpu_to_node(cpu_buffer->cpu));
1564 if (!bpage)
1565 goto free_pages;
1566
1567 rb_check_bpage(cpu_buffer, bpage);
1568
1569 list_add(&bpage->list, pages);
1570
1571 page = alloc_pages_node(cpu_to_node(cpu_buffer->cpu), mflags, 0);
1572 if (!page)
1573 goto free_pages;
1574 bpage->page = page_address(page);
1575 rb_init_page(bpage->page);
1576
1577 if (user_thread && fatal_signal_pending(current))
1578 goto free_pages;
1579 }
1580 if (user_thread)
1581 clear_current_oom_origin();
1582
1583 return 0;
1584
1585 free_pages:
1586 list_for_each_entry_safe(bpage, tmp, pages, list) {
1587 list_del_init(&bpage->list);
1588 free_buffer_page(bpage);
1589 }
1590 if (user_thread)
1591 clear_current_oom_origin();
1592
1593 return -ENOMEM;
1594 }
1595
1596 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
1597 unsigned long nr_pages)
1598 {
1599 LIST_HEAD(pages);
1600
1601 WARN_ON(!nr_pages);
1602
1603 if (__rb_allocate_pages(cpu_buffer, nr_pages, &pages))
1604 return -ENOMEM;
1605
1606 /*
1607 * The ring buffer page list is a circular list that does not
1608 * start and end with a list head. All page list items point to
1609 * other pages.
1610 */
1611 cpu_buffer->pages = pages.next;
1612 list_del(&pages);
1613
1614 cpu_buffer->nr_pages = nr_pages;
1615
1616 rb_check_pages(cpu_buffer);
1617
1618 return 0;
1619 }
1620
1621 static struct ring_buffer_per_cpu *
1622 rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
1623 {
1624 struct ring_buffer_per_cpu *cpu_buffer;
1625 struct buffer_page *bpage;
1626 struct page *page;
1627 int ret;
1628
1629 cpu_buffer = kzalloc_node(ALIGN(sizeof(*cpu_buffer), cache_line_size()),
1630 GFP_KERNEL, cpu_to_node(cpu));
1631 if (!cpu_buffer)
1632 return NULL;
1633
1634 cpu_buffer->cpu = cpu;
1635 cpu_buffer->buffer = buffer;
1636 raw_spin_lock_init(&cpu_buffer->reader_lock);
1637 lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
1638 cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
1639 INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
1640 init_completion(&cpu_buffer->update_done);
1641 init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
1642 init_waitqueue_head(&cpu_buffer->irq_work.waiters);
1643 init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
1644
1645 bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
1646 GFP_KERNEL, cpu_to_node(cpu));
1647 if (!bpage)
1648 goto fail_free_buffer;
1649
1650 rb_check_bpage(cpu_buffer, bpage);
1651
1652 cpu_buffer->reader_page = bpage;
1653 page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL, 0);
1654 if (!page)
1655 goto fail_free_reader;
1656 bpage->page = page_address(page);
1657 rb_init_page(bpage->page);
1658
1659 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
1660 INIT_LIST_HEAD(&cpu_buffer->new_pages);
1661
1662 ret = rb_allocate_pages(cpu_buffer, nr_pages);
1663 if (ret < 0)
1664 goto fail_free_reader;
1665
1666 cpu_buffer->head_page
1667 = list_entry(cpu_buffer->pages, struct buffer_page, list);
1668 cpu_buffer->tail_page = cpu_buffer->commit_page = cpu_buffer->head_page;
1669
1670 rb_head_page_activate(cpu_buffer);
1671
1672 return cpu_buffer;
1673
1674 fail_free_reader:
1675 free_buffer_page(cpu_buffer->reader_page);
1676
1677 fail_free_buffer:
1678 kfree(cpu_buffer);
1679 return NULL;
1680 }
1681
1682 static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
1683 {
1684 struct list_head *head = cpu_buffer->pages;
1685 struct buffer_page *bpage, *tmp;
1686
1687 irq_work_sync(&cpu_buffer->irq_work.work);
1688
1689 free_buffer_page(cpu_buffer->reader_page);
1690
1691 if (head) {
1692 rb_head_page_deactivate(cpu_buffer);
1693
1694 list_for_each_entry_safe(bpage, tmp, head, list) {
1695 list_del_init(&bpage->list);
1696 free_buffer_page(bpage);
1697 }
1698 bpage = list_entry(head, struct buffer_page, list);
1699 free_buffer_page(bpage);
1700 }
1701
1702 free_page((unsigned long)cpu_buffer->free_page);
1703
1704 kfree(cpu_buffer);
1705 }
1706
1707 /**
1708 * __ring_buffer_alloc - allocate a new ring_buffer
1709 * @size: the size in bytes per cpu that is needed.
1710 * @flags: attributes to set for the ring buffer.
1711 * @key: ring buffer reader_lock_key.
1712 *
1713 * Currently the only flag that is available is the RB_FL_OVERWRITE
1714 * flag. This flag means that the buffer will overwrite old data
1715 * when the buffer wraps. If this flag is not set, the buffer will
1716 * drop data when the tail hits the head.
1717 */
1718 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
1719 struct lock_class_key *key)
1720 {
1721 struct trace_buffer *buffer;
1722 long nr_pages;
1723 int bsize;
1724 int cpu;
1725 int ret;
1726
1727 /* keep it in its own cache line */
1728 buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
1729 GFP_KERNEL);
1730 if (!buffer)
1731 return NULL;
1732
1733 if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
1734 goto fail_free_buffer;
1735
1736 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
1737 buffer->flags = flags;
1738 buffer->clock = trace_clock_local;
1739 buffer->reader_lock_key = key;
1740
1741 init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
1742 init_waitqueue_head(&buffer->irq_work.waiters);
1743
1744 /* need at least two pages */
1745 if (nr_pages < 2)
1746 nr_pages = 2;
1747
1748 buffer->cpus = nr_cpu_ids;
1749
1750 bsize = sizeof(void *) * nr_cpu_ids;
1751 buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
1752 GFP_KERNEL);
1753 if (!buffer->buffers)
1754 goto fail_free_cpumask;
1755
1756 cpu = raw_smp_processor_id();
1757 cpumask_set_cpu(cpu, buffer->cpumask);
1758 buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
1759 if (!buffer->buffers[cpu])
1760 goto fail_free_buffers;
1761
1762 ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
1763 if (ret < 0)
1764 goto fail_free_buffers;
1765
1766 mutex_init(&buffer->mutex);
1767
1768 return buffer;
1769
1770 fail_free_buffers:
1771 for_each_buffer_cpu(buffer, cpu) {
1772 if (buffer->buffers[cpu])
1773 rb_free_cpu_buffer(buffer->buffers[cpu]);
1774 }
1775 kfree(buffer->buffers);
1776
1777 fail_free_cpumask:
1778 free_cpumask_var(buffer->cpumask);
1779
1780 fail_free_buffer:
1781 kfree(buffer);
1782 return NULL;
1783 }
1784 EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
1785
1786 struct trace_buffer *ring_buffer_alloc_ext(unsigned long size,
1787 struct ring_buffer_ext_cb *cb)
1788 {
1789 struct trace_buffer *buffer;
1790
1791 if (!cb || !cb->update_footers || !cb->swap_reader)
1792 return NULL;
1793
1794 buffer = ring_buffer_alloc(size, RB_FL_OVERWRITE);
1795 if (!buffer)
1796 return NULL;
1797
1798 WARN_ON(cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE,
1799 &buffer->node));
1800 buffer->ext_cb = cb;
1801 atomic_set(&buffer->record_disabled, 1);
1802
1803 return buffer;
1804 }
1805
1806 /**
1807 * ring_buffer_free - free a ring buffer.
1808 * @buffer: the buffer to free.
1809 */
1810 void
1811 ring_buffer_free(struct trace_buffer *buffer)
1812 {
1813 int cpu;
1814
1815 if (!has_ext_writer(buffer))
1816 cpuhp_state_remove_instance(CPUHP_TRACE_RB_PREPARE,
1817 &buffer->node);
1818
1819 irq_work_sync(&buffer->irq_work.work);
1820
1821 for_each_buffer_cpu(buffer, cpu)
1822 rb_free_cpu_buffer(buffer->buffers[cpu]);
1823
1824 kfree(buffer->buffers);
1825 free_cpumask_var(buffer->cpumask);
1826
1827 kfree(buffer);
1828 }
1829 EXPORT_SYMBOL_GPL(ring_buffer_free);
1830
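/**
 * ring_buffer_set_clock - set the clock used to take timestamps
 * @buffer: the buffer to modify
 * @clock: the clock function that returns a u64 timestamp
 */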
1831 void ring_buffer_set_clock(struct trace_buffer *buffer,
1832 u64 (*clock)(void))
1833 {
1834 buffer->clock = clock;
1835 }
1836
1837 void ring_buffer_set_time_stamp_abs(struct trace_buffer *buffer, bool abs)
1838 {
1839 buffer->time_stamp_abs = abs;
1840 }
1841
1842 bool ring_buffer_time_stamp_abs(struct trace_buffer *buffer)
1843 {
1844 return buffer->time_stamp_abs;
1845 }
1846
1847 static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
1848
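/*
 * The write and entries counters of a buffer page carry an updater count
 * in their upper bits; masking with RB_WRITE_MASK yields the actual
 * index or count kept in the lower bits.
 */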
1849 static inline unsigned long rb_page_entries(struct buffer_page *bpage)
1850 {
1851 return local_read(&bpage->entries) & RB_WRITE_MASK;
1852 }
1853
1854 static inline unsigned long rb_page_write(struct buffer_page *bpage)
1855 {
1856 return local_read(&bpage->write) & RB_WRITE_MASK;
1857 }
1858
1859 static int
1860 rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
1861 {
1862 struct list_head *tail_page, *to_remove, *next_page;
1863 struct buffer_page *to_remove_page, *tmp_iter_page;
1864 struct buffer_page *last_page, *first_page;
1865 unsigned long nr_removed;
1866 unsigned long head_bit;
1867 int page_entries;
1868
1869 head_bit = 0;
1870
1871 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1872 atomic_inc(&cpu_buffer->record_disabled);
1873 /*
1874 * We don't race with the readers since we have acquired the reader
1875 * lock. We also don't race with writers after disabling recording.
1876 * This makes it easy to figure out the first and the last page to be
1877 * removed from the list. We unlink all the pages in between including
1878 * the first and last pages. This is done in a busy loop so that we
1879 * lose the least number of traces.
1880 * The pages are freed after we restart recording and unlock readers.
1881 */
1882 tail_page = &cpu_buffer->tail_page->list;
1883
1884 /*
1885 * The tail page might be on the reader page; in that case we remove
1886 * the next page from the ring buffer
1887 */
1888 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
1889 tail_page = rb_list_head(tail_page->next);
1890 to_remove = tail_page;
1891
1892 /* start of pages to remove */
1893 first_page = list_entry(rb_list_head(to_remove->next),
1894 struct buffer_page, list);
1895
1896 for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
1897 to_remove = rb_list_head(to_remove)->next;
1898 head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
1899 }
1900
1901 next_page = rb_list_head(to_remove)->next;
1902
1903 /*
1904 * Now we remove all pages between tail_page and next_page.
1905 * Make sure that we have head_bit value preserved for the
1906 * next page
1907 */
1908 tail_page->next = (struct list_head *)((unsigned long)next_page |
1909 head_bit);
1910 next_page = rb_list_head(next_page);
1911 next_page->prev = tail_page;
1912
1913 /* make sure pages points to a valid page in the ring buffer */
1914 cpu_buffer->pages = next_page;
1915
1916 /* update head page */
1917 if (head_bit)
1918 cpu_buffer->head_page = list_entry(next_page,
1919 struct buffer_page, list);
1920
1921 /*
1922 * change read pointer to make sure any read iterators reset
1923 * themselves
1924 */
1925 cpu_buffer->read = 0;
1926
1927 /* pages are removed, resume tracing and then free the pages */
1928 atomic_dec(&cpu_buffer->record_disabled);
1929 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
1930
1931 RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
1932
1933 /* last buffer page to remove */
1934 last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
1935 list);
1936 tmp_iter_page = first_page;
1937
1938 do {
1939 cond_resched();
1940
1941 to_remove_page = tmp_iter_page;
1942 rb_inc_page(&tmp_iter_page);
1943
1944 /* update the counters */
1945 page_entries = rb_page_entries(to_remove_page);
1946 if (page_entries) {
1947 /*
1948 * If something was added to this page, it was full
1949 * since it is not the tail page. So we deduct the
1950 * bytes consumed in ring buffer from here.
1951 * Increment overrun to account for the lost events.
1952 */
1953 local_add(page_entries, &cpu_buffer->overrun);
1954 local_sub(rb_page_commit(to_remove_page), &cpu_buffer->entries_bytes);
1955 local_inc(&cpu_buffer->pages_lost);
1956 }
1957
1958 /*
1959 * We have already removed references to this list item, just
1960 * free up the buffer_page and its page
1961 */
1962 free_buffer_page(to_remove_page);
1963 nr_removed--;
1964
1965 } while (to_remove_page != last_page);
1966
1967 RB_WARN_ON(cpu_buffer, nr_removed);
1968
1969 return nr_removed == 0;
1970 }
1971
1972 static int
1973 rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
1974 {
1975 struct list_head *pages = &cpu_buffer->new_pages;
1976 int retries, success;
1977
1978 raw_spin_lock_irq(&cpu_buffer->reader_lock);
1979 /*
1980 * We are holding the reader lock, so the reader page won't be swapped
1981 * in the ring buffer. Now we are racing with the writer trying to
1982 * move head page and the tail page.
1983 * We are going to adapt the reader page update process where:
1984 * 1. We first splice the start and end of list of new pages between
1985 * the head page and its previous page.
1986 * 2. We cmpxchg the prev_page->next to point from head page to the
1987 * start of new pages list.
1988 * 3. Finally, we update the head->prev to the end of new list.
1989 *
1990 * We will try this process 10 times, to make sure that we don't keep
1991 * spinning.
1992 */
1993 retries = 10;
1994 success = 0;
1995 while (retries--) {
1996 struct list_head *head_page, *prev_page, *r;
1997 struct list_head *last_page, *first_page;
1998 struct list_head *head_page_with_bit;
1999
2000 head_page = &rb_set_head_page(cpu_buffer)->list;
2001 if (!head_page)
2002 break;
2003 prev_page = head_page->prev;
2004
2005 first_page = pages->next;
2006 last_page = pages->prev;
2007
2008 head_page_with_bit = (struct list_head *)
2009 ((unsigned long)head_page | RB_PAGE_HEAD);
2010
2011 last_page->next = head_page_with_bit;
2012 first_page->prev = prev_page;
2013
2014 r = cmpxchg(&prev_page->next, head_page_with_bit, first_page);
2015
2016 if (r == head_page_with_bit) {
2017 /*
2018 * yay, we replaced the page pointer to our new list,
2019 * now we just have to update the head page's prev
2020 * pointer to point to the end of the new list
2021 */
2022 head_page->prev = last_page;
2023 success = 1;
2024 break;
2025 }
2026 }
2027
2028 if (success)
2029 INIT_LIST_HEAD(pages);
2030 /*
2031 * If we weren't successful in adding in new pages, warn and stop
2032 * tracing
2033 */
2034 RB_WARN_ON(cpu_buffer, !success);
2035 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
2036
2037 /* free pages if they weren't inserted */
2038 if (!success) {
2039 struct buffer_page *bpage, *tmp;
2040 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2041 list) {
2042 list_del_init(&bpage->list);
2043 free_buffer_page(bpage);
2044 }
2045 }
2046 return success;
2047 }
2048
2049 static void rb_update_pages(struct ring_buffer_per_cpu *cpu_buffer)
2050 {
2051 int success;
2052
2053 if (cpu_buffer->nr_pages_to_update > 0)
2054 success = rb_insert_pages(cpu_buffer);
2055 else
2056 success = rb_remove_pages(cpu_buffer,
2057 -cpu_buffer->nr_pages_to_update);
2058
2059 if (success)
2060 cpu_buffer->nr_pages += cpu_buffer->nr_pages_to_update;
2061 }
2062
2063 static void update_pages_handler(struct work_struct *work)
2064 {
2065 struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
2066 struct ring_buffer_per_cpu, update_pages_work);
2067 rb_update_pages(cpu_buffer);
2068 complete(&cpu_buffer->update_done);
2069 }
2070
2071 /**
2072 * ring_buffer_resize - resize the ring buffer
2073 * @buffer: the buffer to resize.
2074 * @size: the new size.
2075 * @cpu_id: the cpu buffer to resize
2076 *
2077 * Minimum size is 2 * BUF_PAGE_SIZE.
2078 *
2079 * Returns 0 on success and < 0 on failure.
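*
* For example (illustrative only), growing every per-CPU buffer to hold
* at least 64 KiB of data:
*
*	ring_buffer_resize(buffer, 65536, RING_BUFFER_ALL_CPUS);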
2080 */
2081 int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
2082 int cpu_id)
2083 {
2084 struct ring_buffer_per_cpu *cpu_buffer;
2085 unsigned long nr_pages;
2086 int cpu, err;
2087
2088 /*
2089 * Always succeed at resizing a non-existent buffer:
2090 */
2091 if (!buffer)
2092 return 0;
2093 if (unlikely(has_ext_writer(buffer)))
2094 return -EINVAL;
2095
2096 /* Make sure the requested buffer exists */
2097 if (cpu_id != RING_BUFFER_ALL_CPUS &&
2098 !cpumask_test_cpu(cpu_id, buffer->cpumask))
2099 return 0;
2100
2101 nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
2102
2103 /* we need a minimum of two pages */
2104 if (nr_pages < 2)
2105 nr_pages = 2;
2106
2107 /* prevent another thread from changing buffer sizes */
2108 mutex_lock(&buffer->mutex);
2109 atomic_inc(&buffer->resizing);
2110
2111 if (cpu_id == RING_BUFFER_ALL_CPUS) {
2112 /*
2113 * Don't succeed if resizing is disabled, as a reader might be
2114 * manipulating the ring buffer and is expecting a sane state while
2115 * this is true.
2116 */
2117 for_each_buffer_cpu(buffer, cpu) {
2118 cpu_buffer = buffer->buffers[cpu];
2119 if (atomic_read(&cpu_buffer->resize_disabled)) {
2120 err = -EBUSY;
2121 goto out_err_unlock;
2122 }
2123 }
2124
2125 /* calculate the pages to update */
2126 for_each_buffer_cpu(buffer, cpu) {
2127 cpu_buffer = buffer->buffers[cpu];
2128
2129 cpu_buffer->nr_pages_to_update = nr_pages -
2130 cpu_buffer->nr_pages;
2131 /*
2132 * nothing more to do for removing pages or no update
2133 */
2134 if (cpu_buffer->nr_pages_to_update <= 0)
2135 continue;
2136 /*
2137 * to add pages, make sure all new pages can be
2138 * allocated without receiving ENOMEM
2139 */
2140 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2141 if (__rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2142 &cpu_buffer->new_pages)) {
2143 /* not enough memory for new pages */
2144 err = -ENOMEM;
2145 goto out_err;
2146 }
2147
2148 cond_resched();
2149 }
2150
2151 cpus_read_lock();
2152 /*
2153 * Fire off all the required work handlers
2154 * We can't schedule on offline CPUs, but it's not necessary
2155 * since we can change their buffer sizes without any race.
2156 */
2157 for_each_buffer_cpu(buffer, cpu) {
2158 cpu_buffer = buffer->buffers[cpu];
2159 if (!cpu_buffer->nr_pages_to_update)
2160 continue;
2161
2162 /* Can't run something on an offline CPU. */
2163 if (!cpu_online(cpu)) {
2164 rb_update_pages(cpu_buffer);
2165 cpu_buffer->nr_pages_to_update = 0;
2166 } else {
2167 schedule_work_on(cpu,
2168 &cpu_buffer->update_pages_work);
2169 }
2170 }
2171
2172 /* wait for all the updates to complete */
2173 for_each_buffer_cpu(buffer, cpu) {
2174 cpu_buffer = buffer->buffers[cpu];
2175 if (!cpu_buffer->nr_pages_to_update)
2176 continue;
2177
2178 if (cpu_online(cpu))
2179 wait_for_completion(&cpu_buffer->update_done);
2180 cpu_buffer->nr_pages_to_update = 0;
2181 }
2182
2183 cpus_read_unlock();
2184 } else {
2185 cpu_buffer = buffer->buffers[cpu_id];
2186
2187 if (nr_pages == cpu_buffer->nr_pages)
2188 goto out;
2189
2190 /*
2191 * Don't succeed if resizing is disabled, as a reader might be
2192 * manipulating the ring buffer and is expecting a sane state while
2193 * this is true.
2194 */
2195 if (atomic_read(&cpu_buffer->resize_disabled)) {
2196 err = -EBUSY;
2197 goto out_err_unlock;
2198 }
2199
2200 cpu_buffer->nr_pages_to_update = nr_pages -
2201 cpu_buffer->nr_pages;
2202
2203 INIT_LIST_HEAD(&cpu_buffer->new_pages);
2204 if (cpu_buffer->nr_pages_to_update > 0 &&
2205 __rb_allocate_pages(cpu_buffer, cpu_buffer->nr_pages_to_update,
2206 &cpu_buffer->new_pages)) {
2207 err = -ENOMEM;
2208 goto out_err;
2209 }
2210
2211 cpus_read_lock();
2212
2213 /* Can't run something on an offline CPU. */
2214 if (!cpu_online(cpu_id))
2215 rb_update_pages(cpu_buffer);
2216 else {
2217 schedule_work_on(cpu_id,
2218 &cpu_buffer->update_pages_work);
2219 wait_for_completion(&cpu_buffer->update_done);
2220 }
2221
2222 cpu_buffer->nr_pages_to_update = 0;
2223 cpus_read_unlock();
2224 }
2225
2226 out:
2227 /*
2228 * The ring buffer resize can happen with the ring buffer
2229 * enabled, so that the update disturbs the tracing as little
2230 * as possible. But if the buffer is disabled, we do not need
2231 * to worry about that, and we can take the time to verify
2232 * that the buffer is not corrupt.
2233 */
2234 if (atomic_read(&buffer->record_disabled)) {
2235 atomic_inc(&buffer->record_disabled);
2236 /*
2237 * Even though the buffer was disabled, we must make sure
2238 * that it is truly disabled before calling rb_check_pages.
2239 * There could have been a race between checking
2240 * record_disable and incrementing it.
2241 */
2242 synchronize_rcu();
2243 for_each_buffer_cpu(buffer, cpu) {
2244 cpu_buffer = buffer->buffers[cpu];
2245 rb_check_pages(cpu_buffer);
2246 }
2247 atomic_dec(&buffer->record_disabled);
2248 }
2249
2250 atomic_dec(&buffer->resizing);
2251 mutex_unlock(&buffer->mutex);
2252 return 0;
2253
2254 out_err:
2255 for_each_buffer_cpu(buffer, cpu) {
2256 struct buffer_page *bpage, *tmp;
2257
2258 cpu_buffer = buffer->buffers[cpu];
2259 cpu_buffer->nr_pages_to_update = 0;
2260
2261 if (list_empty(&cpu_buffer->new_pages))
2262 continue;
2263
2264 list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
2265 list) {
2266 list_del_init(&bpage->list);
2267 free_buffer_page(bpage);
2268 }
2269 }
2270 out_err_unlock:
2271 atomic_dec(&buffer->resizing);
2272 mutex_unlock(&buffer->mutex);
2273 return err;
2274 }
2275 EXPORT_SYMBOL_GPL(ring_buffer_resize);
2276
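/**
 * ring_buffer_change_overwrite - set or clear RB_FL_OVERWRITE on a buffer
 * @buffer: the buffer to modify
 * @val: non-zero to enable overwrite mode, zero to disable it
 */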
2277 void ring_buffer_change_overwrite(struct trace_buffer *buffer, int val)
2278 {
2279 mutex_lock(&buffer->mutex);
2280 if (val)
2281 buffer->flags |= RB_FL_OVERWRITE;
2282 else
2283 buffer->flags &= ~RB_FL_OVERWRITE;
2284 mutex_unlock(&buffer->mutex);
2285 }
2286 EXPORT_SYMBOL_GPL(ring_buffer_change_overwrite);
2287
2288 static __always_inline void *__rb_page_index(struct buffer_page *bpage, unsigned index)
2289 {
2290 return bpage->page->data + index;
2291 }
2292
2293 static __always_inline struct ring_buffer_event *
2294 rb_reader_event(struct ring_buffer_per_cpu *cpu_buffer)
2295 {
2296 return __rb_page_index(cpu_buffer->reader_page,
2297 cpu_buffer->reader_page->read);
2298 }
2299
2300 static struct ring_buffer_event *
2301 rb_iter_head_event(struct ring_buffer_iter *iter)
2302 {
2303 struct ring_buffer_event *event;
2304 struct buffer_page *iter_head_page = iter->head_page;
2305 unsigned long commit;
2306 unsigned length;
2307
2308 if (iter->head != iter->next_event)
2309 return iter->event;
2310
2311 /*
2312 * When the writer goes across pages, it issues a cmpxchg which
2313 * is a mb(), which will synchronize with the rmb here.
2314 * (see rb_tail_page_update() and __rb_reserve_next())
2315 */
2316 commit = rb_page_commit(iter_head_page);
2317 smp_rmb();
2318
2319 /* An event needs to be at least 8 bytes in size */
2320 if (iter->head > commit - 8)
2321 goto reset;
2322
2323 event = __rb_page_index(iter_head_page, iter->head);
2324 length = rb_event_length(event);
2325
2326 /*
2327 * READ_ONCE() doesn't work on functions and we don't want the
2328 * compiler doing any crazy optimizations with length.
2329 */
2330 barrier();
2331
2332 if ((iter->head + length) > commit || length > BUF_PAGE_SIZE)
2333 /* Writer corrupted the read? */
2334 goto reset;
2335
2336 memcpy(iter->event, event, length);
2337 /*
2338 * If the page stamp is still the same after this rmb() then the
2339 * event was safely copied without the writer entering the page.
2340 */
2341 smp_rmb();
2342
2343 /* Make sure the page didn't change since we read this */
2344 if (iter->page_stamp != iter_head_page->page->time_stamp ||
2345 commit > rb_page_commit(iter_head_page))
2346 goto reset;
2347
2348 iter->next_event = iter->head + length;
2349 return iter->event;
2350 reset:
2351 /* Reset to the beginning */
2352 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2353 iter->head = 0;
2354 iter->next_event = 0;
2355 iter->missed_events = 1;
2356 return NULL;
2357 }
2358
2359 /* Size is determined by what has been committed */
2360 static __always_inline unsigned rb_page_size(struct buffer_page *bpage)
2361 {
2362 return rb_page_commit(bpage);
2363 }
2364
2365 static __always_inline unsigned
2366 rb_commit_index(struct ring_buffer_per_cpu *cpu_buffer)
2367 {
2368 return rb_page_commit(cpu_buffer->commit_page);
2369 }
2370
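/* Return the byte offset of @event within its buffer page's data area */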
2371 static __always_inline unsigned
2372 rb_event_index(struct ring_buffer_event *event)
2373 {
2374 unsigned long addr = (unsigned long)event;
2375
2376 return (addr & ~PAGE_MASK) - BUF_PAGE_HDR_SIZE;
2377 }
2378
2379 static void rb_inc_iter(struct ring_buffer_iter *iter)
2380 {
2381 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
2382
2383 /*
2384 * The iterator could be on the reader page (it starts there).
2385 * But the head could have moved, since the reader was
2386 * found. Check for this case and assign the iterator
2387 * to the head page instead of next.
2388 */
2389 if (iter->head_page == cpu_buffer->reader_page)
2390 iter->head_page = rb_set_head_page(cpu_buffer);
2391 else
2392 rb_inc_page(&iter->head_page);
2393
2394 iter->page_stamp = iter->read_stamp = iter->head_page->page->time_stamp;
2395 iter->head = 0;
2396 iter->next_event = 0;
2397 }
2398
2399 /*
2400 * rb_handle_head_page - writer hit the head page
2401 *
2402 * Returns: +1 to retry page
2403 * 0 to continue
2404 * -1 on error
2405 */
2406 static int
2407 rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
2408 struct buffer_page *tail_page,
2409 struct buffer_page *next_page)
2410 {
2411 struct buffer_page *new_head;
2412 int entries;
2413 int type;
2414 int ret;
2415
2416 entries = rb_page_entries(next_page);
2417
2418 /*
2419 * The hard part is here. We need to move the head
2420 * forward, and protect against both readers on
2421 * other CPUs and writers coming in via interrupts.
2422 */
2423 type = rb_head_page_set_update(cpu_buffer, next_page, tail_page,
2424 RB_PAGE_HEAD);
2425
2426 /*
2427 * type can be one of four:
2428 * NORMAL - an interrupt already moved it for us
2429 * HEAD - we are the first to get here.
2430 * UPDATE - we are the interrupt interrupting
2431 * a current move.
2432 * MOVED - a reader on another CPU moved the next
2433 * pointer to its reader page. Give up
2434 * and try again.
2435 */
2436
2437 switch (type) {
2438 case RB_PAGE_HEAD:
2439 /*
2440 * We changed the head to UPDATE, thus
2441 * it is our responsibility to update
2442 * the counters.
2443 */
2444 local_add(entries, &cpu_buffer->overrun);
2445 local_sub(rb_page_commit(next_page), &cpu_buffer->entries_bytes);
2446 local_inc(&cpu_buffer->pages_lost);
2447
2448 /*
2449 * The entries will be zeroed out when we move the
2450 * tail page.
2451 */
2452
2453 /* still more to do */
2454 break;
2455
2456 case RB_PAGE_UPDATE:
2457 /*
2458 * This is an interrupt that interrupted the
2459 * previous update. Still more to do.
2460 */
2461 break;
2462 case RB_PAGE_NORMAL:
2463 /*
2464 * An interrupt came in before the update
2465 * and processed this for us.
2466 * Nothing left to do.
2467 */
2468 return 1;
2469 case RB_PAGE_MOVED:
2470 /*
2471 * The reader is on another CPU and just did
2472 * a swap with our next_page.
2473 * Try again.
2474 */
2475 return 1;
2476 default:
2477 RB_WARN_ON(cpu_buffer, 1); /* WTF??? */
2478 return -1;
2479 }
2480
2481 /*
2482 * Now that we are here, the old head pointer is
2483 * set to UPDATE. This will keep the reader from
2484 * swapping the head page with the reader page.
2485 * The reader (on another CPU) will spin till
2486 * we are finished.
2487 *
2488 * We just need to protect against interrupts
2489 * doing the job. We will set the next pointer
2490 * to HEAD. After that, we set the old pointer
2491 * to NORMAL, but only if it was HEAD before;
2492 * otherwise we are an interrupt, and only
2493 * want the outer most commit to reset it.
2494 */
2495 new_head = next_page;
2496 rb_inc_page(&new_head);
2497
2498 ret = rb_head_page_set_head(cpu_buffer, new_head, next_page,
2499 RB_PAGE_NORMAL);
2500
2501 /*
2502 * Valid returns are:
2503 * HEAD - an interrupt came in and already set it.
2504 * NORMAL - One of two things:
2505 * 1) We really set it.
2506 * 2) A bunch of interrupts came in and moved
2507 * the page forward again.
2508 */
2509 switch (ret) {
2510 case RB_PAGE_HEAD:
2511 case RB_PAGE_NORMAL:
2512 /* OK */
2513 break;
2514 default:
2515 RB_WARN_ON(cpu_buffer, 1);
2516 return -1;
2517 }
2518
2519 /*
2520 * It is possible that an interrupt came in,
2521 * set the head up, then more interrupts came in
2522 * and moved it again. When we get back here,
2523 * the page would have been set to NORMAL but we
2524 * just set it back to HEAD.
2525 *
2526 * How do you detect this? Well, if that happened
2527 * the tail page would have moved.
2528 */
2529 if (ret == RB_PAGE_NORMAL) {
2530 struct buffer_page *buffer_tail_page;
2531
2532 buffer_tail_page = READ_ONCE(cpu_buffer->tail_page);
2533 /*
2534 * If the tail had moved past next, then we need
2535 * to reset the pointer.
2536 */
2537 if (buffer_tail_page != tail_page &&
2538 buffer_tail_page != next_page)
2539 rb_head_page_set_normal(cpu_buffer, new_head,
2540 next_page,
2541 RB_PAGE_HEAD);
2542 }
2543
2544 /*
2545 * If this was the outer most commit (the one that
2546 * changed the original pointer from HEAD to UPDATE),
2547 * then it is up to us to reset it to NORMAL.
2548 */
2549 if (type == RB_PAGE_HEAD) {
2550 ret = rb_head_page_set_normal(cpu_buffer, next_page,
2551 tail_page,
2552 RB_PAGE_UPDATE);
2553 if (RB_WARN_ON(cpu_buffer,
2554 ret != RB_PAGE_UPDATE))
2555 return -1;
2556 }
2557
2558 return 0;
2559 }
2560
2561 static inline void
2562 rb_reset_tail(struct ring_buffer_per_cpu *cpu_buffer,
2563 unsigned long tail, struct rb_event_info *info)
2564 {
2565 struct buffer_page *tail_page = info->tail_page;
2566 struct ring_buffer_event *event;
2567 unsigned long length = info->length;
2568
2569 /*
2570 * Only the event that crossed the page boundary
2571 * must fill the old tail_page with padding.
2572 */
2573 if (tail >= BUF_PAGE_SIZE) {
2574 /*
2575 * If the page was filled, then we still need
2576 * to update the real_end. Reset it to zero
2577 * and the reader will ignore it.
2578 */
2579 if (tail == BUF_PAGE_SIZE)
2580 tail_page->real_end = 0;
2581
2582 local_sub(length, &tail_page->write);
2583 return;
2584 }
2585
2586 event = __rb_page_index(tail_page, tail);
2587
2588 /*
2589 * Save the original length to the meta data.
2590 * This will be used by the reader to add to the lost
2591 * event counter.
2592 */
2593 tail_page->real_end = tail;
2594
2595 /*
2596 * If this event is bigger than the minimum size, then
2597 * we need to be careful that we don't subtract the
2598 * write counter enough to allow another writer to slip
2599 * in on this page.
2600 * We put in a discarded commit instead, to make sure
2601 * that this space is not used again, and this space will
2602 * not be accounted into 'entries_bytes'.
2603 *
2604 * If we are less than the minimum size, we don't need to
2605 * worry about it.
2606 */
2607 if (tail > (BUF_PAGE_SIZE - RB_EVNT_MIN_SIZE)) {
2608 /* No room for any events */
2609
2610 /* Mark the rest of the page with padding */
2611 rb_event_set_padding(event);
2612
2613 /* Make sure the padding is visible before the write update */
2614 smp_wmb();
2615
2616 /* Set the write back to the previous setting */
2617 local_sub(length, &tail_page->write);
2618 return;
2619 }
2620
2621 /* Put in a discarded event */
2622 event->array[0] = (BUF_PAGE_SIZE - tail) - RB_EVNT_HDR_SIZE;
2623 event->type_len = RINGBUF_TYPE_PADDING;
2624 /* time delta must be non zero */
2625 event->time_delta = 1;
2626
2627 /* account for padding bytes */
2628 local_add(BUF_PAGE_SIZE - tail, &cpu_buffer->entries_bytes);
2629
2630 /* Make sure the padding is visible before the tail_page->write update */
2631 smp_wmb();
2632
2633 /* Set write to end of buffer */
2634 length = (tail + length) - BUF_PAGE_SIZE;
2635 local_sub(length, &tail_page->write);
2636 }
2637
2638 static inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer);
2639
2640 /*
2641 * This is the slow path, force gcc not to inline it.
2642 */
2643 static noinline struct ring_buffer_event *
2644 rb_move_tail(struct ring_buffer_per_cpu *cpu_buffer,
2645 unsigned long tail, struct rb_event_info *info)
2646 {
2647 struct buffer_page *tail_page = info->tail_page;
2648 struct buffer_page *commit_page = cpu_buffer->commit_page;
2649 struct trace_buffer *buffer = cpu_buffer->buffer;
2650 struct buffer_page *next_page;
2651 int ret;
2652
2653 next_page = tail_page;
2654
2655 rb_inc_page(&next_page);
2656
2657 /*
2658 * If for some reason, we had an interrupt storm that made
2659 * it all the way around the buffer, bail, and warn
2660 * about it.
2661 */
2662 if (unlikely(next_page == commit_page)) {
2663 local_inc(&cpu_buffer->commit_overrun);
2664 goto out_reset;
2665 }
2666
2667 /*
2668 * This is where the fun begins!
2669 *
2670 * We are fighting against races between a reader that
2671 * could be on another CPU trying to swap its reader
2672 * page with the buffer head.
2673 *
2674 * We are also fighting against interrupts coming in and
2675 * moving the head or tail on us as well.
2676 *
2677 * If the next page is the head page then we have filled
2678 * the buffer, unless the commit page is still on the
2679 * reader page.
2680 */
2681 if (rb_is_head_page(next_page, &tail_page->list)) {
2682
2683 /*
2684 * If the commit is not on the reader page, then
2685 * move the header page.
2686 */
2687 if (!rb_is_reader_page(cpu_buffer->commit_page)) {
2688 /*
2689 * If we are not in overwrite mode,
2690 * this is easy, just stop here.
2691 */
2692 if (!(buffer->flags & RB_FL_OVERWRITE)) {
2693 local_inc(&cpu_buffer->dropped_events);
2694 goto out_reset;
2695 }
2696
2697 ret = rb_handle_head_page(cpu_buffer,
2698 tail_page,
2699 next_page);
2700 if (ret < 0)
2701 goto out_reset;
2702 if (ret)
2703 goto out_again;
2704 } else {
2705 /*
2706 * We need to be careful here too. The
2707 * commit page could still be on the reader
2708 * page. We could have a small buffer, and
2709 * have filled up the buffer with events
2710 * from interrupts and such, and wrapped.
2711 *
2712 * Note, if the tail page is also on the
2713 * reader_page, we let it move out.
2714 */
2715 if (unlikely((cpu_buffer->commit_page !=
2716 cpu_buffer->tail_page) &&
2717 (cpu_buffer->commit_page ==
2718 cpu_buffer->reader_page))) {
2719 local_inc(&cpu_buffer->commit_overrun);
2720 goto out_reset;
2721 }
2722 }
2723 }
2724
2725 rb_tail_page_update(cpu_buffer, tail_page, next_page);
2726
2727 out_again:
2728
2729 rb_reset_tail(cpu_buffer, tail, info);
2730
2731 /* Commit what we have for now. */
2732 rb_end_commit(cpu_buffer);
2733 /* rb_end_commit() decs committing */
2734 local_inc(&cpu_buffer->committing);
2735
2736 /* fail and let the caller try again */
2737 return ERR_PTR(-EAGAIN);
2738
2739 out_reset:
2740 /* reset write */
2741 rb_reset_tail(cpu_buffer, tail, info);
2742
2743 return NULL;
2744 }
2745
2746 /* Slow path */
2747 static struct ring_buffer_event *
2748 rb_add_time_stamp(struct ring_buffer_event *event, u64 delta, bool abs)
2749 {
2750 if (abs)
2751 event->type_len = RINGBUF_TYPE_TIME_STAMP;
2752 else
2753 event->type_len = RINGBUF_TYPE_TIME_EXTEND;
2754
2755 /* Not the first event on the page, or not delta? */
2756 if (abs || rb_event_index(event)) {
2757 event->time_delta = delta & TS_MASK;
2758 event->array[0] = delta >> TS_SHIFT;
2759 } else {
2760 /* nope, just zero it */
2761 event->time_delta = 0;
2762 event->array[0] = 0;
2763 }
2764
2765 return skip_time_extend(event);
2766 }
2767
2768 #ifndef CONFIG_HAVE_UNSTABLE_SCHED_CLOCK
2769 static inline bool sched_clock_stable(void)
2770 {
2771 return true;
2772 }
2773 #endif
2774
2775 static void
2776 rb_check_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2777 struct rb_event_info *info)
2778 {
2779 u64 write_stamp;
2780
2781 WARN_ONCE(1, "Delta way too big! %llu ts=%llu before=%llu after=%llu write stamp=%llu\n%s",
2782 (unsigned long long)info->delta,
2783 (unsigned long long)info->ts,
2784 (unsigned long long)info->before,
2785 (unsigned long long)info->after,
2786 (unsigned long long)(rb_time_read(&cpu_buffer->write_stamp, &write_stamp) ? write_stamp : 0),
2787 sched_clock_stable() ? "" :
2788 "If you just came from a suspend/resume,\n"
2789 "please switch to the trace global clock:\n"
2790 " echo global > /sys/kernel/debug/tracing/trace_clock\n"
2791 "or add trace_clock=global to the kernel command line\n");
2792 }
2793
2794 static void rb_add_timestamp(struct ring_buffer_per_cpu *cpu_buffer,
2795 struct ring_buffer_event **event,
2796 struct rb_event_info *info,
2797 u64 *delta,
2798 unsigned int *length)
2799 {
2800 bool abs = info->add_timestamp &
2801 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE);
2802
2803 if (unlikely(info->delta > (1ULL << 59))) {
2804 /* did the clock go backwards */
2805 if (info->before == info->after && info->before > info->ts) {
2806 /* not interrupted */
2807 static int once;
2808
2809 /*
2810 * This is possible with a recalibrating of the TSC.
2811 * Do not produce a call stack, but just report it.
2812 */
2813 if (!once) {
2814 once++;
2815 pr_warn("Ring buffer clock went backwards: %llu -> %llu\n",
2816 info->before, info->ts);
2817 }
2818 } else
2819 rb_check_timestamp(cpu_buffer, info);
2820 if (!abs)
2821 info->delta = 0;
2822 }
2823 *event = rb_add_time_stamp(*event, info->delta, abs);
2824 *length -= RB_LEN_TIME_EXTEND;
2825 *delta = 0;
2826 }
2827
2828 /**
2829 * rb_update_event - update event type and data
2830 * @cpu_buffer: The per cpu buffer of the @event
2831 * @event: the event to update
2832 * @info: The info to update the @event with (contains length and delta)
2833 *
2834 * Update the type and data fields of the @event. The length
2835 * is the actual size that is written to the ring buffer,
2836 * and with this, we can determine what to place into the
2837 * data field.
2838 */
2839 static void
2840 rb_update_event(struct ring_buffer_per_cpu *cpu_buffer,
2841 struct ring_buffer_event *event,
2842 struct rb_event_info *info)
2843 {
2844 unsigned length = info->length;
2845 u64 delta = info->delta;
2846 unsigned int nest = local_read(&cpu_buffer->committing) - 1;
2847
2848 if (!WARN_ON_ONCE(nest >= MAX_NEST))
2849 cpu_buffer->event_stamp[nest] = info->ts;
2850
2851 /*
2852 * If we need to add a timestamp, then we
2853 * add it to the start of the reserved space.
2854 */
2855 if (unlikely(info->add_timestamp))
2856 rb_add_timestamp(cpu_buffer, &event, info, &delta, &length);
2857
2858 event->time_delta = delta;
2859 length -= RB_EVNT_HDR_SIZE;
2860 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT) {
2861 event->type_len = 0;
2862 event->array[0] = length;
2863 } else
2864 event->type_len = DIV_ROUND_UP(length, RB_ALIGNMENT);
2865 }
2866
2867 static unsigned rb_calculate_event_length(unsigned length)
2868 {
2869 struct ring_buffer_event event; /* Used only for sizeof array */
2870
2871 /* zero length can cause confusion */
2872 if (!length)
2873 length++;
2874
2875 if (length > RB_MAX_SMALL_DATA || RB_FORCE_8BYTE_ALIGNMENT)
2876 length += sizeof(event.array[0]);
2877
2878 length += RB_EVNT_HDR_SIZE;
2879 length = ALIGN(length, RB_ARCH_ALIGNMENT);
2880
2881 /*
2882 * In case the time delta is larger than the 27 bits for it
2883 * in the header, we need to add a timestamp. If another
2884 * event comes in when trying to discard this one to increase
2885 * the length, then the timestamp will be added in the allocated
2886 * space of this event. If length is bigger than the size needed
2887 * for the TIME_EXTEND, then padding has to be used. The events
2888 * length must be either RB_LEN_TIME_EXTEND, or greater than or equal
2889 * to RB_LEN_TIME_EXTEND + 8, as 8 is the minimum size for padding.
2890 * As length is a multiple of 4, we only need to worry if it
2891 * is 12 (RB_LEN_TIME_EXTEND + 4).
2892 */
2893 if (length == RB_LEN_TIME_EXTEND + RB_ALIGNMENT)
2894 length += RB_ALIGNMENT;
2895
2896 return length;
2897 }
2898
2899 static inline int
2900 rb_try_to_discard(struct ring_buffer_per_cpu *cpu_buffer,
2901 struct ring_buffer_event *event)
2902 {
2903 unsigned long new_index, old_index;
2904 struct buffer_page *bpage;
2905 unsigned long index;
2906 unsigned long addr;
2907
2908 new_index = rb_event_index(event);
2909 old_index = new_index + rb_event_ts_length(event);
2910 addr = (unsigned long)event;
2911 addr &= PAGE_MASK;
2912
2913 bpage = READ_ONCE(cpu_buffer->tail_page);
2914
2915 /*
2916 * Make sure the tail_page is still the same and
2917 * the next write location is the end of this event
2918 */
2919 if (bpage->page == (void *)addr && rb_page_write(bpage) == old_index) {
2920 unsigned long write_mask =
2921 local_read(&bpage->write) & ~RB_WRITE_MASK;
2922 unsigned long event_length = rb_event_length(event);
2923
2924 /*
2925 * Make the before_stamp different from the write_stamp
2926 * to make sure that the next event adds an absolute
2927 * value and does not rely on the saved write stamp, which
2928 * is now going to be bogus.
2929 *
2930 * By setting the before_stamp to zero, the next event
2931 * is not going to use the write_stamp and will instead
2932 * create an absolute timestamp. This means there's no
2933 * reason to update the write_stamp!
2934 */
2935 rb_time_set(&cpu_buffer->before_stamp, 0);
2936
2937 /*
2938 * If an event were to come in now, it would see that the
2939 * write_stamp and the before_stamp are different, and assume
2940 * that this event just added itself before updating
2941 * the write stamp. The interrupting event will fix the
2942 * write stamp for us, and use an absolute timestamp.
2943 */
2944
2945 /*
2946 * This is on the tail page. It is possible that
2947 * a write could come in and move the tail page
2948 * and write to the next page. That is fine
2949 * because we just shorten what is on this page.
2950 */
2951 old_index += write_mask;
2952 new_index += write_mask;
2953 index = local_cmpxchg(&bpage->write, old_index, new_index);
2954 if (index == old_index) {
2955 /* update counters */
2956 local_sub(event_length, &cpu_buffer->entries_bytes);
2957 return 1;
2958 }
2959 }
2960
2961 /* could not discard */
2962 return 0;
2963 }
2964
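/* Track (possibly nested) commits; interrupting writers on this CPU bump the same counters */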
2965 static void rb_start_commit(struct ring_buffer_per_cpu *cpu_buffer)
2966 {
2967 local_inc(&cpu_buffer->committing);
2968 local_inc(&cpu_buffer->commits);
2969 }
2970
2971 static __always_inline void
2972 rb_set_commit_to_write(struct ring_buffer_per_cpu *cpu_buffer)
2973 {
2974 unsigned long max_count;
2975
2976 /*
2977 * We only race with interrupts and NMIs on this CPU.
2978 * If we own the commit event, then we can commit
2979 * all others that interrupted us, since the interruptions
2980 * are in stack format (they finish before they come
2981 * back to us). This allows us to do a simple loop to
2982 * assign the commit to the tail.
2983 */
2984 again:
2985 max_count = cpu_buffer->nr_pages * 100;
2986
2987 while (cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)) {
2988 if (RB_WARN_ON(cpu_buffer, !(--max_count)))
2989 return;
2990 if (RB_WARN_ON(cpu_buffer,
2991 rb_is_reader_page(cpu_buffer->tail_page)))
2992 return;
2993 /*
2994 * No need for a memory barrier here, as the update
2995 * of the tail_page did it for this page.
2996 */
2997 local_set(&cpu_buffer->commit_page->page->commit,
2998 rb_page_write(cpu_buffer->commit_page));
2999 rb_inc_page(&cpu_buffer->commit_page);
3000 /* add barrier to keep gcc from optimizing too much */
3001 barrier();
3002 }
3003 while (rb_commit_index(cpu_buffer) !=
3004 rb_page_write(cpu_buffer->commit_page)) {
3005
3006 /* Make sure the readers see the content of what is committed. */
3007 smp_wmb();
3008 local_set(&cpu_buffer->commit_page->page->commit,
3009 rb_page_write(cpu_buffer->commit_page));
3010 RB_WARN_ON(cpu_buffer,
3011 local_read(&cpu_buffer->commit_page->page->commit) &
3012 ~RB_WRITE_MASK);
3013 barrier();
3014 }
3015
3016 /* again, keep gcc from optimizing */
3017 barrier();
3018
3019 /*
3020 * If an interrupt came in just after the first while loop
3021 * and pushed the tail page forward, we will be left with
3022 * a dangling commit that will never go forward.
3023 */
3024 if (unlikely(cpu_buffer->commit_page != READ_ONCE(cpu_buffer->tail_page)))
3025 goto again;
3026 }
3027
3028 static __always_inline void rb_end_commit(struct ring_buffer_per_cpu *cpu_buffer)
3029 {
3030 unsigned long commits;
3031
3032 if (RB_WARN_ON(cpu_buffer,
3033 !local_read(&cpu_buffer->committing)))
3034 return;
3035
3036 again:
3037 commits = local_read(&cpu_buffer->commits);
3038 /* synchronize with interrupts */
3039 barrier();
3040 if (local_read(&cpu_buffer->committing) == 1)
3041 rb_set_commit_to_write(cpu_buffer);
3042
3043 local_dec(&cpu_buffer->committing);
3044
3045 /* synchronize with interrupts */
3046 barrier();
3047
3048 /*
3049 * Need to account for interrupts coming in between the
3050 * updating of the commit page and the clearing of the
3051 * committing counter.
3052 */
3053 if (unlikely(local_read(&cpu_buffer->commits) != commits) &&
3054 !local_read(&cpu_buffer->committing)) {
3055 local_inc(&cpu_buffer->committing);
3056 goto again;
3057 }
3058 }
3059
3060 static inline void rb_event_discard(struct ring_buffer_event *event)
3061 {
3062 if (extended_time(event))
3063 event = skip_time_extend(event);
3064
3065 /* array[0] holds the actual length for the discarded event */
3066 event->array[0] = rb_event_data_length(event) - RB_EVNT_HDR_SIZE;
3067 event->type_len = RINGBUF_TYPE_PADDING;
3068 /* time delta must be non zero */
3069 if (!event->time_delta)
3070 event->time_delta = 1;
3071 }
3072
3073 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer,
3074 struct ring_buffer_event *event)
3075 {
3076 local_inc(&cpu_buffer->entries);
3077 rb_end_commit(cpu_buffer);
3078 }
3079
3080 static __always_inline void
3081 rb_wakeups(struct trace_buffer *buffer, struct ring_buffer_per_cpu *cpu_buffer)
3082 {
3083 if (buffer->irq_work.waiters_pending) {
3084 buffer->irq_work.waiters_pending = false;
3085 /* irq_work_queue() supplies its own memory barriers */
3086 irq_work_queue(&buffer->irq_work.work);
3087 }
3088
3089 if (cpu_buffer->irq_work.waiters_pending) {
3090 cpu_buffer->irq_work.waiters_pending = false;
3091 /* irq_work_queue() supplies its own memory barriers */
3092 irq_work_queue(&cpu_buffer->irq_work.work);
3093 }
3094
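/*
 * The rest of this function handles waiters that only want to be woken
 * once the buffer has filled up to a given watermark.
 */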
3095 if (cpu_buffer->last_pages_touch == local_read(&cpu_buffer->pages_touched))
3096 return;
3097
3098 if (cpu_buffer->reader_page == cpu_buffer->commit_page)
3099 return;
3100
3101 if (!cpu_buffer->irq_work.full_waiters_pending)
3102 return;
3103
3104 cpu_buffer->last_pages_touch = local_read(&cpu_buffer->pages_touched);
3105
3106 if (!full_hit(buffer, cpu_buffer->cpu, cpu_buffer->shortest_full))
3107 return;
3108
3109 cpu_buffer->irq_work.wakeup_full = true;
3110 cpu_buffer->irq_work.full_waiters_pending = false;
3111 /* irq_work_queue() supplies its own memory barriers */
3112 irq_work_queue(&cpu_buffer->irq_work.work);
3113 }
3114
3115 #ifdef CONFIG_RING_BUFFER_RECORD_RECURSION
3116 # define do_ring_buffer_record_recursion() \
3117 do_ftrace_record_recursion(_THIS_IP_, _RET_IP_)
3118 #else
3119 # define do_ring_buffer_record_recursion() do { } while (0)
3120 #endif
3121
3122 /*
3123 * The lock and unlock are done within a preempt disable section.
3124 * The current_context per_cpu variable can only be modified
3125 * by the current task between lock and unlock. But it can
3126 * be modified more than once via an interrupt. To pass this
3127 * information from the lock to the unlock without having to
3128 * access the 'in_interrupt()' functions again (which do show
3129 * a bit of overhead in something as critical as function tracing),
3130 * we use a bitmask trick.
3131 *
3132 * bit 1 = NMI context
3133 * bit 2 = IRQ context
3134 * bit 3 = SoftIRQ context
3135 * bit 4 = normal context.
3136 *
3137 * This works because this is the order of contexts that can
3138 * preempt other contexts. A SoftIRQ never preempts an IRQ
3139 * context.
3140 *
3141 * When the context is determined, the corresponding bit is
3142 * checked and set (if it was set, then a recursion of that context
3143 * happened).
3144 *
3145 * On unlock, we need to clear this bit. To do so, just subtract
3146 * 1 from the current_context and AND it to itself.
3147 *
3148 * (binary)
3149 * 101 - 1 = 100
3150 * 101 & 100 = 100 (clearing bit zero)
3151 *
3152 * 1010 - 1 = 1001
3153 * 1010 & 1001 = 1000 (clearing bit 1)
3154 *
3155 * The least significant bit can be cleared this way, and it
3156 * just so happens that it is the same bit corresponding to
3157 * the current context.
3158 *
3159 * Now the TRANSITION bit breaks the above slightly. The TRANSITION bit
3160 * is set when a recursion is detected at the current context, and if
3161 * the TRANSITION bit is already set, it will fail the recursion.
3162 * This is needed because there's a lag between the changing of
3163 * interrupt context and updating the preempt count. In this case,
3164 * a false positive will be found. To handle this, one extra recursion
3165 * is allowed, and this is done by the TRANSITION bit. If the TRANSITION
3166 * bit is already set, then it is considered a recursion and the function
3167 * ends. Otherwise, the TRANSITION bit is set, and that bit is returned.
3168 *
3169 * On the trace_recursive_unlock(), the TRANSITION bit will be the first
3170 * to be cleared. Even if it wasn't the context that set it. That is,
3171 * if an interrupt comes in while NORMAL bit is set and the ring buffer
3172 * is called before preempt_count() is updated, since the check will
3173 * be on the NORMAL bit, the TRANSITION bit will then be set. If an
3174 * NMI then comes in, it will set the NMI bit, but when the NMI code
3175 * does the trace_recursive_unlock() it will clear the TRANSITION bit
3176 * and leave the NMI bit set. But this is fine, because the interrupt
3177 * code that set the TRANSITION bit will then clear the NMI bit when it
3178 * calls trace_recursive_unlock(). If another NMI comes in, it will
3179 * set the TRANSITION bit and continue.
3180 *
3181 * Note: The TRANSITION bit only handles a single transition between context.
3182 */
3183
3184 static __always_inline int
3185 trace_recursive_lock(struct ring_buffer_per_cpu *cpu_buffer)
3186 {
3187 unsigned int val = cpu_buffer->current_context;
3188 int bit = interrupt_context_level();
3189
3190 bit = RB_CTX_NORMAL - bit;
3191
3192 if (unlikely(val & (1 << (bit + cpu_buffer->nest)))) {
3193 /*
3194 * It is possible that this was called by transitioning
3195 * between interrupt context, and preempt_count() has not
3196 * been updated yet. In this case, use the TRANSITION bit.
3197 */
3198 bit = RB_CTX_TRANSITION;
3199 if (val & (1 << (bit + cpu_buffer->nest))) {
3200 do_ring_buffer_record_recursion();
3201 return 1;
3202 }
3203 }
3204
3205 val |= (1 << (bit + cpu_buffer->nest));
3206 cpu_buffer->current_context = val;
3207
3208 return 0;
3209 }
3210
3211 static __always_inline void
3212 trace_recursive_unlock(struct ring_buffer_per_cpu *cpu_buffer)
3213 {
3214 cpu_buffer->current_context &=
3215 cpu_buffer->current_context - (1 << cpu_buffer->nest);
3216 }
3217
3218 /* The recursive locking above uses 5 bits */
3219 #define NESTED_BITS 5
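/*
 * ring_buffer_nest_start() bumps cpu_buffer->nest by NESTED_BITS so that
 * a nested ring_buffer_lock_reserve() uses a fresh set of context bits
 * in current_context.
 */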
3220
3221 /**
3222 * ring_buffer_nest_start - Allow to trace while nested
3223 * @buffer: The ring buffer to modify
3224 *
3225 * The ring buffer has a safety mechanism to prevent recursion.
3226 * But there may be a case where a trace needs to be done while
3227 * tracing something else. In this case, calling this function
3228 * will allow this function to nest within a currently active
3229 * ring_buffer_lock_reserve().
3230 *
3231 * Call this function before calling another ring_buffer_lock_reserve() and
3232 * call ring_buffer_nest_end() after the nested ring_buffer_unlock_commit().
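*
* A typical (illustrative) sequence:
*
*	ring_buffer_nest_start(buffer);
*	event = ring_buffer_lock_reserve(buffer, length);
*	... fill in the event ...
*	ring_buffer_unlock_commit(buffer, event);
*	ring_buffer_nest_end(buffer);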
3233 */
3234 void ring_buffer_nest_start(struct trace_buffer *buffer)
3235 {
3236 struct ring_buffer_per_cpu *cpu_buffer;
3237 int cpu;
3238
3239 /* Enabled by ring_buffer_nest_end() */
3240 preempt_disable_notrace();
3241 cpu = raw_smp_processor_id();
3242 cpu_buffer = buffer->buffers[cpu];
3243 /* This is the shift value for the above recursive locking */
3244 cpu_buffer->nest += NESTED_BITS;
3245 }
3246
3247 /**
3248 * ring_buffer_nest_end - Allow to trace while nested
3249 * @buffer: The ring buffer to modify
3250 *
3251 * Must be called after ring_buffer_nest_start() and after the
3252 * ring_buffer_unlock_commit().
3253 */
3254 void ring_buffer_nest_end(struct trace_buffer *buffer)
3255 {
3256 struct ring_buffer_per_cpu *cpu_buffer;
3257 int cpu;
3258
3259 /* disabled by ring_buffer_nest_start() */
3260 cpu = raw_smp_processor_id();
3261 cpu_buffer = buffer->buffers[cpu];
3262 /* This is the shift value for the above recursive locking */
3263 cpu_buffer->nest -= NESTED_BITS;
3264 preempt_enable_notrace();
3265 }
3266
3267 /**
3268 * ring_buffer_unlock_commit - commit a reserved event
3269 * @buffer: The buffer to commit to
3270 * @event: The event pointer to commit.
3271 *
3272 * This commits the data to the ring buffer, and releases any locks held.
3273 *
3274 * Must be paired with ring_buffer_lock_reserve.
3275 */
3276 int ring_buffer_unlock_commit(struct trace_buffer *buffer,
3277 struct ring_buffer_event *event)
3278 {
3279 struct ring_buffer_per_cpu *cpu_buffer;
3280 int cpu = raw_smp_processor_id();
3281
3282 cpu_buffer = buffer->buffers[cpu];
3283
3284 rb_commit(cpu_buffer, event);
3285
3286 rb_wakeups(buffer, cpu_buffer);
3287
3288 trace_recursive_unlock(cpu_buffer);
3289
3290 preempt_enable_notrace();
3291
3292 return 0;
3293 }
3294 EXPORT_SYMBOL_GPL(ring_buffer_unlock_commit);
3295
3296 /* Special value to validate all deltas on a page. */
3297 #define CHECK_FULL_PAGE 1L
3298
3299 #ifdef CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS
3300 static void dump_buffer_page(struct buffer_data_page *bpage,
3301 struct rb_event_info *info,
3302 unsigned long tail)
3303 {
3304 struct ring_buffer_event *event;
3305 u64 ts, delta;
3306 int e;
3307
3308 ts = bpage->time_stamp;
3309 pr_warn(" [%lld] PAGE TIME STAMP\n", ts);
3310
3311 for (e = 0; e < tail; e += rb_event_length(event)) {
3312
3313 event = (struct ring_buffer_event *)(bpage->data + e);
3314
3315 switch (event->type_len) {
3316
3317 case RINGBUF_TYPE_TIME_EXTEND:
3318 delta = rb_event_time_stamp(event);
3319 ts += delta;
3320 pr_warn(" [%lld] delta:%lld TIME EXTEND\n", ts, delta);
3321 break;
3322
3323 case RINGBUF_TYPE_TIME_STAMP:
3324 delta = rb_event_time_stamp(event);
3325 ts = delta;
3326 pr_warn(" [%lld] absolute:%lld TIME STAMP\n", ts, delta);
3327 break;
3328
3329 case RINGBUF_TYPE_PADDING:
3330 ts += event->time_delta;
3331 pr_warn(" [%lld] delta:%d PADDING\n", ts, event->time_delta);
3332 break;
3333
3334 case RINGBUF_TYPE_DATA:
3335 ts += event->time_delta;
3336 pr_warn(" [%lld] delta:%d\n", ts, event->time_delta);
3337 break;
3338
3339 default:
3340 break;
3341 }
3342 }
3343 }
3344
3345 static DEFINE_PER_CPU(atomic_t, checking);
3346 static atomic_t ts_dump;
3347
3348 /*
3349 * Check if the current event time stamp matches the deltas on
3350 * the buffer page.
3351 */
3352 static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3353 struct rb_event_info *info,
3354 unsigned long tail)
3355 {
3356 struct ring_buffer_event *event;
3357 struct buffer_data_page *bpage;
3358 u64 ts, delta;
3359 bool full = false;
3360 int e;
3361
3362 bpage = info->tail_page->page;
3363
3364 if (tail == CHECK_FULL_PAGE) {
3365 full = true;
3366 tail = local_read(&bpage->commit);
3367 } else if (info->add_timestamp &
3368 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE)) {
3369 /* Ignore events with absolute time stamps */
3370 return;
3371 }
3372
3373 /*
3374 * Do not check the first event (skip possible extends too).
3375 * Also do not check if previous events have not been committed.
3376 */
3377 if (tail <= 8 || tail > local_read(&bpage->commit))
3378 return;
3379
3380 /*
3381 * If this interrupted another event,
3382 */
3383 if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
3384 goto out;
3385
3386 ts = bpage->time_stamp;
3387
3388 for (e = 0; e < tail; e += rb_event_length(event)) {
3389
3390 event = (struct ring_buffer_event *)(bpage->data + e);
3391
3392 switch (event->type_len) {
3393
3394 case RINGBUF_TYPE_TIME_EXTEND:
3395 delta = rb_event_time_stamp(event);
3396 ts += delta;
3397 break;
3398
3399 case RINGBUF_TYPE_TIME_STAMP:
3400 delta = rb_event_time_stamp(event);
3401 ts = delta;
3402 break;
3403
3404 case RINGBUF_TYPE_PADDING:
3405 if (event->time_delta == 1)
3406 break;
3407 fallthrough;
3408 case RINGBUF_TYPE_DATA:
3409 ts += event->time_delta;
3410 break;
3411
3412 default:
3413 RB_WARN_ON(cpu_buffer, 1);
3414 }
3415 }
3416 if ((full && ts > info->ts) ||
3417 (!full && ts + info->delta != info->ts)) {
3418 /* If another report is happening, ignore this one */
3419 if (atomic_inc_return(&ts_dump) != 1) {
3420 atomic_dec(&ts_dump);
3421 goto out;
3422 }
3423 atomic_inc(&cpu_buffer->record_disabled);
3424 /* There are some cases during boot up where this can happen */
3425 WARN_ON_ONCE(system_state != SYSTEM_BOOTING);
3426 pr_warn("[CPU: %d]TIME DOES NOT MATCH expected:%lld actual:%lld delta:%lld before:%lld after:%lld%s\n",
3427 cpu_buffer->cpu,
3428 ts + info->delta, info->ts, info->delta,
3429 info->before, info->after,
3430 full ? " (full)" : "");
3431 dump_buffer_page(bpage, info, tail);
3432 atomic_dec(&ts_dump);
3433 /* Do not re-enable checking */
3434 return;
3435 }
3436 out:
3437 atomic_dec(this_cpu_ptr(&checking));
3438 }
3439 #else
3440 static inline void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
3441 struct rb_event_info *info,
3442 unsigned long tail)
3443 {
3444 }
3445 #endif /* CONFIG_RING_BUFFER_VALIDATE_TIME_DELTAS */
3446
3447 static struct ring_buffer_event *
3448 __rb_reserve_next(struct ring_buffer_per_cpu *cpu_buffer,
3449 struct rb_event_info *info)
3450 {
3451 struct ring_buffer_event *event;
3452 struct buffer_page *tail_page;
3453 unsigned long tail, write, w;
3454 bool a_ok;
3455 bool b_ok;
3456
3457 /* Don't let the compiler play games with cpu_buffer->tail_page */
3458 tail_page = info->tail_page = READ_ONCE(cpu_buffer->tail_page);
3459
3460 /*A*/ w = local_read(&tail_page->write) & RB_WRITE_MASK;
3461 barrier();
3462 b_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3463 a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3464 barrier();
3465 info->ts = rb_time_stamp(cpu_buffer->buffer);
3466
3467 if ((info->add_timestamp & RB_ADD_STAMP_ABSOLUTE)) {
3468 info->delta = info->ts;
3469 } else {
3470 /*
3471 * If interrupting an event time update, we may need an
3472 * absolute timestamp.
3473 * Don't bother if this is the start of a new page (w == 0).
3474 */
3475 if (!w) {
3476 /* Use the sub-buffer timestamp */
3477 info->delta = 0;
3478 } else if (unlikely(!a_ok || !b_ok || info->before != info->after)) {
3479 info->add_timestamp |= RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND;
3480 info->length += RB_LEN_TIME_EXTEND;
3481 } else {
3482 info->delta = info->ts - info->after;
3483 if (unlikely(test_time_stamp(info->delta))) {
3484 info->add_timestamp |= RB_ADD_STAMP_EXTEND;
3485 info->length += RB_LEN_TIME_EXTEND;
3486 }
3487 }
3488 }
3489
3490 /*B*/ rb_time_set(&cpu_buffer->before_stamp, info->ts);
3491
3492 /*C*/ write = local_add_return(info->length, &tail_page->write);
3493
3494 /* set write to only the index of the write */
3495 write &= RB_WRITE_MASK;
3496
3497 tail = write - info->length;
3498
3499 /* See if we shot past the end of this buffer page */
3500 if (unlikely(write > BUF_PAGE_SIZE)) {
3501 check_buffer(cpu_buffer, info, CHECK_FULL_PAGE);
3502 return rb_move_tail(cpu_buffer, tail, info);
3503 }
3504
3505 if (likely(tail == w)) {
3506 /* Nothing interrupted us between A and C */
3507 /*D*/ rb_time_set(&cpu_buffer->write_stamp, info->ts);
3508 /*
3509 * If something came in between C and D, the write stamp
3510 * may now not be in sync. But that's fine as the before_stamp
3511 * will be different and then next event will just be forced
3512 * to use an absolute timestamp.
3513 */
3514 if (likely(!(info->add_timestamp &
3515 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3516 /* This did not interrupt any time update */
3517 info->delta = info->ts - info->after;
3518 else
3519 /* Just use full timestamp for interrupting event */
3520 info->delta = info->ts;
3521 check_buffer(cpu_buffer, info, tail);
3522 } else {
3523 u64 ts;
3524 /* SLOW PATH - Interrupted between A and C */
3525
3526 /* Save the old before_stamp */
3527 a_ok = rb_time_read(&cpu_buffer->before_stamp, &info->before);
3528 RB_WARN_ON(cpu_buffer, !a_ok);
3529
3530 /*
3531 * Read a new timestamp and update the before_stamp to make
3532 * the next event after this one force using an absolute
3533 * timestamp. This is in case an interrupt were to come in
3534 * between E and F.
3535 */
3536 ts = rb_time_stamp(cpu_buffer->buffer);
3537 rb_time_set(&cpu_buffer->before_stamp, ts);
3538
3539 barrier();
3540 /*E*/ a_ok = rb_time_read(&cpu_buffer->write_stamp, &info->after);
3541 /* Was interrupted before here, write_stamp must be valid */
3542 RB_WARN_ON(cpu_buffer, !a_ok);
3543 barrier();
3544 /*F*/ if (write == (local_read(&tail_page->write) & RB_WRITE_MASK) &&
3545 info->after == info->before && info->after < ts) {
3546 /*
3547 * Nothing came after this event between C and F, it is
3548 * safe to use info->after for the delta as it
3549 * matched info->before and is still valid.
3550 */
3551 info->delta = ts - info->after;
3552 } else {
3553 /*
3554 * Interrupted between C and F:
3555 * Lost the previous event's time stamp. Just set the
3556 * delta to zero, and this will be the same time as
3557 * the event this event interrupted. And the events that
3558 * came after this will still be correct (as they would
3559 * have built their delta on the previous event).
3560 */
3561 info->delta = 0;
3562 }
3563 info->ts = ts;
3564 info->add_timestamp &= ~RB_ADD_STAMP_FORCE;
3565 }
3566
3567 /*
3568 * If this is the first commit on the page, then it has the same
3569 * timestamp as the page itself.
3570 */
3571 if (unlikely(!tail && !(info->add_timestamp &
3572 (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_ABSOLUTE))))
3573 info->delta = 0;
3574
3575 /* We reserved something on the buffer */
3576
3577 event = __rb_page_index(tail_page, tail);
3578 rb_update_event(cpu_buffer, event, info);
3579
3580 local_inc(&tail_page->entries);
3581
3582 /*
3583 * If this is the first commit on the page, then update
3584 * its timestamp.
3585 */
3586 if (unlikely(!tail))
3587 tail_page->page->time_stamp = info->ts;
3588
3589 /* account for these added bytes */
3590 local_add(info->length, &cpu_buffer->entries_bytes);
3591
3592 return event;
3593 }
3594
3595 static __always_inline struct ring_buffer_event *
3596 rb_reserve_next_event(struct trace_buffer *buffer,
3597 struct ring_buffer_per_cpu *cpu_buffer,
3598 unsigned long length)
3599 {
3600 struct ring_buffer_event *event;
3601 struct rb_event_info info;
3602 int nr_loops = 0;
3603 int add_ts_default;
3604
3605 /* ring buffer does cmpxchg, make sure it is safe in NMI context */
3606 if (!IS_ENABLED(CONFIG_ARCH_HAVE_NMI_SAFE_CMPXCHG) &&
3607 (unlikely(in_nmi()))) {
3608 return NULL;
3609 }
3610
3611 rb_start_commit(cpu_buffer);
3612 /* The commit page can not change after this */
3613
3614 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
3615 /*
3616 * Because a cpu buffer can be swapped out of a buffer,
3617 * it is possible it was swapped before we committed.
3618 * (committing stops a swap). We check for it here and
3619 * if it happened, we have to fail the write.
3620 */
3621 barrier();
3622 if (unlikely(READ_ONCE(cpu_buffer->buffer) != buffer)) {
3623 local_dec(&cpu_buffer->committing);
3624 local_dec(&cpu_buffer->commits);
3625 return NULL;
3626 }
3627 #endif
3628
3629 info.length = rb_calculate_event_length(length);
3630
3631 if (ring_buffer_time_stamp_abs(cpu_buffer->buffer)) {
3632 add_ts_default = RB_ADD_STAMP_ABSOLUTE;
3633 info.length += RB_LEN_TIME_EXTEND;
3634 if (info.length > BUF_MAX_DATA_SIZE)
3635 goto out_fail;
3636 } else {
3637 add_ts_default = RB_ADD_STAMP_NONE;
3638 }
3639
3640 again:
3641 info.add_timestamp = add_ts_default;
3642 info.delta = 0;
3643
3644 /*
3645 * We allow for interrupts to reenter here and do a trace.
3646 * If one does, it will cause this original code to loop
3647 * back here. Even with heavy interrupts happening, this
3648 * should only happen a few times in a row. If this happens
3649 * 1000 times in a row, there must be either an interrupt
3650 * storm or we have something buggy.
3651 * Bail!
3652 */
3653 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 1000))
3654 goto out_fail;
3655
3656 event = __rb_reserve_next(cpu_buffer, &info);
3657
3658 if (unlikely(PTR_ERR(event) == -EAGAIN)) {
3659 if (info.add_timestamp & (RB_ADD_STAMP_FORCE | RB_ADD_STAMP_EXTEND))
3660 info.length -= RB_LEN_TIME_EXTEND;
3661 goto again;
3662 }
3663
3664 if (likely(event))
3665 return event;
3666 out_fail:
3667 rb_end_commit(cpu_buffer);
3668 return NULL;
3669 }
3670
3671 /**
3672 * ring_buffer_lock_reserve - reserve a part of the buffer
3673 * @buffer: the ring buffer to reserve from
3674 * @length: the length of the data to reserve (excluding event header)
3675 *
3676 * Returns a reserved event on the ring buffer to copy directly to.
3677 * The user of this interface will need to get the body to write into
3678 * and can use the ring_buffer_event_data() interface.
3679 *
3680 * The length is the length of the data needed, not the event length
3681 * which also includes the event header.
3682 *
3683 * Must be paired with ring_buffer_unlock_commit, unless NULL is returned.
3684 * If NULL is returned, then nothing has been allocated or locked.
3685 */
3686 struct ring_buffer_event *
3687 ring_buffer_lock_reserve(struct trace_buffer *buffer, unsigned long length)
3688 {
3689 struct ring_buffer_per_cpu *cpu_buffer;
3690 struct ring_buffer_event *event;
3691 int cpu;
3692
3693 /* If we are tracing schedule, we don't want to recurse */
3694 preempt_disable_notrace();
3695
3696 if (unlikely(atomic_read(&buffer->record_disabled)))
3697 goto out;
3698
3699 cpu = raw_smp_processor_id();
3700
3701 if (unlikely(!cpumask_test_cpu(cpu, buffer->cpumask)))
3702 goto out;
3703
3704 cpu_buffer = buffer->buffers[cpu];
3705
3706 if (unlikely(atomic_read(&cpu_buffer->record_disabled)))
3707 goto out;
3708
3709 if (unlikely(length > BUF_MAX_DATA_SIZE))
3710 goto out;
3711
3712 if (unlikely(trace_recursive_lock(cpu_buffer)))
3713 goto out;
3714
3715 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3716 if (!event)
3717 goto out_unlock;
3718
3719 return event;
3720
3721 out_unlock:
3722 trace_recursive_unlock(cpu_buffer);
3723 out:
3724 preempt_enable_notrace();
3725 return NULL;
3726 }
3727 EXPORT_SYMBOL_GPL(ring_buffer_lock_reserve);
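
/*
 * Illustrative sketch only (not part of the ring buffer implementation):
 * a typical reserve/fill/commit pair as described in the kernel-doc above.
 * The struct my_payload type and my_record_value() wrapper are made-up
 * names for the example; ring_buffer_event_data() and
 * ring_buffer_unlock_commit() come from <linux/ring_buffer.h>, and the
 * two-argument unlock_commit form is assumed to match this kernel version.
 */
#if 0
struct my_payload {
	u64	cookie;
	int	value;
};

static int my_record_value(struct trace_buffer *buffer, int value)
{
	struct ring_buffer_event *event;
	struct my_payload *p;

	/* Reserve room for the payload only; the event header is added for us */
	event = ring_buffer_lock_reserve(buffer, sizeof(*p));
	if (!event)
		return -EBUSY;

	p = ring_buffer_event_data(event);
	p->cookie = 0;
	p->value = value;

	/* Every successful reserve must be committed (or discarded) */
	return ring_buffer_unlock_commit(buffer, event);
}
#endif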
3728
3729 /*
3730 * Decrement the entries to the page that an event is on.
3731 * The event does not even need to exist, only the pointer
3732 * to the page it is on. This may only be called before the commit
3733 * takes place.
3734 */
3735 static inline void
3736 rb_decrement_entry(struct ring_buffer_per_cpu *cpu_buffer,
3737 struct ring_buffer_event *event)
3738 {
3739 unsigned long addr = (unsigned long)event;
3740 struct buffer_page *bpage = cpu_buffer->commit_page;
3741 struct buffer_page *start;
3742
3743 addr &= PAGE_MASK;
3744
3745 /* Do the likely case first */
3746 if (likely(bpage->page == (void *)addr)) {
3747 local_dec(&bpage->entries);
3748 return;
3749 }
3750
3751 /*
3752 * Because the commit page may be on the reader page we
3753 * start with the next page and check the end loop there.
3754 */
3755 rb_inc_page(&bpage);
3756 start = bpage;
3757 do {
3758 if (bpage->page == (void *)addr) {
3759 local_dec(&bpage->entries);
3760 return;
3761 }
3762 rb_inc_page(&bpage);
3763 } while (bpage != start);
3764
3765 /* commit not part of this buffer?? */
3766 RB_WARN_ON(cpu_buffer, 1);
3767 }
3768
3769 /**
3770 * ring_buffer_discard_commit - discard an event that has not been committed
3771 * @buffer: the ring buffer
3772 * @event: non committed event to discard
3773 *
3774 * Sometimes an event that is in the ring buffer needs to be ignored.
3775 * This function lets the user discard an event in the ring buffer
3776 * and then that event will not be read later.
3777 *
3778 * This function only works if it is called before the item has been
3779 * committed. It will try to free the event from the ring buffer
3780 * if another event has not been added behind it.
3781 *
3782 * If another event has been added behind it, it will set the event
3783 * up as discarded, and perform the commit.
3784 *
3785 * If this function is called, do not call ring_buffer_unlock_commit on
3786 * the event.
3787 */
3788 void ring_buffer_discard_commit(struct trace_buffer *buffer,
3789 struct ring_buffer_event *event)
3790 {
3791 struct ring_buffer_per_cpu *cpu_buffer;
3792 int cpu;
3793
3794 if (unlikely(has_ext_writer(buffer)))
3795 return;
3796
3797 /* The event is discarded regardless */
3798 rb_event_discard(event);
3799
3800 cpu = smp_processor_id();
3801 cpu_buffer = buffer->buffers[cpu];
3802
3803 /*
3804 * This must only be called if the event has not been
3805 * committed yet. Thus we can assume that preemption
3806 * is still disabled.
3807 */
3808 RB_WARN_ON(buffer, !local_read(&cpu_buffer->committing));
3809
3810 rb_decrement_entry(cpu_buffer, event);
3811 if (rb_try_to_discard(cpu_buffer, event))
3812 goto out;
3813
3814 out:
3815 rb_end_commit(cpu_buffer);
3816
3817 trace_recursive_unlock(cpu_buffer);
3818
3819 preempt_enable_notrace();
3820
3821 }
3822 EXPORT_SYMBOL_GPL(ring_buffer_discard_commit);
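
/*
 * Illustrative sketch only: how a writer might back out of a reservation
 * with ring_buffer_discard_commit() instead of committing, as the
 * kernel-doc above describes. my_filter_out() and struct my_payload are
 * hypothetical names used purely for this example.
 */
#if 0
static void my_maybe_record(struct trace_buffer *buffer, int value)
{
	struct ring_buffer_event *event;
	struct my_payload *p;

	event = ring_buffer_lock_reserve(buffer, sizeof(*p));
	if (!event)
		return;

	p = ring_buffer_event_data(event);
	p->value = value;

	if (my_filter_out(p))
		/* Not wanted after all: discard, do not unlock_commit */
		ring_buffer_discard_commit(buffer, event);
	else
		ring_buffer_unlock_commit(buffer, event);
}
#endif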
3823
3824 /**
3825 * ring_buffer_write - write data to the buffer without reserving
3826 * @buffer: The ring buffer to write to.
3827 * @length: The length of the data being written (excluding the event header)
3828 * @data: The data to write to the buffer.
3829 *
3830 * This is like ring_buffer_lock_reserve and ring_buffer_unlock_commit as
3831 * one function. If you already have the data to write to the buffer, it
3832 * may be easier to simply call this function.
3833 *
3834 * Note, like ring_buffer_lock_reserve, the length is the length of the data
3835 * and not the length of the event which would hold the header.
3836 */
3837 int ring_buffer_write(struct trace_buffer *buffer,
3838 unsigned long length,
3839 void *data)
3840 {
3841 struct ring_buffer_per_cpu *cpu_buffer;
3842 struct ring_buffer_event *event;
3843 void *body;
3844 int ret = -EBUSY;
3845 int cpu;
3846
3847 preempt_disable_notrace();
3848
3849 if (atomic_read(&buffer->record_disabled))
3850 goto out;
3851
3852 cpu = raw_smp_processor_id();
3853
3854 if (!cpumask_test_cpu(cpu, buffer->cpumask))
3855 goto out;
3856
3857 cpu_buffer = buffer->buffers[cpu];
3858
3859 if (atomic_read(&cpu_buffer->record_disabled))
3860 goto out;
3861
3862 if (length > BUF_MAX_DATA_SIZE)
3863 goto out;
3864
3865 if (unlikely(trace_recursive_lock(cpu_buffer)))
3866 goto out;
3867
3868 event = rb_reserve_next_event(buffer, cpu_buffer, length);
3869 if (!event)
3870 goto out_unlock;
3871
3872 body = rb_event_data(event);
3873
3874 memcpy(body, data, length);
3875
3876 rb_commit(cpu_buffer, event);
3877
3878 rb_wakeups(buffer, cpu_buffer);
3879
3880 ret = 0;
3881
3882 out_unlock:
3883 trace_recursive_unlock(cpu_buffer);
3884
3885 out:
3886 preempt_enable_notrace();
3887
3888 return ret;
3889 }
3890 EXPORT_SYMBOL_GPL(ring_buffer_write);
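
/*
 * Illustrative sketch only: when the data is already sitting in a local
 * buffer, ring_buffer_write() replaces the reserve/commit pair with a
 * single call, as the kernel-doc above notes. The length passed is the
 * payload size, not the event size. The function name is made up for the
 * example.
 */
#if 0
static int my_log_blob(struct trace_buffer *buffer,
		       const void *blob, unsigned long len)
{
	/* Returns 0 on success, -EBUSY if the record could not be written */
	return ring_buffer_write(buffer, len, (void *)blob);
}
#endif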
3891
3892 static bool rb_per_cpu_empty(struct ring_buffer_per_cpu *cpu_buffer)
3893 {
3894 struct buffer_page *reader = cpu_buffer->reader_page;
3895 struct buffer_page *head = rb_set_head_page(cpu_buffer);
3896 struct buffer_page *commit = cpu_buffer->commit_page;
3897
3898 /* In case of error, head will be NULL */
3899 if (unlikely(!head))
3900 return true;
3901
3902 /* Reader should exhaust content in reader page */
3903 if (reader->read != rb_page_commit(reader))
3904 return false;
3905
3906 /*
3907 * If writers are committing on the reader page, knowing all
3908 * committed content has been read, the ring buffer is empty.
3909 */
3910 if (commit == reader)
3911 return true;
3912
3913 /*
3914 * If writers are committing on a page other than reader page
3915 * and head page, there should always be content to read.
3916 */
3917 if (commit != head)
3918 return false;
3919
3920 /*
3921 * Writers are committing on the head page. We just need
3922 * to check whether there is committed data, and the reader will
3923 * swap the reader page with the head page when it wants to read data.
3924 */
3925 return rb_page_commit(commit) == 0;
3926 }
3927
3928 /**
3929 * ring_buffer_record_disable - stop all writes into the buffer
3930 * @buffer: The ring buffer to stop writes to.
3931 *
3932 * This prevents all writes to the buffer. Any attempt to write
3933 * to the buffer after this will fail and return NULL.
3934 *
3935 * The caller should call synchronize_rcu() after this.
3936 */
3937 void ring_buffer_record_disable(struct trace_buffer *buffer)
3938 {
3939 atomic_inc(&buffer->record_disabled);
3940 }
3941 EXPORT_SYMBOL_GPL(ring_buffer_record_disable);
3942
3943 /**
3944 * ring_buffer_record_enable - enable writes to the buffer
3945 * @buffer: The ring buffer to enable writes
3946 *
3947 * Note, multiple disables will need the same number of enables
3948 * to truly enable the writing (much like preempt_disable).
3949 */
3950 void ring_buffer_record_enable(struct trace_buffer *buffer)
3951 {
3952 if (unlikely(has_ext_writer(buffer)))
3953 return;
3954
3955 atomic_dec(&buffer->record_disabled);
3956 }
3957 EXPORT_SYMBOL_GPL(ring_buffer_record_enable);
3958
3959 /**
3960 * ring_buffer_record_off - stop all writes into the buffer
3961 * @buffer: The ring buffer to stop writes to.
3962 *
3963 * This prevents all writes to the buffer. Any attempt to write
3964 * to the buffer after this will fail and return NULL.
3965 *
3966 * This is different from ring_buffer_record_disable() as
3967 * it works like an on/off switch, whereas the disable() version
3968 * must be paired with an enable().
3969 */
3970 void ring_buffer_record_off(struct trace_buffer *buffer)
3971 {
3972 unsigned int rd;
3973 unsigned int new_rd;
3974
3975 do {
3976 rd = atomic_read(&buffer->record_disabled);
3977 new_rd = rd | RB_BUFFER_OFF;
3978 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
3979 }
3980 EXPORT_SYMBOL_GPL(ring_buffer_record_off);
3981
3982 /**
3983 * ring_buffer_record_on - restart writes into the buffer
3984 * @buffer: The ring buffer to start writes to.
3985 *
3986 * This enables all writes to the buffer that was disabled by
3987 * ring_buffer_record_off().
3988 *
3989 * This is different from ring_buffer_record_enable() as
3990 * it works like an on/off switch, whereas the enable() version
3991 * must be paired with a disable().
3992 */
3993 void ring_buffer_record_on(struct trace_buffer *buffer)
3994 {
3995 unsigned int rd;
3996 unsigned int new_rd;
3997
3998 if (unlikely(has_ext_writer(buffer)))
3999 return;
4000
4001 do {
4002 rd = atomic_read(&buffer->record_disabled);
4003 new_rd = rd & ~RB_BUFFER_OFF;
4004 } while (atomic_cmpxchg(&buffer->record_disabled, rd, new_rd) != rd);
4005 }
4006 EXPORT_SYMBOL_GPL(ring_buffer_record_on);
4007
4008 /**
4009 * ring_buffer_record_is_on - return true if the ring buffer can write
4010 * @buffer: The ring buffer to see if write is enabled
4011 *
4012 * Returns true if the ring buffer is in a state that it accepts writes.
4013 */
4014 bool ring_buffer_record_is_on(struct trace_buffer *buffer)
4015 {
4016 return !atomic_read(&buffer->record_disabled);
4017 }
4018
4019 /**
4020 * ring_buffer_record_is_set_on - return true if the ring buffer is set writable
4021 * @buffer: The ring buffer to see if write is set enabled
4022 *
4023 * Returns true if the ring buffer is set writable by ring_buffer_record_on().
4024 * Note that this does NOT mean it is in a writable state.
4025 *
4026 * It may return true when the ring buffer has been disabled by
4027 * ring_buffer_record_disable(), as that is a temporary disabling of
4028 * the ring buffer.
4029 */
4030 bool ring_buffer_record_is_set_on(struct trace_buffer *buffer)
4031 {
4032 return !(atomic_read(&buffer->record_disabled) & RB_BUFFER_OFF);
4033 }
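
/*
 * Illustrative sketch only: the difference between the on/off switch and
 * the nested disable/enable counter documented above. A single
 * ring_buffer_record_on() undoes ring_buffer_record_off(), while every
 * ring_buffer_record_disable() must be balanced by its own
 * ring_buffer_record_enable(). The function name is hypothetical and this
 * assumes a buffer without an external writer and no concurrent disables.
 */
#if 0
static void my_toggle_demo(struct trace_buffer *buffer)
{
	ring_buffer_record_off(buffer);		/* hard off switch */
	ring_buffer_record_on(buffer);		/* back on with one call */

	ring_buffer_record_disable(buffer);	/* nesting level 1 */
	ring_buffer_record_disable(buffer);	/* nesting level 2 */
	ring_buffer_record_enable(buffer);	/* still disabled... */
	ring_buffer_record_enable(buffer);	/* ...writable again */

	WARN_ON(!ring_buffer_record_is_on(buffer));
	WARN_ON(!ring_buffer_record_is_set_on(buffer));
}
#endif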
4034
4035 /**
4036 * ring_buffer_record_disable_cpu - stop all writes into the cpu_buffer
4037 * @buffer: The ring buffer to stop writes to.
4038 * @cpu: The CPU buffer to stop
4039 *
4040 * This prevents all writes to the buffer. Any attempt to write
4041 * to the buffer after this will fail and return NULL.
4042 *
4043 * The caller should call synchronize_rcu() after this.
4044 */
4045 void ring_buffer_record_disable_cpu(struct trace_buffer *buffer, int cpu)
4046 {
4047 struct ring_buffer_per_cpu *cpu_buffer;
4048
4049 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4050 return;
4051
4052 cpu_buffer = buffer->buffers[cpu];
4053 atomic_inc(&cpu_buffer->record_disabled);
4054 }
4055 EXPORT_SYMBOL_GPL(ring_buffer_record_disable_cpu);
4056
4057 /**
4058 * ring_buffer_record_enable_cpu - enable writes to the buffer
4059 * @buffer: The ring buffer to enable writes
4060 * @cpu: The CPU to enable.
4061 *
4062 * Note, multiple disables will need the same number of enables
4063 * to truly enable the writing (much like preempt_disable).
4064 */
4065 void ring_buffer_record_enable_cpu(struct trace_buffer *buffer, int cpu)
4066 {
4067 struct ring_buffer_per_cpu *cpu_buffer;
4068
4069 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4070 return;
4071
4072 cpu_buffer = buffer->buffers[cpu];
4073 atomic_dec(&cpu_buffer->record_disabled);
4074 }
4075 EXPORT_SYMBOL_GPL(ring_buffer_record_enable_cpu);
4076
4077 /*
4078 * The total entries in the ring buffer is the running counter
4079 * of entries entered into the ring buffer, minus the sum of
4080 * the entries read from the ring buffer and the number of
4081 * entries that were overwritten.
4082 */
4083 static inline unsigned long
4084 rb_num_of_entries(struct ring_buffer_per_cpu *cpu_buffer)
4085 {
4086 return local_read(&cpu_buffer->entries) -
4087 (local_read(&cpu_buffer->overrun) + cpu_buffer->read);
4088 }
4089
4090 /**
4091 * ring_buffer_oldest_event_ts - get the oldest event timestamp from the buffer
4092 * @buffer: The ring buffer
4093 * @cpu: The per CPU buffer to read from.
4094 */
4095 u64 ring_buffer_oldest_event_ts(struct trace_buffer *buffer, int cpu)
4096 {
4097 unsigned long flags;
4098 struct ring_buffer_per_cpu *cpu_buffer;
4099 struct buffer_page *bpage;
4100 u64 ret = 0;
4101
4102 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4103 return 0;
4104
4105 cpu_buffer = buffer->buffers[cpu];
4106 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4107 /*
4108 * If the tail is on the reader_page, the oldest time stamp is on
4109 * the reader page.
4110 */
4111 if (cpu_buffer->tail_page == cpu_buffer->reader_page)
4112 bpage = cpu_buffer->reader_page;
4113 else
4114 bpage = rb_set_head_page(cpu_buffer);
4115 if (bpage)
4116 ret = bpage->page->time_stamp;
4117 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4118
4119 return ret;
4120 }
4121 EXPORT_SYMBOL_GPL(ring_buffer_oldest_event_ts);
4122
4123 /**
4124 * ring_buffer_bytes_cpu - get the number of bytes unconsumed in a cpu buffer
4125 * @buffer: The ring buffer
4126 * @cpu: The per CPU buffer to read from.
4127 */
4128 unsigned long ring_buffer_bytes_cpu(struct trace_buffer *buffer, int cpu)
4129 {
4130 struct ring_buffer_per_cpu *cpu_buffer;
4131 unsigned long ret;
4132
4133 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4134 return 0;
4135
4136 cpu_buffer = buffer->buffers[cpu];
4137 ret = local_read(&cpu_buffer->entries_bytes) - cpu_buffer->read_bytes;
4138
4139 return ret;
4140 }
4141 EXPORT_SYMBOL_GPL(ring_buffer_bytes_cpu);
4142
4143 /**
4144 * ring_buffer_entries_cpu - get the number of entries in a cpu buffer
4145 * @buffer: The ring buffer
4146 * @cpu: The per CPU buffer to get the entries from.
4147 */
4148 unsigned long ring_buffer_entries_cpu(struct trace_buffer *buffer, int cpu)
4149 {
4150 struct ring_buffer_per_cpu *cpu_buffer;
4151
4152 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4153 return 0;
4154
4155 cpu_buffer = buffer->buffers[cpu];
4156
4157 return rb_num_of_entries(cpu_buffer);
4158 }
4159 EXPORT_SYMBOL_GPL(ring_buffer_entries_cpu);
4160
4161 /**
4162 * ring_buffer_overrun_cpu - get the number of overruns caused by the ring
4163 * buffer wrapping around (only if RB_FL_OVERWRITE is on).
4164 * @buffer: The ring buffer
4165 * @cpu: The per CPU buffer to get the number of overruns from
4166 */
4167 unsigned long ring_buffer_overrun_cpu(struct trace_buffer *buffer, int cpu)
4168 {
4169 struct ring_buffer_per_cpu *cpu_buffer;
4170 unsigned long ret;
4171
4172 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4173 return 0;
4174
4175 cpu_buffer = buffer->buffers[cpu];
4176 ret = local_read(&cpu_buffer->overrun);
4177
4178 return ret;
4179 }
4180 EXPORT_SYMBOL_GPL(ring_buffer_overrun_cpu);
4181
4182 /**
4183 * ring_buffer_commit_overrun_cpu - get the number of overruns caused by
4184 * commits failing due to the buffer wrapping around while there are uncommitted
4185 * events, such as during an interrupt storm.
4186 * @buffer: The ring buffer
4187 * @cpu: The per CPU buffer to get the number of overruns from
4188 */
4189 unsigned long
4190 ring_buffer_commit_overrun_cpu(struct trace_buffer *buffer, int cpu)
4191 {
4192 struct ring_buffer_per_cpu *cpu_buffer;
4193 unsigned long ret;
4194
4195 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4196 return 0;
4197
4198 cpu_buffer = buffer->buffers[cpu];
4199 ret = local_read(&cpu_buffer->commit_overrun);
4200
4201 return ret;
4202 }
4203 EXPORT_SYMBOL_GPL(ring_buffer_commit_overrun_cpu);
4204
4205 /**
4206 * ring_buffer_dropped_events_cpu - get the number of dropped events caused by
4207 * the ring buffer filling up (only if RB_FL_OVERWRITE is off).
4208 * @buffer: The ring buffer
4209 * @cpu: The per CPU buffer to get the number of overruns from
4210 */
4211 unsigned long
4212 ring_buffer_dropped_events_cpu(struct trace_buffer *buffer, int cpu)
4213 {
4214 struct ring_buffer_per_cpu *cpu_buffer;
4215 unsigned long ret;
4216
4217 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4218 return 0;
4219
4220 cpu_buffer = buffer->buffers[cpu];
4221 ret = local_read(&cpu_buffer->dropped_events);
4222
4223 return ret;
4224 }
4225 EXPORT_SYMBOL_GPL(ring_buffer_dropped_events_cpu);
4226
4227 /**
4228 * ring_buffer_read_events_cpu - get the number of events successfully read
4229 * @buffer: The ring buffer
4230 * @cpu: The per CPU buffer to get the number of events read
4231 */
4232 unsigned long
4233 ring_buffer_read_events_cpu(struct trace_buffer *buffer, int cpu)
4234 {
4235 struct ring_buffer_per_cpu *cpu_buffer;
4236
4237 if (!cpumask_test_cpu(cpu, buffer->cpumask))
4238 return 0;
4239
4240 cpu_buffer = buffer->buffers[cpu];
4241 return cpu_buffer->read;
4242 }
4243 EXPORT_SYMBOL_GPL(ring_buffer_read_events_cpu);
4244
4245 /**
4246 * ring_buffer_entries - get the number of entries in a buffer
4247 * @buffer: The ring buffer
4248 *
4249 * Returns the total number of entries in the ring buffer
4250 * (all CPU entries)
4251 */
4252 unsigned long ring_buffer_entries(struct trace_buffer *buffer)
4253 {
4254 struct ring_buffer_per_cpu *cpu_buffer;
4255 unsigned long entries = 0;
4256 int cpu;
4257
4258 /* if you care about this being correct, lock the buffer */
4259 for_each_buffer_cpu(buffer, cpu) {
4260 cpu_buffer = buffer->buffers[cpu];
4261 entries += rb_num_of_entries(cpu_buffer);
4262 }
4263
4264 return entries;
4265 }
4266 EXPORT_SYMBOL_GPL(ring_buffer_entries);
4267
4268 /**
4269 * ring_buffer_overruns - get the number of overruns in buffer
4270 * @buffer: The ring buffer
4271 *
4272 * Returns the total number of overruns in the ring buffer
4273 * (all CPU entries)
4274 */
4275 unsigned long ring_buffer_overruns(struct trace_buffer *buffer)
4276 {
4277 struct ring_buffer_per_cpu *cpu_buffer;
4278 unsigned long overruns = 0;
4279 int cpu;
4280
4281 /* if you care about this being correct, lock the buffer */
4282 for_each_buffer_cpu(buffer, cpu) {
4283 cpu_buffer = buffer->buffers[cpu];
4284 overruns += local_read(&cpu_buffer->overrun);
4285 }
4286
4287 return overruns;
4288 }
4289 EXPORT_SYMBOL_GPL(ring_buffer_overruns);
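
/*
 * Illustrative sketch only: pulling together the per-CPU statistics
 * accessors above into a simple report. for_each_online_cpu() comes from
 * <linux/cpumask.h>; the function name is made up for the example, and a
 * real caller would restrict itself to the CPUs in the buffer's cpumask.
 */
#if 0
static void my_dump_stats(struct trace_buffer *buffer)
{
	int cpu;

	for_each_online_cpu(cpu) {
		pr_info("cpu%d: entries=%lu overruns=%lu dropped=%lu read=%lu\n",
			cpu,
			ring_buffer_entries_cpu(buffer, cpu),
			ring_buffer_overrun_cpu(buffer, cpu),
			ring_buffer_dropped_events_cpu(buffer, cpu),
			ring_buffer_read_events_cpu(buffer, cpu));
	}
	pr_info("total: entries=%lu overruns=%lu\n",
		ring_buffer_entries(buffer),
		ring_buffer_overruns(buffer));
}
#endif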
4290
4291 static void rb_iter_reset(struct ring_buffer_iter *iter)
4292 {
4293 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
4294
4295 /* Iterator usage is expected to have record disabled */
4296 iter->head_page = cpu_buffer->reader_page;
4297 iter->head = cpu_buffer->reader_page->read;
4298 iter->next_event = iter->head;
4299
4300 iter->cache_reader_page = iter->head_page;
4301 iter->cache_read = cpu_buffer->read;
4302
4303 if (iter->head) {
4304 iter->read_stamp = cpu_buffer->read_stamp;
4305 iter->page_stamp = cpu_buffer->reader_page->page->time_stamp;
4306 } else {
4307 iter->read_stamp = iter->head_page->page->time_stamp;
4308 iter->page_stamp = iter->read_stamp;
4309 }
4310 }
4311
4312 /**
4313 * ring_buffer_iter_reset - reset an iterator
4314 * @iter: The iterator to reset
4315 *
4316 * Resets the iterator, so that it will start from the beginning
4317 * again.
4318 */
4319 void ring_buffer_iter_reset(struct ring_buffer_iter *iter)
4320 {
4321 struct ring_buffer_per_cpu *cpu_buffer;
4322 unsigned long flags;
4323
4324 if (!iter)
4325 return;
4326
4327 cpu_buffer = iter->cpu_buffer;
4328
4329 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
4330 rb_iter_reset(iter);
4331 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
4332 }
4333 EXPORT_SYMBOL_GPL(ring_buffer_iter_reset);
4334
4335 /**
4336 * ring_buffer_iter_empty - check if an iterator has no more to read
4337 * @iter: The iterator to check
4338 */
4339 int ring_buffer_iter_empty(struct ring_buffer_iter *iter)
4340 {
4341 struct ring_buffer_per_cpu *cpu_buffer;
4342 struct buffer_page *reader;
4343 struct buffer_page *head_page;
4344 struct buffer_page *commit_page;
4345 struct buffer_page *curr_commit_page;
4346 unsigned commit;
4347 u64 curr_commit_ts;
4348 u64 commit_ts;
4349
4350 cpu_buffer = iter->cpu_buffer;
4351 reader = cpu_buffer->reader_page;
4352 head_page = cpu_buffer->head_page;
4353 commit_page = cpu_buffer->commit_page;
4354 commit_ts = commit_page->page->time_stamp;
4355
4356 /*
4357 * When the writer goes across pages, it issues a cmpxchg which
4358 * is a mb(), which will synchronize with the rmb here.
4359 * (see rb_tail_page_update())
4360 */
4361 smp_rmb();
4362 commit = rb_page_commit(commit_page);
4363 /* We want to make sure that the commit page doesn't change */
4364 smp_rmb();
4365
4366 /* Make sure commit page didn't change */
4367 curr_commit_page = READ_ONCE(cpu_buffer->commit_page);
4368 curr_commit_ts = READ_ONCE(curr_commit_page->page->time_stamp);
4369
4370 /* If the commit page changed, then there's more data */
4371 if (curr_commit_page != commit_page ||
4372 curr_commit_ts != commit_ts)
4373 return 0;
4374
4375 /* Still racy, as it may return a false positive, but that's OK */
4376 return ((iter->head_page == commit_page && iter->head >= commit) ||
4377 (iter->head_page == reader && commit_page == head_page &&
4378 head_page->read == commit &&
4379 iter->head == rb_page_commit(cpu_buffer->reader_page)));
4380 }
4381 EXPORT_SYMBOL_GPL(ring_buffer_iter_empty);
4382
4383 static void
4384 rb_update_read_stamp(struct ring_buffer_per_cpu *cpu_buffer,
4385 struct ring_buffer_event *event)
4386 {
4387 u64 delta;
4388
4389 switch (event->type_len) {
4390 case RINGBUF_TYPE_PADDING:
4391 return;
4392
4393 case RINGBUF_TYPE_TIME_EXTEND:
4394 delta = rb_event_time_stamp(event);
4395 cpu_buffer->read_stamp += delta;
4396 return;
4397
4398 case RINGBUF_TYPE_TIME_STAMP:
4399 delta = rb_event_time_stamp(event);
4400 cpu_buffer->read_stamp = delta;
4401 return;
4402
4403 case RINGBUF_TYPE_DATA:
4404 cpu_buffer->read_stamp += event->time_delta;
4405 return;
4406
4407 default:
4408 RB_WARN_ON(cpu_buffer, 1);
4409 }
4410 return;
4411 }
4412
4413 static void
4414 rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
4415 struct ring_buffer_event *event)
4416 {
4417 u64 delta;
4418
4419 switch (event->type_len) {
4420 case RINGBUF_TYPE_PADDING:
4421 return;
4422
4423 case RINGBUF_TYPE_TIME_EXTEND:
4424 delta = rb_event_time_stamp(event);
4425 iter->read_stamp += delta;
4426 return;
4427
4428 case RINGBUF_TYPE_TIME_STAMP:
4429 delta = rb_event_time_stamp(event);
4430 iter->read_stamp = delta;
4431 return;
4432
4433 case RINGBUF_TYPE_DATA:
4434 iter->read_stamp += event->time_delta;
4435 return;
4436
4437 default:
4438 RB_WARN_ON(iter->cpu_buffer, 1);
4439 }
4440 return;
4441 }
4442
4443 static void __set_head_page_flag(struct buffer_page *head, int flag)
4444 {
4445 struct list_head *prev = head->list.prev;
4446
4447 prev->next = (struct list_head *)(((unsigned long)prev->next & ~RB_FLAG_MASK) | flag);
4448 }
4449
4450 static int __read_footer_reader_status(struct buffer_page *bpage)
4451 {
4452 struct rb_ext_page_footer *footer = rb_ext_page_get_footer(bpage->page);
4453
4454 return atomic_read(&footer->reader_status);
4455 }
4456
4457 static int __read_footer_writer_status(struct buffer_page *bpage)
4458 {
4459 struct rb_ext_page_footer *footer = rb_ext_page_get_footer(bpage->page);
4460
4461 return atomic_read(&footer->writer_status);
4462 }
4463
4464 static struct buffer_page *
4465 ring_buffer_search_footer(struct buffer_page *start, unsigned long flag)
4466 {
4467 bool search_writer = flag == RB_PAGE_FT_COMMIT;
4468 struct buffer_page *bpage = start;
4469 unsigned long status;
4470 int cnt = 0;
4471 again:
4472 do {
4473 status = search_writer ? __read_footer_writer_status(bpage) :
4474 __read_footer_reader_status(bpage);
4475 if (flag & status)
4476 return bpage;
4477
4478 rb_inc_page(&bpage);
4479 } while (bpage != start);
4480
4481 /*
4482 * There's a chance the writer is in the middle of moving the flag and
4483 * we might not find anything after a first round. Let's try again.
4484 */
4485 if (cnt++ < 3)
4486 goto again;
4487
4488 return NULL;
4489 }
4490
4491 static struct buffer_page *
4492 noinline rb_swap_reader_page_ext(struct ring_buffer_per_cpu *cpu_buffer)
4493 {
4494 struct buffer_page *new_reader, *new_rb_page, *new_head;
4495 struct rb_ext_page_footer *footer;
4496 unsigned long overrun;
4497
4498 if (cpu_buffer->buffer->ext_cb->swap_reader(cpu_buffer->cpu)) {
4499 WARN_ON(1);
4500 return NULL;
4501 }
4502
4503 new_rb_page = cpu_buffer->reader_page;
4504
4505 /*
4506 * Find what page is the new reader... starting with the latest known
4507 * head.
4508 */
4509 new_reader = ring_buffer_search_footer(cpu_buffer->head_page,
4510 RB_PAGE_FT_READER);
4511 if (!new_reader) {
4512 WARN_ON(1);
4513 return NULL;
4514 }
4515
4516 /* ... and install it into the ring buffer in place of the old head */
4517 rb_list_head_clear(&new_reader->list);
4518 new_rb_page->list.next = new_reader->list.next;
4519 new_rb_page->list.prev = new_reader->list.prev;
4520 new_rb_page->list.next->prev = &new_rb_page->list;
4521 new_rb_page->list.prev->next = &new_rb_page->list;
4522
4523 cpu_buffer->reader_page = new_reader;
4524 cpu_buffer->reader_page->read = 0;
4525
4526 /* Install the new head page */
4527 new_head = new_rb_page;
4528 rb_inc_page(&new_head);
4529 cpu_buffer->head_page = new_head;
4530
4531 /*
4532 * cpu_buffer->pages just needs to point to the buffer, it
4533 * has no specific buffer page to point to. Let's move it out
4534 * of our way so we don't accidentally swap it.
4535 */
4536 cpu_buffer->pages = &new_head->list;
4537
4538 __set_head_page_flag(new_head, RB_PAGE_HEAD);
4539
4540 footer = rb_ext_page_get_footer(new_reader->page);
4541 overrun = footer->stats.overrun;
4542 if (overrun != cpu_buffer->last_overrun) {
4543 cpu_buffer->lost_events = overrun - cpu_buffer->last_overrun;
4544 cpu_buffer->last_overrun = overrun;
4545 }
4546
4547 return new_reader;
4548 }
4549
4550 static struct buffer_page *
4551 rb_swap_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4552 {
4553 struct buffer_page *reader;
4554 unsigned long overwrite;
4555 int ret;
4556
4557 /*
4558 * Reset the reader page to size zero.
4559 */
4560 local_set(&cpu_buffer->reader_page->write, 0);
4561 local_set(&cpu_buffer->reader_page->entries, 0);
4562 local_set(&cpu_buffer->reader_page->page->commit, 0);
4563 cpu_buffer->reader_page->real_end = 0;
4564
4565 spin:
4566 /*
4567 * Splice the empty reader page into the list around the head.
4568 */
4569 reader = rb_set_head_page(cpu_buffer);
4570 if (!reader)
4571 return NULL;
4572
4573 cpu_buffer->reader_page->list.next = rb_list_head(reader->list.next);
4574 cpu_buffer->reader_page->list.prev = reader->list.prev;
4575
4576 /*
4577 * cpu_buffer->pages just needs to point to the buffer, it
4578 * has no specific buffer page to point to. Let's move it out
4579 * of our way so we don't accidentally swap it.
4580 */
4581 cpu_buffer->pages = reader->list.prev;
4582
4583 /* The reader page will be pointing to the new head */
4584 rb_set_list_to_head(&cpu_buffer->reader_page->list);
4585
4586 /*
4587 * We want to make sure we read the overruns after we set up our
4588 * pointers to the next object. The writer side does a
4589 * cmpxchg to cross pages which acts as the mb on the writer
4590 * side. Note, the reader will constantly fail the swap
4591 * while the writer is updating the pointers, so this
4592 * guarantees that the overwrite recorded here is the one we
4593 * want to compare with the last_overrun.
4594 */
4595 smp_mb();
4596 overwrite = local_read(&(cpu_buffer->overrun));
4597
4598 /*
4599 * Here's the tricky part.
4600 *
4601 * We need to move the pointer past the header page.
4602 * But we can only do that if a writer is not currently
4603 * moving it. The page before the header page has the
4604 * flag bit '1' set if it is pointing to the page we want.
4605 * But if the writer is in the process of moving it,
4606 * then it will be '2' or already moved '0'.
4607 */
4608
4609 ret = rb_head_page_replace(reader, cpu_buffer->reader_page);
4610
4611 /*
4612 * If we did not convert it, then we must try again.
4613 */
4614 if (!ret)
4615 goto spin;
4616
4617 /*
4618 * Yay! We succeeded in replacing the page.
4619 *
4620 * Now make the new head point back to the reader page.
4621 */
4622 rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
4623 rb_inc_page(&cpu_buffer->head_page);
4624
4625 local_inc(&cpu_buffer->pages_read);
4626
4627 /* Finally update the reader page to the new head */
4628 cpu_buffer->reader_page = reader;
4629 cpu_buffer->reader_page->read = 0;
4630
4631 if (overwrite != cpu_buffer->last_overrun) {
4632 cpu_buffer->lost_events = overwrite - cpu_buffer->last_overrun;
4633 cpu_buffer->last_overrun = overwrite;
4634 }
4635
4636 return reader;
4637 }
4638
4639 static struct buffer_page *
4640 rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
4641 {
4642 struct buffer_page *reader = NULL;
4643 unsigned long flags;
4644 int nr_loops = 0;
4645 unsigned int page_size;
4646
4647 local_irq_save(flags);
4648 arch_spin_lock(&cpu_buffer->lock);
4649
4650 again:
4651 /*
4652 * This should normally only loop twice. But because the
4653 * start of the reader inserts an empty page, it causes
4654 * a case where we will loop three times. There should be no
4655 * reason to loop four times (that I know of).
4656 */
4657 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 3)) {
4658 reader = NULL;
4659 goto out;
4660 }
4661
4662 reader = cpu_buffer->reader_page;
4663
4664 /* If there's more to read, return this page */
4665 if (cpu_buffer->reader_page->read < rb_page_size(reader))
4666 goto out;
4667
4668 page_size = rb_page_size(reader);
4669 /* Never should we have an index greater than the size */
4670 if (RB_WARN_ON(cpu_buffer,
4671 cpu_buffer->reader_page->read > page_size))
4672 goto out;
4673
4674 /* check if we caught up to the tail */
4675 reader = NULL;
4676 if (cpu_buffer->commit_page == cpu_buffer->reader_page)
4677 goto out;
4678
4679 /* Don't bother swapping if the ring buffer is empty */
4680 if (rb_num_of_entries(cpu_buffer) == 0)
4681 goto out;
4682
4683 if (rb_has_ext_writer(cpu_buffer))
4684 reader = rb_swap_reader_page_ext(cpu_buffer);
4685 else
4686 reader = rb_swap_reader_page(cpu_buffer);
4687
4688 if (reader)
4689 goto again;
4690
4691 out:
4692 /* Update the read_stamp on the first event */
4693 if (reader && reader->read == 0)
4694 cpu_buffer->read_stamp = reader->page->time_stamp;
4695
4696 arch_spin_unlock(&cpu_buffer->lock);
4697 local_irq_restore(flags);
4698
4699 /*
4700 * The writer has preemption disabled, so wait for it, but not forever.
4701 * Although, 1 second is pretty much "forever".
4702 */
4703 #define USECS_WAIT 1000000
4704 for (nr_loops = 0; nr_loops < USECS_WAIT; nr_loops++) {
4705 /* If the write is past the end of page, a writer is still updating it */
4706 if (likely(!reader || rb_page_write(reader) <= BUF_PAGE_SIZE))
4707 break;
4708
4709 udelay(1);
4710
4711 /* Get the latest version of the reader write value */
4712 smp_rmb();
4713 }
4714
4715 /* The writer is not moving forward? Something is wrong */
4716 if (RB_WARN_ON(cpu_buffer, nr_loops == USECS_WAIT))
4717 reader = NULL;
4718
4719 /*
4720 * Make sure we see any padding after the write update
4721 * (see rb_reset_tail()).
4722 *
4723 * In addition, a writer may be writing on the reader page
4724 * if the page has not been fully filled, so the read barrier
4725 * is also needed to make sure we see the content of what is
4726 * committed by the writer (see rb_set_commit_to_write()).
4727 */
4728 smp_rmb();
4729
4730
4731 return reader;
4732 }
4733
4734 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
4735 {
4736 struct ring_buffer_event *event;
4737 struct buffer_page *reader;
4738 unsigned length;
4739
4740 reader = rb_get_reader_page(cpu_buffer);
4741
4742 /* This function should not be called when buffer is empty */
4743 if (RB_WARN_ON(cpu_buffer, !reader))
4744 return;
4745
4746 event = rb_reader_event(cpu_buffer);
4747
4748 if (event->type_len <= RINGBUF_TYPE_DATA_TYPE_LEN_MAX)
4749 cpu_buffer->read++;
4750
4751 rb_update_read_stamp(cpu_buffer, event);
4752
4753 length = rb_event_length(event);
4754 cpu_buffer->reader_page->read += length;
4755 cpu_buffer->read_bytes += length;
4756 }
4757
4758 static void rb_advance_iter(struct ring_buffer_iter *iter)
4759 {
4760 struct ring_buffer_per_cpu *cpu_buffer;
4761
4762 cpu_buffer = iter->cpu_buffer;
4763
4764 /* If head == next_event then we need to jump to the next event */
4765 if (iter->head == iter->next_event) {
4766 /* If the event gets overwritten again, there's nothing to do */
4767 if (rb_iter_head_event(iter) == NULL)
4768 return;
4769 }
4770
4771 iter->head = iter->next_event;
4772
4773 /*
4774 * Check if we are at the end of the buffer.
4775 */
4776 if (iter->next_event >= rb_page_size(iter->head_page)) {
4777 /* discarded commits can make the page empty */
4778 if (iter->head_page == cpu_buffer->commit_page)
4779 return;
4780 rb_inc_iter(iter);
4781 return;
4782 }
4783
4784 rb_update_iter_read_stamp(iter, iter->event);
4785 }
4786
4787 static int rb_lost_events(struct ring_buffer_per_cpu *cpu_buffer)
4788 {
4789 return cpu_buffer->lost_events;
4790 }
4791
4792 static struct ring_buffer_event *
4793 rb_buffer_peek(struct ring_buffer_per_cpu *cpu_buffer, u64 *ts,
4794 unsigned long *lost_events)
4795 {
4796 struct ring_buffer_event *event;
4797 struct buffer_page *reader;
4798 int nr_loops = 0;
4799
4800 if (ts)
4801 *ts = 0;
4802 again:
4803 /*
4804 * We repeat when a time extend is encountered.
4805 * Since the time extend is always attached to a data event,
4806 * we should never loop more than once.
4807 * (We never hit the following condition more than twice).
4808 */
4809 if (RB_WARN_ON(cpu_buffer, ++nr_loops > 2))
4810 return NULL;
4811
4812 reader = rb_get_reader_page(cpu_buffer);
4813 if (!reader)
4814 return NULL;
4815
4816 event = rb_reader_event(cpu_buffer);
4817
4818 switch (event->type_len) {
4819 case RINGBUF_TYPE_PADDING:
4820 if (rb_null_event(event))
4821 RB_WARN_ON(cpu_buffer, 1);
4822 /*
4823 * Because the writer could be discarding every
4824 * event it creates (which would probably be bad),
4825 * if we were to go back to "again" then we may never
4826 * catch up, and will trigger the warn on, or lock
4827 * the box. Return the padding, and we will release
4828 * the current locks, and try again.
4829 */
4830 return event;
4831
4832 case RINGBUF_TYPE_TIME_EXTEND:
4833 /* Internal data, OK to advance */
4834 rb_advance_reader(cpu_buffer);
4835 goto again;
4836
4837 case RINGBUF_TYPE_TIME_STAMP:
4838 if (ts) {
4839 *ts = rb_event_time_stamp(event);
4840 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4841 cpu_buffer->cpu, ts);
4842 }
4843 /* Internal data, OK to advance */
4844 rb_advance_reader(cpu_buffer);
4845 goto again;
4846
4847 case RINGBUF_TYPE_DATA:
4848 if (ts && !(*ts)) {
4849 *ts = cpu_buffer->read_stamp + event->time_delta;
4850 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4851 cpu_buffer->cpu, ts);
4852 }
4853 if (lost_events)
4854 *lost_events = rb_lost_events(cpu_buffer);
4855 return event;
4856
4857 default:
4858 RB_WARN_ON(cpu_buffer, 1);
4859 }
4860
4861 return NULL;
4862 }
4863 EXPORT_SYMBOL_GPL(ring_buffer_peek);
4864
4865 static struct ring_buffer_event *
4866 rb_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
4867 {
4868 struct trace_buffer *buffer;
4869 struct ring_buffer_per_cpu *cpu_buffer;
4870 struct ring_buffer_event *event;
4871 int nr_loops = 0;
4872
4873 if (ts)
4874 *ts = 0;
4875
4876 cpu_buffer = iter->cpu_buffer;
4877 buffer = cpu_buffer->buffer;
4878
4879 /*
4880 * Check if someone performed a consuming read to
4881 * the buffer. A consuming read invalidates the iterator
4882 * and we need to reset the iterator in this case.
4883 */
4884 if (unlikely(iter->cache_read != cpu_buffer->read ||
4885 iter->cache_reader_page != cpu_buffer->reader_page))
4886 rb_iter_reset(iter);
4887
4888 again:
4889 if (ring_buffer_iter_empty(iter))
4890 return NULL;
4891
4892 /*
4893 * As the writer can mess with what the iterator is trying
4894 * to read, just give up if we fail to get an event after
4895 * three tries. The iterator is not as reliable when reading
4896 * the ring buffer with an active write as the consumer is.
4897 * Do not warn if three failures are reached.
4898 */
4899 if (++nr_loops > 3)
4900 return NULL;
4901
4902 if (rb_per_cpu_empty(cpu_buffer))
4903 return NULL;
4904
4905 if (iter->head >= rb_page_size(iter->head_page)) {
4906 rb_inc_iter(iter);
4907 goto again;
4908 }
4909
4910 event = rb_iter_head_event(iter);
4911 if (!event)
4912 goto again;
4913
4914 switch (event->type_len) {
4915 case RINGBUF_TYPE_PADDING:
4916 if (rb_null_event(event)) {
4917 rb_inc_iter(iter);
4918 goto again;
4919 }
4920 rb_advance_iter(iter);
4921 return event;
4922
4923 case RINGBUF_TYPE_TIME_EXTEND:
4924 /* Internal data, OK to advance */
4925 rb_advance_iter(iter);
4926 goto again;
4927
4928 case RINGBUF_TYPE_TIME_STAMP:
4929 if (ts) {
4930 *ts = rb_event_time_stamp(event);
4931 ring_buffer_normalize_time_stamp(cpu_buffer->buffer,
4932 cpu_buffer->cpu, ts);
4933 }
4934 /* Internal data, OK to advance */
4935 rb_advance_iter(iter);
4936 goto again;
4937
4938 case RINGBUF_TYPE_DATA:
4939 if (ts && !(*ts)) {
4940 *ts = iter->read_stamp + event->time_delta;
4941 ring_buffer_normalize_time_stamp(buffer,
4942 cpu_buffer->cpu, ts);
4943 }
4944 return event;
4945
4946 default:
4947 RB_WARN_ON(cpu_buffer, 1);
4948 }
4949
4950 return NULL;
4951 }
4952 EXPORT_SYMBOL_GPL(ring_buffer_iter_peek);
4953
4954 static inline bool rb_reader_lock(struct ring_buffer_per_cpu *cpu_buffer)
4955 {
4956 if (likely(!in_nmi())) {
4957 raw_spin_lock(&cpu_buffer->reader_lock);
4958 return true;
4959 }
4960
4961 /*
4962 * If an NMI die dumps out the content of the ring buffer,
4963 * trylock must be used to prevent a deadlock if the NMI
4964 * preempted a task that holds the ring buffer locks. If
4965 * we get the lock then all is fine, if not, then continue
4966 * to do the read, but this can corrupt the ring buffer,
4967 * so it must be permanently disabled from future writes.
4968 * Reading from NMI is a one-shot deal.
4969 */
4970 if (raw_spin_trylock(&cpu_buffer->reader_lock))
4971 return true;
4972
4973 /* Continue without locking, but disable the ring buffer */
4974 atomic_inc(&cpu_buffer->record_disabled);
4975 return false;
4976 }
4977
4978 static inline void
4979 rb_reader_unlock(struct ring_buffer_per_cpu *cpu_buffer, bool locked)
4980 {
4981 if (likely(locked))
4982 raw_spin_unlock(&cpu_buffer->reader_lock);
4983 return;
4984 }
4985
4986 /**
4987 * ring_buffer_peek - peek at the next event to be read
4988 * @buffer: The ring buffer to read
4989 * @cpu: The cpu to peek at
4990 * @ts: The timestamp counter of this event.
4991 * @lost_events: a variable to store if events were lost (may be NULL)
4992 *
4993 * This will return the event that will be read next, but does
4994 * not consume the data.
4995 */
4996 struct ring_buffer_event *
4997 ring_buffer_peek(struct trace_buffer *buffer, int cpu, u64 *ts,
4998 unsigned long *lost_events)
4999 {
5000 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5001 struct ring_buffer_event *event;
5002 unsigned long flags;
5003 bool dolock;
5004
5005 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5006 return NULL;
5007
5008 again:
5009 local_irq_save(flags);
5010 dolock = rb_reader_lock(cpu_buffer);
5011 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5012 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5013 rb_advance_reader(cpu_buffer);
5014 rb_reader_unlock(cpu_buffer, dolock);
5015 local_irq_restore(flags);
5016
5017 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5018 goto again;
5019
5020 return event;
5021 }
5022
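/*
 * Illustrative sketch only: using the non-destructive peek to merge events
 * from every CPU in timestamp order, that is, peek each CPU, pick the
 * oldest, then consume only from that CPU. This mirrors how a multi-CPU
 * reader can be built on top of peek/consume; the function name is made up
 * for the example and for_each_online_cpu() comes from <linux/cpumask.h>.
 */
#if 0
static struct ring_buffer_event *
my_peek_next_oldest(struct trace_buffer *buffer, int *cpu_ret, u64 *ts_ret)
{
	struct ring_buffer_event *event, *next = NULL;
	u64 ts, next_ts = 0;
	int cpu;

	for_each_online_cpu(cpu) {
		event = ring_buffer_peek(buffer, cpu, &ts, NULL);
		if (event && (!next || ts < next_ts)) {
			next = event;
			next_ts = ts;
			*cpu_ret = cpu;
		}
	}

	if (next)
		*ts_ret = next_ts;
	return next;
}
#endif
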
5023 /** ring_buffer_iter_dropped - report if there are dropped events
5024 * @iter: The ring buffer iterator
5025 *
5026 * Returns true if there were dropped events since the last peek.
5027 */
5028 bool ring_buffer_iter_dropped(struct ring_buffer_iter *iter)
5029 {
5030 bool ret = iter->missed_events != 0;
5031
5032 iter->missed_events = 0;
5033 return ret;
5034 }
5035 EXPORT_SYMBOL_GPL(ring_buffer_iter_dropped);
5036
5037 /**
5038 * ring_buffer_iter_peek - peek at the next event to be read
5039 * @iter: The ring buffer iterator
5040 * @ts: The timestamp counter of this event.
5041 *
5042 * This will return the event that will be read next, but does
5043 * not increment the iterator.
5044 */
5045 struct ring_buffer_event *
5046 ring_buffer_iter_peek(struct ring_buffer_iter *iter, u64 *ts)
5047 {
5048 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5049 struct ring_buffer_event *event;
5050 unsigned long flags;
5051
5052 again:
5053 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5054 event = rb_iter_peek(iter, ts);
5055 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5056
5057 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5058 goto again;
5059
5060 return event;
5061 }
5062
5063 /**
5064 * ring_buffer_consume - return an event and consume it
5065 * @buffer: The ring buffer to get the next event from
5066 * @cpu: the cpu to read the buffer from
5067 * @ts: a variable to store the timestamp (may be NULL)
5068 * @lost_events: a variable to store if events were lost (may be NULL)
5069 *
5070 * Returns the next event in the ring buffer, and that event is consumed.
5071 * Meaning, that sequential reads will keep returning a different event,
5072 * and eventually empty the ring buffer if the producer is slower.
5073 */
5074 struct ring_buffer_event *
5075 ring_buffer_consume(struct trace_buffer *buffer, int cpu, u64 *ts,
5076 unsigned long *lost_events)
5077 {
5078 struct ring_buffer_per_cpu *cpu_buffer;
5079 struct ring_buffer_event *event = NULL;
5080 unsigned long flags;
5081 bool dolock;
5082
5083 again:
5084 /* might be called in atomic */
5085 preempt_disable();
5086
5087 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5088 goto out;
5089
5090 cpu_buffer = buffer->buffers[cpu];
5091 local_irq_save(flags);
5092 dolock = rb_reader_lock(cpu_buffer);
5093
5094 event = rb_buffer_peek(cpu_buffer, ts, lost_events);
5095 if (event) {
5096 cpu_buffer->lost_events = 0;
5097 rb_advance_reader(cpu_buffer);
5098 }
5099
5100 rb_reader_unlock(cpu_buffer, dolock);
5101 local_irq_restore(flags);
5102
5103 out:
5104 preempt_enable();
5105
5106 if (event && event->type_len == RINGBUF_TYPE_PADDING)
5107 goto again;
5108
5109 return event;
5110 }
5111 EXPORT_SYMBOL_GPL(ring_buffer_consume);
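
/*
 * Illustrative sketch only: a consuming read loop built on
 * ring_buffer_consume(). Each successful call returns the next event and
 * removes it from the reader's view. ring_buffer_event_data() and
 * ring_buffer_event_length() are the usual <linux/ring_buffer.h> helpers;
 * my_drain_cpu() and my_handle_payload() are hypothetical names used only
 * for this example.
 */
#if 0
static void my_drain_cpu(struct trace_buffer *buffer, int cpu)
{
	struct ring_buffer_event *event;
	unsigned long lost;
	u64 ts;

	while ((event = ring_buffer_consume(buffer, cpu, &ts, &lost))) {
		if (lost)
			pr_warn("lost %lu events before ts %llu\n", lost, ts);

		my_handle_payload(ring_buffer_event_data(event),
				  ring_buffer_event_length(event));
	}
}
#endif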
5112
5113 static void ring_buffer_update_view(struct ring_buffer_per_cpu *cpu_buffer)
5114 {
5115 struct rb_ext_page_footer *footer;
5116 struct buffer_page *bpage;
5117
5118 if (!rb_has_ext_writer(cpu_buffer))
5119 return;
5120
5121 raw_spin_lock_irq(&cpu_buffer->reader_lock);
5122 arch_spin_lock(&cpu_buffer->lock);
5123
5124 cpu_buffer->buffer->ext_cb->update_footers(cpu_buffer->cpu);
5125
5126 bpage = cpu_buffer->reader_page;
5127 footer = rb_ext_page_get_footer(bpage->page);
5128
5129 local_set(&cpu_buffer->entries, footer->stats.entries);
5130 local_set(&cpu_buffer->pages_touched, footer->stats.pages_touched);
5131 local_set(&cpu_buffer->overrun, footer->stats.overrun);
5132
5133 /* Update the commit page */
5134 bpage = ring_buffer_search_footer(cpu_buffer->commit_page,
5135 RB_PAGE_FT_COMMIT);
5136 if (!bpage) {
5137 WARN_ON(1);
5138 goto unlock;
5139 }
5140 cpu_buffer->commit_page = bpage;
5141
5142 /* Update the head page */
5143 bpage = ring_buffer_search_footer(cpu_buffer->head_page,
5144 RB_PAGE_FT_HEAD);
5145 if (!bpage) {
5146 WARN_ON(1);
5147 goto unlock;
5148 }
5149
5150 /* Reset the previous RB_PAGE_HEAD flag */
5151 __set_head_page_flag(cpu_buffer->head_page, RB_PAGE_NORMAL);
5152
5153 /* Set RB_PAGE_HEAD flag pointing to the new head */
5154 __set_head_page_flag(bpage, RB_PAGE_HEAD);
5155
5156 cpu_buffer->reader_page->list.next = &cpu_buffer->head_page->list;
5157
5158 cpu_buffer->head_page = bpage;
5159
5160 unlock:
5161 arch_spin_unlock(&cpu_buffer->lock);
5162 raw_spin_unlock_irq(&cpu_buffer->reader_lock);
5163 }
5164
5165 int ring_buffer_poke(struct trace_buffer *buffer, int cpu)
5166 {
5167 struct ring_buffer_per_cpu *cpu_buffer;
5168
5169 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5170 return -EINVAL;
5171
5172 cpu_buffer = buffer->buffers[cpu];
5173
5174 ring_buffer_update_view(cpu_buffer);
5175 rb_wakeups(buffer, cpu_buffer);
5176
5177 return 0;
5178 }
5179
5180 /**
5181 * ring_buffer_read_prepare - Prepare for a non consuming read of the buffer
5182 * @buffer: The ring buffer to read from
5183 * @cpu: The cpu buffer to iterate over
5184 * @flags: gfp flags to use for memory allocation
5185 *
5186 * This performs the initial preparations necessary to iterate
5187 * through the buffer. Memory is allocated, buffer recording
5188 * is disabled, and the iterator pointer is returned to the caller.
5189 *
5190 * Disabling buffer recording prevents the reading from being
5191 * corrupted. This is not a consuming read, so a producer is not
5192 * expected.
5193 *
5194 * After a sequence of ring_buffer_read_prepare calls, the user is
5195 * expected to make at least one call to ring_buffer_read_prepare_sync.
5196 * Afterwards, ring_buffer_read_start is invoked to get things going
5197 * for real.
5198 *
5199 * This overall must be paired with ring_buffer_read_finish.
5200 */
5201 struct ring_buffer_iter *
5202 ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
5203 {
5204 struct ring_buffer_per_cpu *cpu_buffer;
5205 struct ring_buffer_iter *iter;
5206
5207 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5208 return NULL;
5209
5210 iter = kzalloc(sizeof(*iter), flags);
5211 if (!iter)
5212 return NULL;
5213
5214 /* Holds the entire event: data and meta data */
5215 iter->event = kmalloc(BUF_PAGE_SIZE, flags);
5216 if (!iter->event) {
5217 kfree(iter);
5218 return NULL;
5219 }
5220
5221 cpu_buffer = buffer->buffers[cpu];
5222
5223 iter->cpu_buffer = cpu_buffer;
5224
5225 atomic_inc(&cpu_buffer->resize_disabled);
5226
5227 ring_buffer_update_view(cpu_buffer);
5228
5229 return iter;
5230 }
5231 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare);
5232
5233 /**
5234 * ring_buffer_read_prepare_sync - Synchronize a set of prepare calls
5235 *
5236 * All previously invoked ring_buffer_read_prepare calls to prepare
5237 * iterators will be synchronized. Afterwards, ring_buffer_read_start
5238 * calls on those iterators are allowed.
5239 */
5240 void
5241 ring_buffer_read_prepare_sync(void)
5242 {
5243 synchronize_rcu();
5244 }
5245 EXPORT_SYMBOL_GPL(ring_buffer_read_prepare_sync);
5246
5247 /**
5248 * ring_buffer_read_start - start a non consuming read of the buffer
5249 * @iter: The iterator returned by ring_buffer_read_prepare
5250 *
5251 * This finalizes the startup of an iteration through the buffer.
5252 * The iterator comes from a call to ring_buffer_read_prepare and
5253 * an intervening ring_buffer_read_prepare_sync must have been
5254 * performed.
5255 *
5256 * Must be paired with ring_buffer_read_finish.
5257 */
5258 void
5259 void ring_buffer_read_start(struct ring_buffer_iter *iter)
5260 {
5261 struct ring_buffer_per_cpu *cpu_buffer;
5262 unsigned long flags;
5263
5264 if (!iter)
5265 return;
5266
5267 cpu_buffer = iter->cpu_buffer;
5268
5269 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5270 arch_spin_lock(&cpu_buffer->lock);
5271 rb_iter_reset(iter);
5272 arch_spin_unlock(&cpu_buffer->lock);
5273 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5274 }
5275 EXPORT_SYMBOL_GPL(ring_buffer_read_start);
5276
5277 /**
5278 * ring_buffer_read_finish - finish reading the iterator of the buffer
5279 * @iter: The iterator retrieved by ring_buffer_read_prepare
5280 *
5281 * This re-enables the recording to the buffer, and frees the
5282 * iterator.
5283 */
5284 void
5285 void ring_buffer_read_finish(struct ring_buffer_iter *iter)
5286 {
5287 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5288 unsigned long flags;
5289
5290 /*
5291 * Ring buffer is disabled from recording, here's a good place
5292 * to check the integrity of the ring buffer.
5293 * Must prevent readers from trying to read, as the check
5294 * clears the HEAD page and readers require it.
5295 */
5296 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5297 rb_check_pages(cpu_buffer);
5298 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5299
5300 atomic_dec(&cpu_buffer->resize_disabled);
5301 kfree(iter->event);
5302 kfree(iter);
5303 }
5304 EXPORT_SYMBOL_GPL(ring_buffer_read_finish);
5305
5306 /**
5307 * ring_buffer_iter_advance - advance the iterator to the next location
5308 * @iter: The ring buffer iterator
5309 *
5310 * Advance the iterator past its current event so that the next
5311 * read returns the following event in the buffer.
5312 */
5313 void ring_buffer_iter_advance(struct ring_buffer_iter *iter)
5314 {
5315 struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
5316 unsigned long flags;
5317
5318 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5319
5320 rb_advance_iter(iter);
5321
5322 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5323 }
5324 EXPORT_SYMBOL_GPL(ring_buffer_iter_advance);
5325
5326 /**
5327 * ring_buffer_size - return the size of the ring buffer (in bytes)
5328 * @buffer: The ring buffer.
5329 * @cpu: The CPU to get ring buffer size from.
5330 */
5331 unsigned long ring_buffer_size(struct trace_buffer *buffer, int cpu)
5332 {
5333 /*
5334 * Earlier, this method returned
5335 * BUF_PAGE_SIZE * buffer->nr_pages
5336 * Since the nr_pages field is now removed, we have converted this to
5337 * return the per cpu buffer value.
5338 */
5339 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5340 return 0;
5341
5342 return BUF_PAGE_SIZE * buffer->buffers[cpu]->nr_pages;
5343 }
5344 EXPORT_SYMBOL_GPL(ring_buffer_size);
5345
5346 static void rb_clear_buffer_page(struct buffer_page *page)
5347 {
5348 local_set(&page->write, 0);
5349 local_set(&page->entries, 0);
5350 rb_init_page(page->page);
5351 page->read = 0;
5352 }
5353
5354 static void
5355 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
5356 {
5357 struct buffer_page *page;
5358
5359 rb_head_page_deactivate(cpu_buffer);
5360
5361 cpu_buffer->head_page
5362 = list_entry(cpu_buffer->pages, struct buffer_page, list);
5363 rb_clear_buffer_page(cpu_buffer->head_page);
5364 list_for_each_entry(page, cpu_buffer->pages, list) {
5365 rb_clear_buffer_page(page);
5366 }
5367
5368 cpu_buffer->tail_page = cpu_buffer->head_page;
5369 cpu_buffer->commit_page = cpu_buffer->head_page;
5370
5371 INIT_LIST_HEAD(&cpu_buffer->reader_page->list);
5372 INIT_LIST_HEAD(&cpu_buffer->new_pages);
5373 rb_clear_buffer_page(cpu_buffer->reader_page);
5374
5375 local_set(&cpu_buffer->entries_bytes, 0);
5376 local_set(&cpu_buffer->overrun, 0);
5377 local_set(&cpu_buffer->commit_overrun, 0);
5378 local_set(&cpu_buffer->dropped_events, 0);
5379 local_set(&cpu_buffer->entries, 0);
5380 local_set(&cpu_buffer->committing, 0);
5381 local_set(&cpu_buffer->commits, 0);
5382 local_set(&cpu_buffer->pages_touched, 0);
5383 local_set(&cpu_buffer->pages_lost, 0);
5384 local_set(&cpu_buffer->pages_read, 0);
5385 cpu_buffer->last_pages_touch = 0;
5386 cpu_buffer->shortest_full = 0;
5387 cpu_buffer->read = 0;
5388 cpu_buffer->read_bytes = 0;
5389
5390 rb_time_set(&cpu_buffer->write_stamp, 0);
5391 rb_time_set(&cpu_buffer->before_stamp, 0);
5392
5393 memset(cpu_buffer->event_stamp, 0, sizeof(cpu_buffer->event_stamp));
5394
5395 cpu_buffer->lost_events = 0;
5396 cpu_buffer->last_overrun = 0;
5397
5398 rb_head_page_activate(cpu_buffer);
5399 }
5400
5401 /* Must have disabled the cpu buffer then done a synchronize_rcu */
5402 static void reset_disabled_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
5403 {
5404 unsigned long flags;
5405
5406 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5407
5408 if (RB_WARN_ON(cpu_buffer, local_read(&cpu_buffer->committing)))
5409 goto out;
5410
5411 arch_spin_lock(&cpu_buffer->lock);
5412
5413 rb_reset_cpu(cpu_buffer);
5414
5415 arch_spin_unlock(&cpu_buffer->lock);
5416
5417 out:
5418 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5419 }
5420
5421 /**
5422 * ring_buffer_reset_cpu - reset a ring buffer per CPU buffer
5423 * @buffer: The ring buffer to reset a per cpu buffer of
5424 * @cpu: The CPU buffer to be reset
5425 */
5426 void ring_buffer_reset_cpu(struct trace_buffer *buffer, int cpu)
5427 {
5428 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5429
5430 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5431 return;
5432
5433 /* prevent another thread from changing buffer sizes */
5434 mutex_lock(&buffer->mutex);
5435
5436 atomic_inc(&cpu_buffer->resize_disabled);
5437 atomic_inc(&cpu_buffer->record_disabled);
5438
5439 /* Make sure all commits have finished */
5440 synchronize_rcu();
5441
5442 reset_disabled_cpu_buffer(cpu_buffer);
5443
5444 atomic_dec(&cpu_buffer->record_disabled);
5445 atomic_dec(&cpu_buffer->resize_disabled);
5446
5447 mutex_unlock(&buffer->mutex);
5448 }
5449 EXPORT_SYMBOL_GPL(ring_buffer_reset_cpu);
5450
5451 /* Flag to ensure proper resetting of atomic variables */
5452 #define RESET_BIT (1 << 30)
5453
5454 /**
5455 * ring_buffer_reset_online_cpus - reset all online per-CPU buffers of a ring buffer
5456 * @buffer: The ring buffer whose online per-CPU buffers are to be reset
5458 */
5459 void ring_buffer_reset_online_cpus(struct trace_buffer *buffer)
5460 {
5461 struct ring_buffer_per_cpu *cpu_buffer;
5462 int cpu;
5463
5464 /* prevent another thread from changing buffer sizes */
5465 mutex_lock(&buffer->mutex);
5466
5467 for_each_online_buffer_cpu(buffer, cpu) {
5468 cpu_buffer = buffer->buffers[cpu];
5469
5470 atomic_add(RESET_BIT, &cpu_buffer->resize_disabled);
5471 atomic_inc(&cpu_buffer->record_disabled);
5472 }
5473
5474 /* Make sure all commits have finished */
5475 synchronize_rcu();
5476
5477 for_each_buffer_cpu(buffer, cpu) {
5478 cpu_buffer = buffer->buffers[cpu];
5479
5480 /*
5481 * If a CPU came online during the synchronize_rcu(), then
5482 * ignore it.
5483 */
5484 if (!(atomic_read(&cpu_buffer->resize_disabled) & RESET_BIT))
5485 continue;
5486
5487 reset_disabled_cpu_buffer(cpu_buffer);
5488
5489 atomic_dec(&cpu_buffer->record_disabled);
5490 atomic_sub(RESET_BIT, &cpu_buffer->resize_disabled);
5491 }
5492
5493 mutex_unlock(&buffer->mutex);
5494 }
5495
5496 /**
5497 * ring_buffer_reset - reset a ring buffer
5498 * @buffer: The ring buffer to reset all CPU buffers of
5499 */
5500 void ring_buffer_reset(struct trace_buffer *buffer)
5501 {
5502 struct ring_buffer_per_cpu *cpu_buffer;
5503 int cpu;
5504
5505 /* prevent another thread from changing buffer sizes */
5506 mutex_lock(&buffer->mutex);
5507
5508 for_each_buffer_cpu(buffer, cpu) {
5509 cpu_buffer = buffer->buffers[cpu];
5510
5511 atomic_inc(&cpu_buffer->resize_disabled);
5512 atomic_inc(&cpu_buffer->record_disabled);
5513 }
5514
5515 /* Make sure all commits have finished */
5516 synchronize_rcu();
5517
5518 for_each_buffer_cpu(buffer, cpu) {
5519 cpu_buffer = buffer->buffers[cpu];
5520
5521 reset_disabled_cpu_buffer(cpu_buffer);
5522
5523 atomic_dec(&cpu_buffer->record_disabled);
5524 atomic_dec(&cpu_buffer->resize_disabled);
5525 }
5526
5527 mutex_unlock(&buffer->mutex);
5528 }
5529 EXPORT_SYMBOL_GPL(ring_buffer_reset);
5530
5531 /**
5532 * ring_buffer_empty - is the ring buffer empty?
5533 * @buffer: The ring buffer to test
5534 */
5535 bool ring_buffer_empty(struct trace_buffer *buffer)
5536 {
5537 struct ring_buffer_per_cpu *cpu_buffer;
5538 unsigned long flags;
5539 bool dolock;
5540 int cpu;
5541 int ret;
5542
5543 /* yes this is racy, but if you don't like the race, lock the buffer */
5544 for_each_buffer_cpu(buffer, cpu) {
5545 cpu_buffer = buffer->buffers[cpu];
5546 local_irq_save(flags);
5547 dolock = rb_reader_lock(cpu_buffer);
5548 ret = rb_per_cpu_empty(cpu_buffer);
5549 rb_reader_unlock(cpu_buffer, dolock);
5550 local_irq_restore(flags);
5551
5552 if (!ret)
5553 return false;
5554 }
5555
5556 return true;
5557 }
5558 EXPORT_SYMBOL_GPL(ring_buffer_empty);
5559
5560 /**
5561 * ring_buffer_empty_cpu - is a cpu buffer of a ring buffer empty?
5562 * @buffer: The ring buffer
5563 * @cpu: The CPU buffer to test
5564 */
5565 bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
5566 {
5567 struct ring_buffer_per_cpu *cpu_buffer;
5568 unsigned long flags;
5569 bool dolock;
5570 int ret;
5571
5572 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5573 return true;
5574
5575 cpu_buffer = buffer->buffers[cpu];
5576 local_irq_save(flags);
5577 dolock = rb_reader_lock(cpu_buffer);
5578 ret = rb_per_cpu_empty(cpu_buffer);
5579 rb_reader_unlock(cpu_buffer, dolock);
5580 local_irq_restore(flags);
5581
5582 return ret;
5583 }
5584 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
5585
5586 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
5587 /**
5588 * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
5589 * @buffer_a: One buffer to swap with
5590 * @buffer_b: The other buffer to swap with
5591 * @cpu: the CPU of the buffers to swap
5592 *
5593 * This function is useful for tracers that want to take a "snapshot"
5594 * of a CPU buffer and have another backup buffer lying around.
5595 * It is expected that the tracer handles the cpu buffer not being
5596 * used at the moment.
5597 */
5598 int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
5599 struct trace_buffer *buffer_b, int cpu)
5600 {
5601 struct ring_buffer_per_cpu *cpu_buffer_a;
5602 struct ring_buffer_per_cpu *cpu_buffer_b;
5603 int ret = -EINVAL;
5604
5605 if (unlikely(has_ext_writer(buffer_a) || has_ext_writer(buffer_b)))
5606 return -EINVAL;
5607
5608 if (!cpumask_test_cpu(cpu, buffer_a->cpumask) ||
5609 !cpumask_test_cpu(cpu, buffer_b->cpumask))
5610 goto out;
5611
5612 cpu_buffer_a = buffer_a->buffers[cpu];
5613 cpu_buffer_b = buffer_b->buffers[cpu];
5614
5615 /* At least make sure the two buffers are somewhat the same */
5616 if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
5617 goto out;
5618
5619 ret = -EAGAIN;
5620
5621 if (atomic_read(&buffer_a->record_disabled))
5622 goto out;
5623
5624 if (atomic_read(&buffer_b->record_disabled))
5625 goto out;
5626
5627 if (atomic_read(&cpu_buffer_a->record_disabled))
5628 goto out;
5629
5630 if (atomic_read(&cpu_buffer_b->record_disabled))
5631 goto out;
5632
5633 /*
5634 * We can't do a synchronize_rcu here because this
5635 * function can be called in atomic context.
5636 * Normally this will be called from the same CPU as cpu.
5637 * If not it's up to the caller to protect this.
5638 */
5639 atomic_inc(&cpu_buffer_a->record_disabled);
5640 atomic_inc(&cpu_buffer_b->record_disabled);
5641
5642 ret = -EBUSY;
5643 if (local_read(&cpu_buffer_a->committing))
5644 goto out_dec;
5645 if (local_read(&cpu_buffer_b->committing))
5646 goto out_dec;
5647
5648 /*
5649 * When resize is in progress, we cannot swap it because
5650 * it will mess up the state of the cpu buffer.
5651 */
5652 if (atomic_read(&buffer_a->resizing))
5653 goto out_dec;
5654 if (atomic_read(&buffer_b->resizing))
5655 goto out_dec;
5656
5657 buffer_a->buffers[cpu] = cpu_buffer_b;
5658 buffer_b->buffers[cpu] = cpu_buffer_a;
5659
5660 cpu_buffer_b->buffer = buffer_a;
5661 cpu_buffer_a->buffer = buffer_b;
5662
5663 ret = 0;
5664
5665 out_dec:
5666 atomic_dec(&cpu_buffer_a->record_disabled);
5667 atomic_dec(&cpu_buffer_b->record_disabled);
5668 out:
5669 return ret;
5670 }
5671 EXPORT_SYMBOL_GPL(ring_buffer_swap_cpu);
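/*
 * Usage sketch (illustrative only; the function name is an assumption):
 * a "snapshot" style caller swaps the per-CPU buffer of a live
 * trace_buffer with a spare buffer of the same size, then reads the
 * now-idle spare without racing the writer. As noted above, the caller
 * is expected to run on, or otherwise protect, @cpu. A non-zero return
 * is one of -EINVAL, -EAGAIN or -EBUSY.
 *
 *	static int example_snapshot_cpu(struct trace_buffer *live,
 *					struct trace_buffer *spare, int cpu)
 *	{
 *		int ret;
 *
 *		ret = ring_buffer_swap_cpu(live, spare, cpu);
 *		if (ret)
 *			return ret;
 *
 *		pr_info("snapshot: %lu events captured on CPU %d\n",
 *			ring_buffer_entries_cpu(spare, cpu), cpu);
 *		return 0;
 *	}
 */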
5672 #endif /* CONFIG_RING_BUFFER_ALLOW_SWAP */
5673
5674 /**
5675 * ring_buffer_alloc_read_page - allocate a page to read from buffer
5676 * @buffer: the buffer to allocate for.
5677 * @cpu: the cpu buffer to allocate.
5678 *
5679 * This function is used in conjunction with ring_buffer_read_page.
5680 * When reading a full page from the ring buffer, these functions
5681 * can be used to speed up the process. The calling function should
5682 * allocate a few pages first with this function. Then when it
5683 * needs to get pages from the ring buffer, it passes the result
5684 * of this function into ring_buffer_read_page, which will swap
5685 * the page that was allocated, with the read page of the buffer.
5686 *
5687 * Returns:
5688 * The page allocated, or ERR_PTR
5689 */
5690 void *ring_buffer_alloc_read_page(struct trace_buffer *buffer, int cpu)
5691 {
5692 struct ring_buffer_per_cpu *cpu_buffer;
5693 struct buffer_data_page *bpage = NULL;
5694 unsigned long flags;
5695 struct page *page;
5696
5697 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5698 return ERR_PTR(-ENODEV);
5699
5700 cpu_buffer = buffer->buffers[cpu];
5701 local_irq_save(flags);
5702 arch_spin_lock(&cpu_buffer->lock);
5703
5704 if (cpu_buffer->free_page) {
5705 bpage = cpu_buffer->free_page;
5706 cpu_buffer->free_page = NULL;
5707 }
5708
5709 arch_spin_unlock(&cpu_buffer->lock);
5710 local_irq_restore(flags);
5711
5712 if (bpage)
5713 goto out;
5714
5715 page = alloc_pages_node(cpu_to_node(cpu),
5716 GFP_KERNEL | __GFP_NORETRY, 0);
5717 if (!page)
5718 return ERR_PTR(-ENOMEM);
5719
5720 bpage = page_address(page);
5721
5722 out:
5723 rb_init_page(bpage);
5724
5725 return bpage;
5726 }
5727 EXPORT_SYMBOL_GPL(ring_buffer_alloc_read_page);
5728
5729 /**
5730 * ring_buffer_free_read_page - free an allocated read page
5731 * @buffer: the buffer the page was allocated for
5732 * @cpu: the cpu buffer the page came from
5733 * @data: the page to free
5734 *
5735 * Free a page allocated from ring_buffer_alloc_read_page.
5736 */
5737 void ring_buffer_free_read_page(struct trace_buffer *buffer, int cpu, void *data)
5738 {
5739 struct ring_buffer_per_cpu *cpu_buffer;
5740 struct buffer_data_page *bpage = data;
5741 struct page *page = virt_to_page(bpage);
5742 unsigned long flags;
5743
5744 if (!buffer || !buffer->buffers || !buffer->buffers[cpu])
5745 return;
5746
5747 cpu_buffer = buffer->buffers[cpu];
5748
5749 /* If the page is still in use someplace else, we can't reuse it */
5750 if (page_ref_count(page) > 1)
5751 goto out;
5752
5753 local_irq_save(flags);
5754 arch_spin_lock(&cpu_buffer->lock);
5755
5756 if (!cpu_buffer->free_page) {
5757 cpu_buffer->free_page = bpage;
5758 bpage = NULL;
5759 }
5760
5761 arch_spin_unlock(&cpu_buffer->lock);
5762 local_irq_restore(flags);
5763
5764 out:
5765 free_page((unsigned long)bpage);
5766 }
5767 EXPORT_SYMBOL_GPL(ring_buffer_free_read_page);
5768
5769 /**
5770 * ring_buffer_read_page - extract a page from the ring buffer
5771 * @buffer: buffer to extract from
5772 * @data_page: the page to use allocated from ring_buffer_alloc_read_page
5773 * @len: amount to extract
5774 * @cpu: the cpu of the buffer to extract
5775 * @full: should the extraction only happen when the page is full.
5776 *
5777 * This function will pull out a page from the ring buffer and consume it.
5778 * @data_page must be the address of the variable that was returned
5779 * from ring_buffer_alloc_read_page. This is because the page might be used
5780 * to swap with a page in the ring buffer.
5781 *
5782 * for example:
5783 * rpage = ring_buffer_alloc_read_page(buffer, cpu);
5784 * if (IS_ERR(rpage))
5785 * return PTR_ERR(rpage);
5786 * ret = ring_buffer_read_page(buffer, &rpage, len, cpu, 0);
5787 * if (ret >= 0)
5788 * process_page(rpage, ret);
5789 *
5790 * When @full is set, the read will not succeed unless
5791 * the writer is off the reader page.
5792 *
5793 * Note: it is up to the calling functions to handle sleeps and wakeups.
5794 * The ring buffer can be used anywhere in the kernel and can not
5795 * blindly call wake_up. The layer that uses the ring buffer must be
5796 * responsible for that.
5797 *
5798 * Returns:
5799 * >=0 if data has been transferred, returns the offset of consumed data.
5800 * <0 if no data has been transferred.
5801 */
5802 int ring_buffer_read_page(struct trace_buffer *buffer,
5803 void **data_page, size_t len, int cpu, int full)
5804 {
5805 struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
5806 struct ring_buffer_event *event;
5807 struct buffer_data_page *bpage;
5808 struct buffer_page *reader;
5809 unsigned long missed_events;
5810 unsigned long flags;
5811 unsigned int commit;
5812 unsigned int read;
5813 u64 save_timestamp;
5814 int ret = -1;
5815
5816 if (!cpumask_test_cpu(cpu, buffer->cpumask))
5817 goto out;
5818
5819 /*
5820 * If len is not big enough to hold the page header, then
5821 * we can not copy anything.
5822 */
5823 if (len <= BUF_PAGE_HDR_SIZE)
5824 goto out;
5825
5826 len -= BUF_PAGE_HDR_SIZE;
5827
5828 if (!data_page)
5829 goto out;
5830
5831 bpage = *data_page;
5832 if (!bpage)
5833 goto out;
5834
5835 raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
5836
5837 reader = rb_get_reader_page(cpu_buffer);
5838 if (!reader)
5839 goto out_unlock;
5840
5841 event = rb_reader_event(cpu_buffer);
5842
5843 read = reader->read;
5844 commit = rb_page_commit(reader);
5845
5846 /* Check if any events were dropped */
5847 missed_events = cpu_buffer->lost_events;
5848
5849 /*
5850 * If this page has been partially read or
5851 * if len is not big enough to read the rest of the page or
5852 * a writer is still on the page, then
5853 * we must copy the data from the page to the buffer.
5854 * Otherwise, we can simply swap the page with the one passed in.
5855 */
5856 if (read || (len < (commit - read)) ||
5857 cpu_buffer->reader_page == cpu_buffer->commit_page ||
5858 unlikely(has_ext_writer(buffer))) {
5859 struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
5860 unsigned int rpos = read;
5861 unsigned int pos = 0;
5862 unsigned int size;
5863
5864 /*
5865 * Even when a full page was requested, this copy path can still
5866 * return data, provided the page was partially read before, the
5867 * rest of the page fits in @len, and the commit page has moved
5868 * off the reader page.
5869 */
5870 if (full &&
5871 (!read || (len < (commit - read)) ||
5872 cpu_buffer->reader_page == cpu_buffer->commit_page))
5873 goto out_unlock;
5874
5875 if (len > (commit - read))
5876 len = (commit - read);
5877
5878 /* Always keep the time extend and data together */
5879 size = rb_event_ts_length(event);
5880
5881 if (len < size)
5882 goto out_unlock;
5883
5884 /* save the current timestamp, since the user will need it */
5885 save_timestamp = cpu_buffer->read_stamp;
5886
5887 /* Need to copy one event at a time */
5888 do {
5889 /* We need the size of one event, because
5890 * rb_advance_reader only advances by one event,
5891 * whereas rb_event_ts_length may include the size of
5892 * one or two events.
5893 * We have already ensured there's enough space if this
5894 * is a time extend. */
5895 size = rb_event_length(event);
5896 memcpy(bpage->data + pos, rpage->data + rpos, size);
5897
5898 len -= size;
5899
5900 rb_advance_reader(cpu_buffer);
5901 rpos = reader->read;
5902 pos += size;
5903
5904 if (rpos >= commit)
5905 break;
5906
5907 event = rb_reader_event(cpu_buffer);
5908 /* Always keep the time extend and data together */
5909 size = rb_event_ts_length(event);
5910 } while (len >= size);
5911
5912 /* update bpage */
5913 local_set(&bpage->commit, pos);
5914 bpage->time_stamp = save_timestamp;
5915
5916 /* we copied everything to the beginning */
5917 read = 0;
5918 } else {
5919 /* update the entry counter */
5920 cpu_buffer->read += rb_page_entries(reader);
5921 cpu_buffer->read_bytes += rb_page_commit(reader);
5922
5923 /* swap the pages */
5924 rb_init_page(bpage);
5925 bpage = reader->page;
5926 reader->page = *data_page;
5927 local_set(&reader->write, 0);
5928 local_set(&reader->entries, 0);
5929 reader->read = 0;
5930 *data_page = bpage;
5931
5932 /*
5933 * Use the real_end for the data size,
5934 * This gives us a chance to store the lost events
5935 * on the page.
5936 */
5937 if (reader->real_end)
5938 local_set(&bpage->commit, reader->real_end);
5939 }
5940 ret = read;
5941
5942 cpu_buffer->lost_events = 0;
5943
5944 commit = local_read(&bpage->commit);
5945 /*
5946 * Set a flag in the commit field if we lost events
5947 */
5948 if (missed_events) {
5949 /* If there is room at the end of the page to save the
5950 * missed events, then record it there.
5951 */
5952 if (BUF_PAGE_SIZE - commit >= sizeof(missed_events)) {
5953 memcpy(&bpage->data[commit], &missed_events,
5954 sizeof(missed_events));
5955 local_add(RB_MISSED_STORED, &bpage->commit);
5956 commit += sizeof(missed_events);
5957 }
5958 local_add(RB_MISSED_EVENTS, &bpage->commit);
5959 }
5960
5961 /*
5962 * This page may be off to user land. Zero it out here.
5963 */
5964 if (commit < BUF_PAGE_SIZE)
5965 memset(&bpage->data[commit], 0, BUF_PAGE_SIZE - commit);
5966
5967 out_unlock:
5968 raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
5969
5970 out:
5971 return ret;
5972 }
5973 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
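/*
 * Usage sketch (illustrative only): the full allocate/read/free cycle
 * for pulling one page of events out of a CPU buffer. The helper name
 * example_drain_one_page and the consume_page() callback are
 * assumptions for the example; callers in the kernel typically pass
 * PAGE_SIZE as the read length so the page header fits as well.
 *
 *	static int example_drain_one_page(struct trace_buffer *buffer, int cpu,
 *					  void (*consume_page)(void *data, int read))
 *	{
 *		void *page;
 *		int ret;
 *
 *		page = ring_buffer_alloc_read_page(buffer, cpu);
 *		if (IS_ERR(page))
 *			return PTR_ERR(page);
 *
 *		ret = ring_buffer_read_page(buffer, &page, PAGE_SIZE, cpu, 0);
 *		if (ret >= 0)
 *			consume_page(page, ret);
 *
 *		ring_buffer_free_read_page(buffer, cpu, page);
 *		return ret < 0 ? ret : 0;
 *	}
 */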
5974
5975 /*
5976 * We only allocate new buffers, never free them if the CPU goes down.
5977 * If we were to free the buffer, then the user would lose any trace that was in
5978 * the buffer.
5979 */
5980 int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node)
5981 {
5982 struct trace_buffer *buffer;
5983 long nr_pages_same;
5984 int cpu_i;
5985 unsigned long nr_pages;
5986
5987 buffer = container_of(node, struct trace_buffer, node);
5988 if (cpumask_test_cpu(cpu, buffer->cpumask))
5989 return 0;
5990
5991 nr_pages = 0;
5992 nr_pages_same = 1;
5993 /* check if all cpu sizes are same */
5994 for_each_buffer_cpu(buffer, cpu_i) {
5995 /* fill in the size from first enabled cpu */
5996 if (nr_pages == 0)
5997 nr_pages = buffer->buffers[cpu_i]->nr_pages;
5998 if (nr_pages != buffer->buffers[cpu_i]->nr_pages) {
5999 nr_pages_same = 0;
6000 break;
6001 }
6002 }
6003 /* allocate minimum pages, user can later expand it */
6004 if (!nr_pages_same)
6005 nr_pages = 2;
6006 buffer->buffers[cpu] =
6007 rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
6008 if (!buffer->buffers[cpu]) {
6009 WARN(1, "failed to allocate ring buffer on CPU %u\n",
6010 cpu);
6011 return -ENOMEM;
6012 }
6013 smp_wmb();
6014 cpumask_set_cpu(cpu, buffer->cpumask);
6015 return 0;
6016 }
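/*
 * Registration sketch (illustrative only, loosely modeled on how the
 * tracing core wires this up): trace_rb_cpu_prepare() is meant to be
 * installed as a multi-instance CPU hotplug "prepare" callback, with
 * each ring buffer added as an instance so a newly onlined CPU gets a
 * per-CPU buffer allocated before it can be traced. The helper name is
 * an assumption for the example.
 *
 *	static int example_register(struct trace_buffer *buffer)
 *	{
 *		int ret;
 *
 *		ret = cpuhp_setup_state_multi(CPUHP_TRACE_RB_PREPARE,
 *					      "trace/RB:prepare",
 *					      trace_rb_cpu_prepare, NULL);
 *		if (ret < 0)
 *			return ret;
 *
 *		return cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE,
 *						&buffer->node);
 *	}
 */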
6017
6018 #define TRACE_BUFFER_PACK_HDR_SIZE offsetof(struct trace_buffer_pack, __data)
6019 #define RING_BUFFER_PACK_HDR_SIZE offsetof(struct ring_buffer_pack, page_va)
6020
6021 size_t trace_buffer_pack_size(struct trace_buffer *trace_buffer)
6022 {
6023 size_t size = 0;
6024 int cpu;
6025
6026 for_each_buffer_cpu(trace_buffer, cpu) {
6027 struct ring_buffer_per_cpu *rb = trace_buffer->buffers[cpu];
6028 size += rb->nr_pages * sizeof(unsigned long);
6029 size += RING_BUFFER_PACK_HDR_SIZE;
6030 }
6031
6032 size += TRACE_BUFFER_PACK_HDR_SIZE;
6033
6034 return size;
6035 }
6036
6037 int trace_buffer_pack(struct trace_buffer *trace_buffer,
6038 struct trace_buffer_pack *pack)
6039 {
6040 struct ring_buffer_pack *cpu_pack;
6041 int cpu = -1, pack_cpu, j;
6042
6043 if (!has_ext_writer(trace_buffer))
6044 return -EINVAL;
6045
6046 pack->nr_cpus = cpumask_weight(trace_buffer->cpumask);
6047 pack->total_pages = 0;
6048
6049 for_each_ring_buffer_pack(cpu_pack, pack_cpu, pack) {
6050 struct ring_buffer_per_cpu *rb;
6051 unsigned long flags, nr_pages;
6052 struct buffer_page *bpage;
6053
6054 cpu = cpumask_next(cpu, trace_buffer->cpumask);
6055 if (cpu >= nr_cpu_ids) {
6056 WARN_ON(1);
6057 break;
6058 }
6059
6060 rb = trace_buffer->buffers[cpu];
6061
6062 local_irq_save(flags);
6063 arch_spin_lock(&rb->lock);
6064
6065 bpage = rb->head_page;
6066 nr_pages = rb->nr_pages;
6067
6068 pack->total_pages += nr_pages + 1;
6069
6070 cpu_pack->cpu = cpu;
6071 cpu_pack->reader_page_va = (unsigned long)rb->reader_page->page;
6072 cpu_pack->nr_pages = nr_pages;
6073
6074 for (j = 0; j < nr_pages; j++) {
6075 cpu_pack->page_va[j] = (unsigned long)bpage->page;
6076 rb_inc_page(&bpage);
6077 }
6078
6079 arch_spin_unlock(&rb->lock);
6080 local_irq_restore(flags);
6081 }
6082
6083 return 0;
6084 }
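/*
 * Usage sketch (illustrative only; the helper name, kzalloc and the
 * GFP_KERNEL flag are assumptions): a consumer of the external-writer
 * extension sizes, allocates and fills a pack that describes every page
 * of every per-CPU buffer, e.g. to share the layout with the external
 * writer.
 *
 *	static struct trace_buffer_pack *example_make_pack(struct trace_buffer *tb)
 *	{
 *		struct trace_buffer_pack *pack;
 *
 *		pack = kzalloc(trace_buffer_pack_size(tb), GFP_KERNEL);
 *		if (!pack)
 *			return NULL;
 *
 *		if (trace_buffer_pack(tb, pack)) {
 *			kfree(pack);
 *			return NULL;
 *		}
 *
 *		return pack;
 *	}
 */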
6085
6086 #ifdef CONFIG_RING_BUFFER_STARTUP_TEST
6087 /*
6088 * This is a basic integrity check of the ring buffer.
6089 * Late in the boot cycle this test will run when configured in.
6090 * It will kick off a thread per CPU that will go into a loop
6091 * writing to the per cpu ring buffer various sizes of data.
6092 * Some of the data will be large items, some small.
6093 *
6094 * Another thread is created that goes into a spin, sending out
6095 * IPIs to the other CPUs to also write into the ring buffer.
6096 * This is to test the nesting ability of the buffer.
6097 *
6098 * Basic stats are recorded and reported. If something in the
6099 * ring buffer should happen that's not expected, a big warning
6100 * is displayed and all ring buffers are disabled.
6101 */
6102 static struct task_struct *rb_threads[NR_CPUS] __initdata;
6103
6104 struct rb_test_data {
6105 struct trace_buffer *buffer;
6106 unsigned long events;
6107 unsigned long bytes_written;
6108 unsigned long bytes_alloc;
6109 unsigned long bytes_dropped;
6110 unsigned long events_nested;
6111 unsigned long bytes_written_nested;
6112 unsigned long bytes_alloc_nested;
6113 unsigned long bytes_dropped_nested;
6114 int min_size_nested;
6115 int max_size_nested;
6116 int max_size;
6117 int min_size;
6118 int cpu;
6119 int cnt;
6120 };
6121
6122 static struct rb_test_data rb_data[NR_CPUS] __initdata;
6123
6124 /* 1 meg per cpu */
6125 #define RB_TEST_BUFFER_SIZE 1048576
6126
6127 static char rb_string[] __initdata =
6128 "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()?+\\"
6129 "?+|:';\",.<>/?abcdefghijklmnopqrstuvwxyz1234567890"
6130 "!@#$%^&*()?+\\?+|:';\",.<>/?abcdefghijklmnopqrstuv";
6131
6132 static bool rb_test_started __initdata;
6133
6134 struct rb_item {
6135 int size;
6136 char str[];
6137 };
6138
6139 static __init int rb_write_something(struct rb_test_data *data, bool nested)
6140 {
6141 struct ring_buffer_event *event;
6142 struct rb_item *item;
6143 bool started;
6144 int event_len;
6145 int size;
6146 int len;
6147 int cnt;
6148
6149 /* Have nested writes different from what is written */
6150 cnt = data->cnt + (nested ? 27 : 0);
6151
6152 /* Multiply cnt by ~e, to make some unique increment */
6153 size = (cnt * 68 / 25) % (sizeof(rb_string) - 1);
6154
6155 len = size + sizeof(struct rb_item);
6156
6157 started = rb_test_started;
6158 /* read rb_test_started before checking buffer enabled */
6159 smp_rmb();
6160
6161 event = ring_buffer_lock_reserve(data->buffer, len);
6162 if (!event) {
6163 /* Ignore dropped events before test starts. */
6164 if (started) {
6165 if (nested)
6166 data->bytes_dropped_nested += len;
6167 else
6168 data->bytes_dropped += len;
6169 }
6170 return len;
6171 }
6172
6173 event_len = ring_buffer_event_length(event);
6174
6175 if (RB_WARN_ON(data->buffer, event_len < len))
6176 goto out;
6177
6178 item = ring_buffer_event_data(event);
6179 item->size = size;
6180 memcpy(item->str, rb_string, size);
6181
6182 if (nested) {
6183 data->bytes_alloc_nested += event_len;
6184 data->bytes_written_nested += len;
6185 data->events_nested++;
6186 if (!data->min_size_nested || len < data->min_size_nested)
6187 data->min_size_nested = len;
6188 if (len > data->max_size_nested)
6189 data->max_size_nested = len;
6190 } else {
6191 data->bytes_alloc += event_len;
6192 data->bytes_written += len;
6193 data->events++;
6194 if (!data->min_size || len < data->min_size)
6195 data->min_size = len;
6196 if (len > data->max_size)
6197 data->max_size = len;
6198 }
6199
6200 out:
6201 ring_buffer_unlock_commit(data->buffer, event);
6202
6203 return 0;
6204 }
6205
6206 static __init int rb_test(void *arg)
6207 {
6208 struct rb_test_data *data = arg;
6209
6210 while (!kthread_should_stop()) {
6211 rb_write_something(data, false);
6212 data->cnt++;
6213
6214 set_current_state(TASK_INTERRUPTIBLE);
6215 /* Now sleep between a min of 100-300us and a max of 1ms */
6216 usleep_range(((data->cnt % 3) + 1) * 100, 1000);
6217 }
6218
6219 return 0;
6220 }
6221
6222 static __init void rb_ipi(void *ignore)
6223 {
6224 struct rb_test_data *data;
6225 int cpu = smp_processor_id();
6226
6227 data = &rb_data[cpu];
6228 rb_write_something(data, true);
6229 }
6230
6231 static __init int rb_hammer_test(void *arg)
6232 {
6233 while (!kthread_should_stop()) {
6234
6235 /* Send an IPI to all cpus to write data! */
6236 smp_call_function(rb_ipi, NULL, 1);
6237 /* No sleep, but for non preempt, let others run */
6238 schedule();
6239 }
6240
6241 return 0;
6242 }
6243
6244 static __init int test_ringbuffer(void)
6245 {
6246 struct task_struct *rb_hammer;
6247 struct trace_buffer *buffer;
6248 int cpu;
6249 int ret = 0;
6250
6251 if (security_locked_down(LOCKDOWN_TRACEFS)) {
6252 pr_warn("Lockdown is enabled, skipping ring buffer tests\n");
6253 return 0;
6254 }
6255
6256 pr_info("Running ring buffer tests...\n");
6257
6258 buffer = ring_buffer_alloc(RB_TEST_BUFFER_SIZE, RB_FL_OVERWRITE);
6259 if (WARN_ON(!buffer))
6260 return 0;
6261
6262 /* Disable buffer so that threads can't write to it yet */
6263 ring_buffer_record_off(buffer);
6264
6265 for_each_online_cpu(cpu) {
6266 rb_data[cpu].buffer = buffer;
6267 rb_data[cpu].cpu = cpu;
6268 rb_data[cpu].cnt = cpu;
6269 rb_threads[cpu] = kthread_create(rb_test, &rb_data[cpu],
6270 "rbtester/%d", cpu);
6271 if (WARN_ON(IS_ERR(rb_threads[cpu]))) {
6272 pr_cont("FAILED\n");
6273 ret = PTR_ERR(rb_threads[cpu]);
6274 goto out_free;
6275 }
6276
6277 kthread_bind(rb_threads[cpu], cpu);
6278 wake_up_process(rb_threads[cpu]);
6279 }
6280
6281 /* Now create the rb hammer! */
6282 rb_hammer = kthread_run(rb_hammer_test, NULL, "rbhammer");
6283 if (WARN_ON(IS_ERR(rb_hammer))) {
6284 pr_cont("FAILED\n");
6285 ret = PTR_ERR(rb_hammer);
6286 goto out_free;
6287 }
6288
6289 ring_buffer_record_on(buffer);
6290 /*
6291 * Show buffer is enabled before setting rb_test_started.
6292 * Yes there's a small race window where events could be
6293 * dropped and the thread won't catch it. But when a ring
6294 * buffer gets enabled, there will always be some kind of
6295 * delay before other CPUs see it. Thus, we don't care about
6296 * those dropped events. We care about events dropped after
6297 * the threads see that the buffer is active.
6298 */
6299 smp_wmb();
6300 rb_test_started = true;
6301
6302 set_current_state(TASK_INTERRUPTIBLE);
6303 /* Just run for 10 seconds */
6304 schedule_timeout(10 * HZ);
6305
6306 kthread_stop(rb_hammer);
6307
6308 out_free:
6309 for_each_online_cpu(cpu) {
6310 if (!rb_threads[cpu])
6311 break;
6312 kthread_stop(rb_threads[cpu]);
6313 }
6314 if (ret) {
6315 ring_buffer_free(buffer);
6316 return ret;
6317 }
6318
6319 /* Report! */
6320 pr_info("finished\n");
6321 for_each_online_cpu(cpu) {
6322 struct ring_buffer_event *event;
6323 struct rb_test_data *data = &rb_data[cpu];
6324 struct rb_item *item;
6325 unsigned long total_events;
6326 unsigned long total_dropped;
6327 unsigned long total_written;
6328 unsigned long total_alloc;
6329 unsigned long total_read = 0;
6330 unsigned long total_size = 0;
6331 unsigned long total_len = 0;
6332 unsigned long total_lost = 0;
6333 unsigned long lost;
6334 int big_event_size;
6335 int small_event_size;
6336
6337 ret = -1;
6338
6339 total_events = data->events + data->events_nested;
6340 total_written = data->bytes_written + data->bytes_written_nested;
6341 total_alloc = data->bytes_alloc + data->bytes_alloc_nested;
6342 total_dropped = data->bytes_dropped + data->bytes_dropped_nested;
6343
6344 big_event_size = data->max_size + data->max_size_nested;
6345 small_event_size = data->min_size + data->min_size_nested;
6346
6347 pr_info("CPU %d:\n", cpu);
6348 pr_info(" events: %ld\n", total_events);
6349 pr_info(" dropped bytes: %ld\n", total_dropped);
6350 pr_info(" alloced bytes: %ld\n", total_alloc);
6351 pr_info(" written bytes: %ld\n", total_written);
6352 pr_info(" biggest event: %d\n", big_event_size);
6353 pr_info(" smallest event: %d\n", small_event_size);
6354
6355 if (RB_WARN_ON(buffer, total_dropped))
6356 break;
6357
6358 ret = 0;
6359
6360 while ((event = ring_buffer_consume(buffer, cpu, NULL, &lost))) {
6361 total_lost += lost;
6362 item = ring_buffer_event_data(event);
6363 total_len += ring_buffer_event_length(event);
6364 total_size += item->size + sizeof(struct rb_item);
6365 if (memcmp(&item->str[0], rb_string, item->size) != 0) {
6366 pr_info("FAILED!\n");
6367 pr_info("buffer had: %.*s\n", item->size, item->str);
6368 pr_info("expected: %.*s\n", item->size, rb_string);
6369 RB_WARN_ON(buffer, 1);
6370 ret = -1;
6371 break;
6372 }
6373 total_read++;
6374 }
6375 if (ret)
6376 break;
6377
6378 ret = -1;
6379
6380 pr_info(" read events: %ld\n", total_read);
6381 pr_info(" lost events: %ld\n", total_lost);
6382 pr_info(" total events: %ld\n", total_lost + total_read);
6383 pr_info(" recorded len bytes: %ld\n", total_len);
6384 pr_info(" recorded size bytes: %ld\n", total_size);
6385 if (total_lost)
6386 pr_info(" With dropped events, record len and size may not match\n"
6387 " alloced and written from above\n");
6388 if (!total_lost) {
6389 if (RB_WARN_ON(buffer, total_len != total_alloc ||
6390 total_size != total_written))
6391 break;
6392 }
6393 if (RB_WARN_ON(buffer, total_lost + total_read != total_events))
6394 break;
6395
6396 ret = 0;
6397 }
6398 if (!ret)
6399 pr_info("Ring buffer PASSED!\n");
6400
6401 ring_buffer_free(buffer);
6402 return 0;
6403 }
6404
6405 late_initcall(test_ringbuffer);
6406 #endif /* CONFIG_RING_BUFFER_STARTUP_TEST */
6407