/* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "IOUringSocketHandler" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include bool IOUringSocketHandler::isIouringEnabled() { return isIouringSupportedByKernel(); } bool IOUringSocketHandler::isIouringSupportedByKernel() { struct utsname uts {}; unsigned int major, minor; uname(&uts); if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) { return false; } // We will only support kernels from 6.1 and higher. return major > 6 || (major == 6 && minor >= 1); } IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {} IOUringSocketHandler::~IOUringSocketHandler() { DeRegisterBuffers(); if (ring_setup_) { io_uring_queue_exit(&mCtx->ring); } } bool IOUringSocketHandler::EnqueueMultishotRecvmsg() { struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring); memset(&msg, 0, sizeof(msg)); msg.msg_controllen = control_len_; io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0); sqe->flags |= IOSQE_BUFFER_SELECT; sqe->buf_group = bgid_; int ret = io_uring_submit(&mCtx->ring); if (ret < 0) { LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret; return false; } return true; } bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) { num_buffers_ = num_buffers; control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr); buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size; for (size_t i = 0; i < num_buffers_; i++) { std::unique_ptr buffer = std::make_unique(buffer_size_); buffers_.push_back(std::move(buffer)); } return RegisterBuffers(); } bool IOUringSocketHandler::RegisterBuffers() { int ret = 0; br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret); if (!br_) { LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret; return false; } for (size_t i = 0; i < num_buffers_; i++) { void* buffer = buffers_[i].get(); io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_), i); } io_uring_buf_ring_advance(br_, num_buffers_); LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_; registered_buffers_ = true; return true; } void IOUringSocketHandler::DeRegisterBuffers() { if (registered_buffers_) { io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_); registered_buffers_ = false; } buffers_.clear(); num_buffers_ = 0; control_len_ = 0; buffer_size_ = 0; } bool IOUringSocketHandler::SetupIoUring(int queue_size) { mCtx = std::unique_ptr(new uring_context()); struct io_uring_params params = {}; // COOP_TASKRUN - No IPI to logd // SINGLE_ISSUER - Only one thread is doing the work on the ring // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required // DEFER_TASKRUN - trigger task work when CQE is explicitly polled params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN); int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, ¶ms); if (ret) { LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret; return false; } else { LOG(INFO) << "io_uring_queue_init_params success"; } ring_setup_ = true; return true; } void IOUringSocketHandler::ReleaseBuffer() { if (active_buffer_id_ == -1) { return; } // Put the buffer back to the pool io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_, io_uring_buf_ring_mask(num_buffers_), 0); io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1); active_buffer_id_ = -1; // If there are no more CQE data, re-arm the SQE bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE); if (!is_more_cqe) { EnqueueMultishotRecvmsg(); } } void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) { if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) { int ret = io_uring_wait_cqe(&mCtx->ring, &cqe); if (ret) { LOG(ERROR) << "WaitCqe failed: " << ret; EnqueueMultishotRecvmsg(); return; } } if (cqe->res < 0) { io_uring_cqe_seen(&mCtx->ring, cqe); EnqueueMultishotRecvmsg(); return; } active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT; void* this_recv = buffers_[active_buffer_id_].get(); struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg); if (!o) { return; } struct cmsghdr* cmsg; cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg); struct ucred* cr = nullptr; while (cmsg != nullptr) { if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) { cr = (struct ucred*)CMSG_DATA(cmsg); break; } cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg); } *payload = io_uring_recvmsg_payload(o, &msg); payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg); *cred = cr; }