use core::ffi::c_void; use std::fmt::Debug; use std::fmt::Formatter; use std::fmt::Result as FmtResult; use std::ops::Deref as _; use std::ops::DerefMut as _; use std::os::raw::c_ulong; use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::BorrowedFd; use std::ptr::null_mut; use std::ptr::NonNull; use std::slice; use std::time::Duration; use crate::util; use crate::util::validate_bpf_ret; use crate::AsRawLibbpf; use crate::Error; use crate::ErrorExt as _; use crate::MapCore; use crate::MapType; use crate::Result; type Cb<'a> = Box i32 + 'a>; struct RingBufferCallback<'a> { cb: Cb<'a>, } impl<'a> RingBufferCallback<'a> { fn new(cb: F) -> Self where F: FnMut(&[u8]) -> i32 + 'a, { RingBufferCallback { cb: Box::new(cb) } } } impl Debug for RingBufferCallback<'_> { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { let Self { cb } = self; f.debug_struct("RingBufferCallback") .field("cb", &(cb.deref() as *const _)) .finish() } } /// Builds [`RingBuffer`] instances. /// /// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data /// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the /// `ringbuf` map is now preferred over the `perf buffer`. #[derive(Debug, Default)] pub struct RingBufferBuilder<'slf, 'cb> { fd_callbacks: Vec<(BorrowedFd<'slf>, RingBufferCallback<'cb>)>, } impl<'slf, 'cb: 'slf> RingBufferBuilder<'slf, 'cb> { /// Create a new `RingBufferBuilder` object. pub fn new() -> Self { RingBufferBuilder { fd_callbacks: vec![], } } /// Add a new ringbuf `map` and associated `callback` to this ring buffer /// manager. The callback should take one argument, a slice of raw bytes, /// and return an i32. /// /// Non-zero return values in the callback will stop ring buffer consumption early. /// /// The callback provides a raw byte slice. You may find libraries such as /// [`plain`](https://crates.io/crates/plain) helpful. pub fn add(&mut self, map: &'slf dyn MapCore, callback: NewF) -> Result<&mut Self> where NewF: FnMut(&[u8]) -> i32 + 'cb, { if map.map_type() != MapType::RingBuf { return Err(Error::with_invalid_data("Must use a RingBuf map")); } self.fd_callbacks .push((map.as_fd(), RingBufferCallback::new(callback))); Ok(self) } /// Build a new [`RingBuffer`]. Must have added at least one ringbuf. pub fn build(self) -> Result> { let mut cbs = vec![]; let mut rb_ptr: Option> = None; let c_sample_cb: libbpf_sys::ring_buffer_sample_fn = Some(Self::call_sample_cb); for (fd, callback) in self.fd_callbacks { let mut sample_cb = Box::new(callback); match rb_ptr { None => { // Allocate a new ringbuf manager and add a ringbuf to it // SAFETY: All pointers are valid or rightly NULL. // The object referenced by `sample_cb` is // not modified by `libbpf` let ptr = unsafe { libbpf_sys::ring_buffer__new( fd.as_raw_fd(), c_sample_cb, sample_cb.deref_mut() as *mut _ as *mut _, null_mut(), ) }; let ptr = validate_bpf_ret(ptr).context("failed to create new ring buffer")?; rb_ptr = Some(ptr) } Some(mut ptr) => { // Add a ringbuf to the existing ringbuf manager // SAFETY: All pointers are valid or rightly NULL. // The object referenced by `sample_cb` is // not modified by `libbpf` let err = unsafe { libbpf_sys::ring_buffer__add( ptr.as_ptr(), fd.as_raw_fd(), c_sample_cb, sample_cb.deref_mut() as *mut _ as *mut _, ) }; // Handle errors if err != 0 { // SAFETY: The pointer is valid. let () = unsafe { libbpf_sys::ring_buffer__free(ptr.as_mut()) }; return Err(Error::from_raw_os_error(err)); } } } let () = cbs.push(sample_cb); } match rb_ptr { Some(ptr) => Ok(RingBuffer { ptr, _cbs: cbs }), None => Err(Error::with_invalid_data( "You must add at least one ring buffer map and callback before building", )), } } unsafe extern "C" fn call_sample_cb(ctx: *mut c_void, data: *mut c_void, size: c_ulong) -> i32 { let callback_struct = ctx as *mut RingBufferCallback<'_>; let callback = unsafe { (*callback_struct).cb.as_mut() }; let slice = unsafe { slice::from_raw_parts(data as *const u8, size as usize) }; callback(slice) } } /// The canonical interface for managing a collection of `ringbuf` maps. /// /// `ringbuf`s are a special kind of [`Map`][crate::Map], used to transfer data /// between [`Program`][crate::Program]s and userspace. As of Linux 5.8, the /// `ringbuf` map is now preferred over the `perf buffer`. #[derive(Debug)] pub struct RingBuffer<'cb> { ptr: NonNull, #[allow(clippy::vec_box)] _cbs: Vec>>, } impl RingBuffer<'_> { /// Poll from all open ring buffers, calling the registered callback for /// each one. Polls continually until we either run out of events to consume /// or `timeout` is reached. If `timeout` is Duration::MAX, this will block /// indefinitely until an event occurs. /// /// Return the amount of events consumed, or a negative value in case of error. pub fn poll_raw(&self, timeout: Duration) -> i32 { let mut timeout_ms = -1; if timeout != Duration::MAX { timeout_ms = timeout.as_millis() as i32; } unsafe { libbpf_sys::ring_buffer__poll(self.ptr.as_ptr(), timeout_ms) } } /// Poll from all open ring buffers, calling the registered callback for /// each one. Polls continually until we either run out of events to consume /// or `timeout` is reached. If `timeout` is Duration::MAX, this will block /// indefinitely until an event occurs. pub fn poll(&self, timeout: Duration) -> Result<()> { let ret = self.poll_raw(timeout); util::parse_ret(ret) } /// Greedily consume from all open ring buffers, calling the registered /// callback for each one. Consumes continually until we run out of events /// to consume or one of the callbacks returns a non-zero integer. /// /// Return the amount of events consumed, or a negative value in case of error. pub fn consume_raw(&self) -> i32 { unsafe { libbpf_sys::ring_buffer__consume(self.ptr.as_ptr()) } } /// Greedily consume from all open ring buffers, calling the registered /// callback for each one. Consumes continually until we run out of events /// to consume or one of the callbacks returns a non-zero integer. pub fn consume(&self) -> Result<()> { let ret = self.consume_raw(); util::parse_ret(ret) } /// Get an fd that can be used to sleep until data is available pub fn epoll_fd(&self) -> i32 { unsafe { libbpf_sys::ring_buffer__epoll_fd(self.ptr.as_ptr()) } } } impl AsRawLibbpf for RingBuffer<'_> { type LibbpfType = libbpf_sys::ring_buffer; /// Retrieve the underlying [`libbpf_sys::ring_buffer`]. fn as_libbpf_object(&self) -> NonNull { self.ptr } } // SAFETY: `ring_buffer` objects can safely be polled from any thread. unsafe impl Send for RingBuffer<'_> {} impl Drop for RingBuffer<'_> { fn drop(&mut self) { unsafe { libbpf_sys::ring_buffer__free(self.ptr.as_ptr()); } } } #[cfg(test)] mod test { use super::*; /// Check that `RingBuffer` is `Send`. #[test] fn ringbuffer_is_send() { fn test() where T: Send, { } test::>(); } }