1 /* SPDX-License-Identifier: MIT */
2 #define _POSIX_C_SOURCE 200112L
3
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <sys/mman.h>
7 #include <unistd.h>
8 #include <errno.h>
9 #include <string.h>
10 #include <stdbool.h>
11
12 #include "liburing/compat.h"
13 #include "liburing/io_uring.h"
14 #include "liburing.h"
15 #include "liburing/barrier.h"
16
17 #include "syscall.h"
18
19 /*
20 * Returns true if we're not using SQ thread (thus nobody submits but us)
21 * or if IORING_SQ_NEED_WAKEUP is set, so submit thread must be explicitly
22 * awakened. For the latter case, we set the thread wakeup flag.
23 */
sq_ring_needs_enter(struct io_uring * ring,unsigned * flags)24 static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
25 {
26 if (!(ring->flags & IORING_SETUP_SQPOLL))
27 return true;
28
29 if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
30 IORING_SQ_NEED_WAKEUP)) {
31 *flags |= IORING_ENTER_SQ_WAKEUP;
32 return true;
33 }
34
35 return false;
36 }
37
cq_ring_needs_flush(struct io_uring * ring)38 static inline bool cq_ring_needs_flush(struct io_uring *ring)
39 {
40 return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
41 }
42
__io_uring_peek_cqe(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,unsigned * nr_available)43 static int __io_uring_peek_cqe(struct io_uring *ring,
44 struct io_uring_cqe **cqe_ptr,
45 unsigned *nr_available)
46 {
47 struct io_uring_cqe *cqe;
48 int err = 0;
49 unsigned available;
50 unsigned mask = *ring->cq.kring_mask;
51
52 do {
53 unsigned tail = io_uring_smp_load_acquire(ring->cq.ktail);
54 unsigned head = *ring->cq.khead;
55
56 cqe = NULL;
57 available = tail - head;
58 if (!available)
59 break;
60
61 cqe = &ring->cq.cqes[head & mask];
62 if (!(ring->features & IORING_FEAT_EXT_ARG) &&
63 cqe->user_data == LIBURING_UDATA_TIMEOUT) {
64 if (cqe->res < 0)
65 err = cqe->res;
66 io_uring_cq_advance(ring, 1);
67 if (!err)
68 continue;
69 cqe = NULL;
70 }
71
72 break;
73 } while (1);
74
75 *cqe_ptr = cqe;
76 *nr_available = available;
77 return err;
78 }
79
/*
 * Arguments for _io_uring_get_cqe(), bundled so that both the plain
 * sigmask variant and the IORING_ENTER_EXT_ARG variant can share it.
 */
struct get_data {
	unsigned submit;	/* passed to io_uring_enter as the submit count */
	unsigned wait_nr;	/* completions to wait for */
	unsigned get_flags;	/* OR'ed into the enter flags with IORING_ENTER_GETEVENTS */
	int sz;			/* size argument handed to the enter call alongside 'arg' */
	void *arg;		/* opaque pointer handed to the enter call */
};
87
/*
 * Core "get a completion" loop: peek at the CQ ring, and enter the kernel
 * (submitting and/or waiting as described by 'data') until a CQE is found
 * or an error occurs.
 *
 * On return *cqe_ptr is the CQE found by the last peek (possibly NULL).
 * Returns 0 on success, -EAGAIN when nothing is available and the caller
 * asked neither to wait nor to submit, the error from
 * __io_uring_peek_cqe(), or -errno from the enter call.
 *
 * Note that data->submit is decremented by the value returned from each
 * successful enter call.
 */
static int _io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
			     struct get_data *data)
{
	struct io_uring_cqe *cqe = NULL;
	int err;

	do {
		bool need_enter = false;
		bool cq_overflow_flush = false;
		unsigned flags = 0;
		unsigned nr_available;
		int ret;

		err = __io_uring_peek_cqe(ring, &cqe, &nr_available);
		if (err)
			break;
		/*
		 * Nothing in the ring and nothing to wait for or submit: only
		 * enter the kernel if the overflow flag is set.
		 */
		if (!cqe && !data->wait_nr && !data->submit) {
			if (!cq_ring_needs_flush(ring)) {
				err = -EAGAIN;
				break;
			}
			cq_overflow_flush = true;
		}
		if (data->wait_nr > nr_available || cq_overflow_flush) {
			flags = IORING_ENTER_GETEVENTS | data->get_flags;
			need_enter = true;
		}
		if (data->submit) {
			/*
			 * Called only for its side effect of adding
			 * IORING_ENTER_SQ_WAKEUP to flags when required; the
			 * return value is ignored and we always enter.
			 */
			sq_ring_needs_enter(ring, &flags);
			need_enter = true;
		}
		if (!need_enter)
			break;

		ret = __sys_io_uring_enter2(ring->ring_fd, data->submit,
					    data->wait_nr, flags, data->arg,
					    data->sz);
		if (ret < 0) {
			err = -errno;
			break;
		}

		data->submit -= ret;
		/* A CQE found by the peek above is returned; otherwise peek again. */
		if (cqe)
			break;
	} while (1);

	*cqe_ptr = cqe;
	return err;
}
138
/*
 * Submit 'submit' entries and/or wait for 'wait_nr' completions, passing
 * 'sigmask' (with size _NSIG / 8) straight through to the enter call. No
 * extra enter flags are requested.
 */
int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct get_data req;

	req.submit = submit;
	req.wait_nr = wait_nr;
	req.get_flags = 0;
	req.sz = _NSIG / 8;
	req.arg = sigmask;

	return _io_uring_get_cqe(ring, cqe_ptr, &req);
}
152
153 /*
154 * Fill in an array of IO completions up to count, if any are available.
155 * Returns the amount of IO completions filled.
156 */
io_uring_peek_batch_cqe(struct io_uring * ring,struct io_uring_cqe ** cqes,unsigned count)157 unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
158 struct io_uring_cqe **cqes, unsigned count)
159 {
160 unsigned ready;
161 bool overflow_checked = false;
162
163 again:
164 ready = io_uring_cq_ready(ring);
165 if (ready) {
166 unsigned head = *ring->cq.khead;
167 unsigned mask = *ring->cq.kring_mask;
168 unsigned last;
169 int i = 0;
170
171 count = count > ready ? ready : count;
172 last = head + count;
173 for (;head != last; head++, i++)
174 cqes[i] = &ring->cq.cqes[head & mask];
175
176 return count;
177 }
178
179 if (overflow_checked)
180 goto done;
181
182 if (cq_ring_needs_flush(ring)) {
183 __sys_io_uring_enter(ring->ring_fd, 0, 0,
184 IORING_ENTER_GETEVENTS, NULL);
185 overflow_checked = true;
186 goto again;
187 }
188
189 done:
190 return 0;
191 }
192
193 /*
194 * Sync internal state with kernel ring state on the SQ side. Returns the
195 * number of pending items in the SQ ring, for the shared ring.
196 */
__io_uring_flush_sq(struct io_uring * ring)197 int __io_uring_flush_sq(struct io_uring *ring)
198 {
199 struct io_uring_sq *sq = &ring->sq;
200 const unsigned mask = *sq->kring_mask;
201 unsigned ktail = *sq->ktail;
202 unsigned to_submit = sq->sqe_tail - sq->sqe_head;
203
204 if (!to_submit)
205 goto out;
206
207 /*
208 * Fill in sqes that we have queued up, adding them to the kernel ring
209 */
210 do {
211 sq->array[ktail & mask] = sq->sqe_head & mask;
212 ktail++;
213 sq->sqe_head++;
214 } while (--to_submit);
215
216 /*
217 * Ensure that the kernel sees the SQE updates before it sees the tail
218 * update.
219 */
220 io_uring_smp_store_release(sq->ktail, ktail);
221 out:
222 /*
223 * This _may_ look problematic, as we're not supposed to be reading
224 * SQ->head without acquire semantics. When we're in SQPOLL mode, the
225 * kernel submitter could be updating this right now. For non-SQPOLL,
226 * task itself does it, and there's no potential race. But even for
227 * SQPOLL, the load is going to be potentially out-of-date the very
228 * instant it's done, regardless or whether or not it's done
229 * atomically. Worst case, we're going to be over-estimating what
230 * we can submit. The point is, we need to be able to deal with this
231 * situation regardless of any perceived atomicity.
232 */
233 return ktail - *sq->khead;
234 }
235
236 /*
237 * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
238 * more efficiently than queueing an internal timeout command.
239 */
io_uring_wait_cqes_new(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,unsigned wait_nr,struct __kernel_timespec * ts,sigset_t * sigmask)240 static int io_uring_wait_cqes_new(struct io_uring *ring,
241 struct io_uring_cqe **cqe_ptr,
242 unsigned wait_nr, struct __kernel_timespec *ts,
243 sigset_t *sigmask)
244 {
245 struct io_uring_getevents_arg arg = {
246 .sigmask = (unsigned long) sigmask,
247 .sigmask_sz = _NSIG / 8,
248 .ts = (unsigned long) ts
249 };
250 struct get_data data = {
251 .submit = __io_uring_flush_sq(ring),
252 .wait_nr = wait_nr,
253 .get_flags = IORING_ENTER_EXT_ARG,
254 .sz = sizeof(arg),
255 .arg = &arg
256 };
257
258 return _io_uring_get_cqe(ring, cqe_ptr, &data);
259 }
260
/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. For kernels that
 * don't support IORING_FEAT_EXT_ARG, applications using this function must
 * never set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
 * specified, the application does not need to call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 *
 * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
 * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned wait_nr, struct __kernel_timespec *ts,
		       sigset_t *sigmask)
{
	struct io_uring_sqe *timeout_sqe;
	unsigned pending;

	/* No timeout requested: plain wait, nothing to submit. */
	if (!ts)
		return __io_uring_get_cqe(ring, cqe_ptr, 0, wait_nr, sigmask);

	/* Kernel can take the timeout directly through the enter call. */
	if (ring->features & IORING_FEAT_EXT_ARG)
		return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr, ts,
					      sigmask);

	/*
	 * Fall back to an internal timeout request. If no sqe is free, try
	 * submitting what is queued and ask once more.
	 */
	timeout_sqe = io_uring_get_sqe(ring);
	if (!timeout_sqe) {
		int rc = io_uring_submit(ring);

		if (rc < 0)
			return rc;

		timeout_sqe = io_uring_get_sqe(ring);
		if (!timeout_sqe)
			return -EAGAIN;
	}

	io_uring_prep_timeout(timeout_sqe, ts, wait_nr, 0);
	timeout_sqe->user_data = LIBURING_UDATA_TIMEOUT;
	pending = __io_uring_flush_sq(ring);

	return __io_uring_get_cqe(ring, cqe_ptr, pending, wait_nr, sigmask);
}
311
/*
 * Convenience wrapper around io_uring_wait_cqes(): waits for a single
 * completion with the given timeout and no signal mask.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	const unsigned one_cqe = 1;

	return io_uring_wait_cqes(ring, cqe_ptr, one_cqe, ts, NULL);
}
322
323 /*
324 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
325 *
326 * Returns number of sqes submitted
327 */
__io_uring_submit(struct io_uring * ring,unsigned submitted,unsigned wait_nr)328 static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
329 unsigned wait_nr)
330 {
331 unsigned flags;
332 int ret;
333
334 flags = 0;
335 if (sq_ring_needs_enter(ring, &flags) || wait_nr) {
336 if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
337 flags |= IORING_ENTER_GETEVENTS;
338
339 ret = __sys_io_uring_enter(ring->ring_fd, submitted, wait_nr,
340 flags, NULL);
341 if (ret < 0)
342 return -errno;
343 } else
344 ret = submitted;
345
346 return ret;
347 }
348
/*
 * Flush locally queued sqes into the shared ring, then submit them and
 * optionally wait for 'wait_nr' completions.
 */
static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	unsigned pending = __io_uring_flush_sq(ring);

	return __io_uring_submit(ring, pending, wait_nr);
}
353
/*
 * Hand the sqes obtained through io_uring_get_sqe() to the kernel without
 * waiting for any completions.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit(struct io_uring *ring)
{
	const unsigned no_wait = 0;

	return __io_uring_submit_and_wait(ring, no_wait);
}
363
/*
 * Same as io_uring_submit(), but additionally waits for 'wait_nr'
 * completion events.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	int submitted = __io_uring_submit_and_wait(ring, wait_nr);

	return submitted;
}
373
374 /*
375 * Return an sqe to fill. Application must later call io_uring_submit()
376 * when it's ready to tell the kernel about it. The caller may call this
377 * function multiple times before calling io_uring_submit().
378 *
379 * Returns a vacant sqe, or NULL if we're full.
380 */
io_uring_get_sqe(struct io_uring * ring)381 struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
382 {
383 struct io_uring_sq *sq = &ring->sq;
384 unsigned int head = io_uring_smp_load_acquire(sq->khead);
385 unsigned int next = sq->sqe_tail + 1;
386 struct io_uring_sqe *sqe = NULL;
387
388 if (next - head <= *sq->kring_entries) {
389 sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
390 sq->sqe_tail = next;
391 }
392 return sqe;
393 }
394
__io_uring_sqring_wait(struct io_uring * ring)395 int __io_uring_sqring_wait(struct io_uring *ring)
396 {
397 int ret;
398
399 ret = __sys_io_uring_enter(ring->ring_fd, 0, 0, IORING_ENTER_SQ_WAIT,
400 NULL);
401 if (ret < 0)
402 ret = -errno;
403 return ret;
404 }
405