1 /*
2 * Copyright (C) 2025 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "IOUringSocketHandler"
18
19 #include <sys/resource.h>
20 #include <sys/utsname.h>
21 #include <unistd.h>
22
23 #include <limits.h>
24 #include <linux/time_types.h>
25 #include <sys/cdefs.h>
26 #include <sys/prctl.h>
27 #include <sys/socket.h>
28 #include <sys/types.h>
29 #include <sys/un.h>
30 #include <unistd.h>
31
32 #include <chrono>
33 #include <thread>
34
35 #include <cutils/sockets.h>
36 #include <private/android_logger.h>
37
38 #include <IOUringSocketHandler/IOUringSocketHandler.h>
39
40 #include <android-base/logging.h>
41 #include <android-base/scopeguard.h>
42
isIouringEnabled()43 bool IOUringSocketHandler::isIouringEnabled() {
44 return isIouringSupportedByKernel();
45 }
46
isIouringSupportedByKernel()47 bool IOUringSocketHandler::isIouringSupportedByKernel() {
48 struct utsname uts {};
49 unsigned int major, minor;
50
51 uname(&uts);
52 if (sscanf(uts.release, "%u.%u", &major, &minor) != 2) {
53 return false;
54 }
55
56 // We will only support kernels from 6.1 and higher.
57 return major > 6 || (major == 6 && minor >= 1);
58 }
59
IOUringSocketHandler(int socket_fd)60 IOUringSocketHandler::IOUringSocketHandler(int socket_fd) : socket_(socket_fd) {}
61
~IOUringSocketHandler()62 IOUringSocketHandler::~IOUringSocketHandler() {
63 DeRegisterBuffers();
64 if (ring_setup_) {
65 io_uring_queue_exit(&mCtx->ring);
66 }
67 }
68
EnqueueMultishotRecvmsg()69 bool IOUringSocketHandler::EnqueueMultishotRecvmsg() {
70 struct io_uring_sqe* sqe = io_uring_get_sqe(&mCtx->ring);
71 memset(&msg, 0, sizeof(msg));
72 msg.msg_controllen = control_len_;
73 io_uring_prep_recvmsg_multishot(sqe, socket_, &msg, 0);
74 sqe->flags |= IOSQE_BUFFER_SELECT;
75 sqe->buf_group = bgid_;
76 int ret = io_uring_submit(&mCtx->ring);
77 if (ret < 0) {
78 LOG(ERROR) << "EnqueueMultishotRecvmsg failed: ret: " << ret;
79 return false;
80 }
81 return true;
82 }
83
AllocateAndRegisterBuffers(size_t num_buffers,size_t buf_size)84 bool IOUringSocketHandler::AllocateAndRegisterBuffers(size_t num_buffers, size_t buf_size) {
85 num_buffers_ = num_buffers;
86 control_len_ = CMSG_ALIGN(sizeof(struct ucred)) + sizeof(struct cmsghdr);
87
88 buffer_size_ = sizeof(struct io_uring_recvmsg_out) + control_len_ + buf_size;
89
90 for (size_t i = 0; i < num_buffers_; i++) {
91 std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(buffer_size_);
92 buffers_.push_back(std::move(buffer));
93 }
94 return RegisterBuffers();
95 }
96
RegisterBuffers()97 bool IOUringSocketHandler::RegisterBuffers() {
98 int ret = 0;
99 br_ = io_uring_setup_buf_ring(&mCtx->ring, num_buffers_, bgid_, 0, &ret);
100 if (!br_) {
101 LOG(ERROR) << "io_uring_setup_buf_ring failed with error: " << ret;
102 return false;
103 }
104 for (size_t i = 0; i < num_buffers_; i++) {
105 void* buffer = buffers_[i].get();
106 io_uring_buf_ring_add(br_, buffer, buffer_size_, i, io_uring_buf_ring_mask(num_buffers_),
107 i);
108 }
109 io_uring_buf_ring_advance(br_, num_buffers_);
110 LOG(DEBUG) << "RegisterBuffers success: " << num_buffers_;
111 registered_buffers_ = true;
112 return true;
113 }
114
DeRegisterBuffers()115 void IOUringSocketHandler::DeRegisterBuffers() {
116 if (registered_buffers_) {
117 io_uring_free_buf_ring(&mCtx->ring, br_, num_buffers_, bgid_);
118 registered_buffers_ = false;
119 }
120 buffers_.clear();
121 num_buffers_ = 0;
122 control_len_ = 0;
123 buffer_size_ = 0;
124 }
125
SetupIoUring(int queue_size)126 bool IOUringSocketHandler::SetupIoUring(int queue_size) {
127 mCtx = std::unique_ptr<uring_context>(new uring_context());
128 struct io_uring_params params = {};
129
130 // COOP_TASKRUN - No IPI to logd
131 // SINGLE_ISSUER - Only one thread is doing the work on the ring
132 // TASKRUN_FLAG - we use peek_cqe - Hence, trigger task work if required
133 // DEFER_TASKRUN - trigger task work when CQE is explicitly polled
134 params.flags |= (IORING_SETUP_COOP_TASKRUN | IORING_SETUP_SINGLE_ISSUER |
135 IORING_SETUP_TASKRUN_FLAG | IORING_SETUP_DEFER_TASKRUN);
136
137 int ret = io_uring_queue_init_params(queue_size + 1, &mCtx->ring, ¶ms);
138 if (ret) {
139 LOG(ERROR) << "io_uring_queue_init_params failed with ret: " << ret;
140 return false;
141 } else {
142 LOG(INFO) << "io_uring_queue_init_params success";
143 }
144
145 ring_setup_ = true;
146 return true;
147 }
148
ReleaseBuffer()149 void IOUringSocketHandler::ReleaseBuffer() {
150 if (active_buffer_id_ == -1) {
151 return;
152 }
153
154 // Put the buffer back to the pool
155 io_uring_buf_ring_add(br_, buffers_[active_buffer_id_].get(), buffer_size_, active_buffer_id_,
156 io_uring_buf_ring_mask(num_buffers_), 0);
157 io_uring_buf_ring_cq_advance(&mCtx->ring, br_, 1);
158 active_buffer_id_ = -1;
159
160 // If there are no more CQE data, re-arm the SQE
161 bool is_more_cqe = (cqe->flags & IORING_CQE_F_MORE);
162 if (!is_more_cqe) {
163 EnqueueMultishotRecvmsg();
164 }
165 }
166
ReceiveData(void ** payload,size_t & payload_len,struct ucred ** cred)167 void IOUringSocketHandler::ReceiveData(void** payload, size_t& payload_len, struct ucred** cred) {
168 if (io_uring_peek_cqe(&mCtx->ring, &cqe) < 0) {
169 int ret = io_uring_wait_cqe(&mCtx->ring, &cqe);
170 if (ret) {
171 LOG(ERROR) << "WaitCqe failed: " << ret;
172 EnqueueMultishotRecvmsg();
173 return;
174 }
175 }
176
177 if (cqe->res < 0) {
178 io_uring_cqe_seen(&mCtx->ring, cqe);
179 EnqueueMultishotRecvmsg();
180 return;
181 }
182
183 active_buffer_id_ = cqe->flags >> IORING_CQE_BUFFER_SHIFT;
184
185 void* this_recv = buffers_[active_buffer_id_].get();
186 struct io_uring_recvmsg_out* o = io_uring_recvmsg_validate(this_recv, cqe->res, &msg);
187
188 if (!o) {
189 return;
190 }
191
192 struct cmsghdr* cmsg;
193 cmsg = io_uring_recvmsg_cmsg_firsthdr(o, &msg);
194
195 struct ucred* cr = nullptr;
196 while (cmsg != nullptr) {
197 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_CREDENTIALS) {
198 cr = (struct ucred*)CMSG_DATA(cmsg);
199 break;
200 }
201 cmsg = io_uring_recvmsg_cmsg_nexthdr(o, &msg, cmsg);
202 }
203
204 *payload = io_uring_recvmsg_payload(o, &msg);
205 payload_len = io_uring_recvmsg_payload_length(o, cqe->res, &msg);
206 *cred = cr;
207 }
208