• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2025 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "IOUringSocketHandler"
18 
19 #include <sys/resource.h>
20 #include <sys/utsname.h>
21 #include <unistd.h>
22 
23 #include <limits.h>
24 #include <linux/time_types.h>
25 #include <sys/cdefs.h>
26 #include <sys/prctl.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <sys/un.h>
30 #include <unistd.h>
31 
32 #include <chrono>
33 #include <thread>
34 
35 #include <cutils/sockets.h>
36 #include <private/android_logger.h>
37 
38 #include <IOUringSocketHandler/IOUringSocketHandler.h>
39 
40 #include <android-base/logging.h>
41 #include <android-base/scopeguard.h>
42 
isIouringEnabled()43 bool IOUringSocketHandler::isIouringEnabled() {
44     return isIouringSupportedByKernel();
45 }
46 
isIouringSupportedByKernel()47 bool IOUringSocketHandler::isIouringSupportedByKernel() {
48     struct utsname uts {};
49     unsigned int major, minor;
50 
51     uname(&uts);
52     if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
53         return false;
54     }
55 
56     // We will only support kernels from 6.1 and higher.
57     return major > 6 || (major == 6 && minor >= 1);
58 }
59 
IOUringSocketHandler(int socket_fd)60 IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {}
61 
~IOUringSocketHandler()62 IOUringSocketHandler::~IOUringSocketHandler() {
63     DeRegisterBuffers();
64     if (ring_setup_) {
65         io_uring_queue_exit(&mCtx->ring);
66     }
67 }
68 
EnqueueMultishotRecvmsg()69 bool IOUringSocketHandler::EnqueueMultishotRecvmsg() {
70     struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring);
71     memset(&msg, 0, sizeof(msg));
72     msg.msg_controllen = control_len_;
73     io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
74     sqe->flags |= IOSQE_BUFFER_SELECT;
75     sqe->buf_group = bgid_;
76     int ret = io_uring_submit(&mCtx->ring);
77     if (ret < 0) {
78         LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret;
79         return false;
80     }
81     return true;
82 }
83 
AllocateAndRegisterBuffers(size_t num_buffers,size_t buf_size)84 bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) {
85     num_buffers_ = num_buffers;
86     control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr);
87 
88     buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size;
89 
90     for (size_t i = 0; i < num_buffers_; i++) {
91         std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(buffer_size_);
92         buffers_.push_back(std::move(buffer));
93     }
94     return RegisterBuffers();
95 }
96 
RegisterBuffers()97 bool IOUringSocketHandler::RegisterBuffers() {
98     int ret = 0;
99     br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret);
100     if (!br_) {
101         LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret;
102         return false;
103     }
104     for (size_t i = 0; i < num_buffers_; i++) {
105         void* buffer = buffers_[i].get();
106         io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_),
107                               i);
108     }
109     io_uring_buf_ring_advance(br_, num_buffers_);
110     LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_;
111     registered_buffers_ = true;
112     return true;
113 }
114 
DeRegisterBuffers()115 void IOUringSocketHandler::DeRegisterBuffers() {
116     if (registered_buffers_) {
117         io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_);
118         registered_buffers_ = false;
119     }
120     buffers_.clear();
121     num_buffers_ = 0;
122     control_len_ = 0;
123     buffer_size_ = 0;
124 }
125 
SetupIoUring(int queue_size)126 bool IOUringSocketHandler::SetupIoUring(int queue_size) {
127     mCtx = std::unique_ptr<uring_context>(new uring_context());
128     struct io_uring_params params = {};
129 
130     // COOP_TASKRUN - No IPI to logd
131     // SINGLE_ISSUER - Only one thread is doing the work on the ring
132     // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
133     // DEFER_TASKRUN - trigger task work when CQE is explicitly polled
134     params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
135                      IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
136 
137     int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, &params);
138     if (ret) {
139         LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret;
140         return false;
141     } else {
142         LOG(INFO) << "io_uring_queue_init_params success";
143     }
144 
145     ring_setup_ = true;
146     return true;
147 }
148 
ReleaseBuffer()149 void IOUringSocketHandler::ReleaseBuffer() {
150     if (active_buffer_id_ == -1) {
151         return;
152     }
153 
154     // Put the buffer back to the pool
155     io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
156                           io_uring_buf_ring_mask(num_buffers_), 0);
157     io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
158     active_buffer_id_ = -1;
159 
160     // If there are no more CQE data, re-arm the SQE
161     bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE);
162     if (!is_more_cqe) {
163         EnqueueMultishotRecvmsg();
164     }
165 }
166 
ReceiveData(void ** payload,size_t & payload_len,struct ucred ** cred)167 void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
168     if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
169         int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
170         if (ret) {
171             LOG(ERROR) << "WaitCqe failed: " << ret;
172             EnqueueMultishotRecvmsg();
173             return;
174         }
175     }
176 
177     if (cqe->res < 0) {
178         io_uring_cqe_seen(&mCtx->ring, cqe);
179         EnqueueMultishotRecvmsg();
180         return;
181     }
182 
183     active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
184 
185     void* this_recv = buffers_[active_buffer_id_].get();
186     struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
187 
188     if (!o) {
189         return;
190     }
191 
192     struct cmsghdr* cmsg;
193     cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
194 
195     struct ucred* cr = nullptr;
196     while (cmsg != nullptr) {
197         if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
198             cr = (struct ucred*)CMSG_DATA(cmsg);
199             break;
200         }
201         cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
202     }
203 
204     *payload = io_uring_recvmsg_payload(o, &msg);
205     payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
206     *cred = cr;
207 }
208