/* SPDX-License-Identifier: MIT */
#define _POSIX_C_SOURCE 200112L

#include "lib.h"
#include "syscall.h"
#include "liburing.h"
#include "int_flags.h"
#include "liburing/compat.h"
#include "liburing/io_uring.h"

/*
 * Returns true if we're not using the SQ thread (thus nobody submits but us),
 * or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly
 * awakened. For the latter case, we set the thread wakeup flag.
 * If no SQEs are ready for submission, returns false.
 */
static inline bool sq_ring_needs_enter(struct io_uring *ring,
				       unsigned submit,
				       unsigned *flags)
{
	if (!submit)
		return false;

	if (!(ring->flags & IORING_SETUP_SQPOLL))
		return true;

	/*
	 * Ensure the kernel can see the store to the SQ tail before we read
	 * the flags.
	 */
	io_uring_smp_mb();

	if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
			   IORING_SQ_NEED_WAKEUP)) {
		*flags |= IORING_ENTER_SQ_WAKEUP;
		return true;
	}

	return false;
}
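
/*
 * Illustrative sketch, not part of the library: the wakeup path above only
 * matters for rings created with IORING_SETUP_SQPOLL, where the kernel SQ
 * thread may go idle. An application might set such a ring up roughly as
 * follows; the 2000ms idle value is an arbitrary example.
 *
 *	struct io_uring ring;
 *	struct io_uring_params p = { };
 *	int ret;
 *
 *	p.flags = IORING_SETUP_SQPOLL;
 *	p.sq_thread_idle = 2000;	// ms before the SQ thread goes idle
 *	ret = io_uring_queue_init_params(8, &ring, &p);
 *	if (ret < 0)
 *		return ret;	// e.g. insufficient privileges on older kernels
 */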

static inline bool cq_ring_needs_flush(struct io_uring *ring)
{
	return IO_URING_READ_ONCE(*ring->sq.kflags) &
	       (IORING_SQ_CQ_OVERFLOW | IORING_SQ_TASKRUN);
}

static inline bool cq_ring_needs_enter(struct io_uring *ring)
{
	return (ring->flags & IORING_SETUP_IOPOLL) || cq_ring_needs_flush(ring);
}

struct get_data {
	unsigned submit;
	unsigned wait_nr;
	unsigned get_flags;
	int sz;
	int has_ts;
	void *arg;
};

static int _io_uring_get_cqe(struct io_uring *ring,
			     struct io_uring_cqe **cqe_ptr,
			     struct get_data *data)
{
	struct io_uring_cqe *cqe = NULL;
	bool looped = false;
	int err = 0;

	do {
		bool need_enter = false;
		unsigned flags = 0;
		unsigned nr_available;
		int ret;

		ret = __io_uring_peek_cqe(ring, &cqe, &nr_available);
		if (ret) {
			if (!err)
				err = ret;
			break;
		}
		if (!cqe && !data->wait_nr && !data->submit) {
			/*
			 * If we already looped once, we already entered
			 * the kernel. Since there's nothing to submit or
			 * wait for, don't keep retrying.
			 */
			if (looped || !cq_ring_needs_enter(ring)) {
				if (!err)
					err = -EAGAIN;
				break;
			}
			need_enter = true;
		}
		if (data->wait_nr > nr_available || need_enter) {
			flags = IORING_ENTER_GETEVENTS | data->get_flags;
			need_enter = true;
		}
		if (sq_ring_needs_enter(ring, data->submit, &flags))
			need_enter = true;
		if (!need_enter)
			break;
		if (looped && data->has_ts) {
			struct io_uring_getevents_arg *arg = data->arg;

			if (!cqe && arg->ts && !err)
				err = -ETIME;
			break;
		}

		if (ring->int_flags & INT_FLAG_REG_RING)
			flags |= IORING_ENTER_REGISTERED_RING;
		ret = __sys_io_uring_enter2(ring->enter_ring_fd, data->submit,
					    data->wait_nr, flags, data->arg,
					    data->sz);
		if (ret < 0) {
			if (!err)
				err = ret;
			break;
		}

		data->submit -= ret;
		if (cqe)
			break;
		if (!looped) {
			looped = true;
			err = ret;
		}
	} while (1);

	*cqe_ptr = cqe;
	return err;
}

int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct get_data data = {
		.submit = submit,
		.wait_nr = wait_nr,
		.get_flags = 0,
		.sz = _NSIG / 8,
		.arg = sigmask,
	};

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

int io_uring_get_events(struct io_uring *ring)
{
	int flags = IORING_ENTER_GETEVENTS;

	if (ring->int_flags & INT_FLAG_REG_RING)
		flags |= IORING_ENTER_REGISTERED_RING;
	return __sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}

/*
 * Fill in an array of IO completions up to count, if any are available.
 * Returns the number of IO completions filled.
 */
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
				 struct io_uring_cqe **cqes, unsigned count)
{
	unsigned ready;
	bool overflow_checked = false;
	int shift = 0;

	if (ring->flags & IORING_SETUP_CQE32)
		shift = 1;

again:
	ready = io_uring_cq_ready(ring);
	if (ready) {
		unsigned head = *ring->cq.khead;
		unsigned mask = ring->cq.ring_mask;
		unsigned last;
		int i = 0;

		count = count > ready ? ready : count;
		last = head + count;
		for (; head != last; head++, i++)
			cqes[i] = &ring->cq.cqes[(head & mask) << shift];

		return count;
	}

	if (overflow_checked)
		return 0;

	if (cq_ring_needs_flush(ring)) {
		io_uring_get_events(ring);
		overflow_checked = true;
		goto again;
	}

	return 0;
}
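
/*
 * Usage sketch, not part of the library: peeked CQEs are not consumed, so
 * after processing a batch the application is expected to advance the CQ
 * ring by the number of entries it handled. handle_cqe() below is a
 * hypothetical per-completion callback.
 *
 *	struct io_uring_cqe *cqes[32];
 *	unsigned i, nr;
 *
 *	nr = io_uring_peek_batch_cqe(&ring, cqes, 32);
 *	for (i = 0; i < nr; i++)
 *		handle_cqe(cqes[i]);
 *	io_uring_cq_advance(&ring, nr);
 */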

/*
 * Sync internal state with kernel ring state on the SQ side. Returns the
 * number of pending items in the SQ ring, for the shared ring.
 */
static unsigned __io_uring_flush_sq(struct io_uring *ring)
{
	struct io_uring_sq *sq = &ring->sq;
	unsigned tail = sq->sqe_tail;

	if (sq->sqe_head != tail) {
		sq->sqe_head = tail;
		/*
		 * Ensure kernel sees the SQE updates before the tail update.
		 */
		if (!(ring->flags & IORING_SETUP_SQPOLL))
			*sq->ktail = tail;
		else
			io_uring_smp_store_release(sq->ktail, tail);
	}
	/*
	 * This load needs to be atomic, since sq->khead is written concurrently
	 * by the kernel, but it doesn't need to be load_acquire, since the
	 * kernel doesn't store to the submission queue; it advances khead just
	 * to indicate that it's finished reading the submission queue entries
	 * so they're available for us to write to.
	 */
	return tail - IO_URING_READ_ONCE(*sq->khead);
}

/*
 * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
 * more efficiently than queueing an internal timeout command.
 */
static int io_uring_wait_cqes_new(struct io_uring *ring,
				  struct io_uring_cqe **cqe_ptr,
				  unsigned wait_nr,
				  struct __kernel_timespec *ts,
				  sigset_t *sigmask)
{
	struct io_uring_getevents_arg arg = {
		.sigmask = (unsigned long) sigmask,
		.sigmask_sz = _NSIG / 8,
		.ts = (unsigned long) ts
	};
	struct get_data data = {
		.wait_nr = wait_nr,
		.get_flags = IORING_ENTER_EXT_ARG,
		.sz = sizeof(arg),
		.has_ts = ts != NULL,
		.arg = &arg
	};

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. For kernels that don't
 * support IORING_FEAT_EXT_ARG, applications using this function must never
 * set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
 * specified, the application need not call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 *
 * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
 * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
static int __io_uring_submit_timeout(struct io_uring *ring, unsigned wait_nr,
				     struct __kernel_timespec *ts)
{
	struct io_uring_sqe *sqe;
	int ret;

	/*
	 * If the SQ ring is full, we may need to submit IO first
	 */
	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		ret = io_uring_submit(ring);
		if (ret < 0)
			return ret;
		sqe = io_uring_get_sqe(ring);
		if (!sqe)
			return -EAGAIN;
	}
	io_uring_prep_timeout(sqe, ts, wait_nr, 0);
	sqe->user_data = LIBURING_UDATA_TIMEOUT;
	return __io_uring_flush_sq(ring);
}

int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned wait_nr, struct __kernel_timespec *ts,
		       sigset_t *sigmask)
{
	int to_submit = 0;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG)
			return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
						      ts, sigmask);
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	}

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
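
/*
 * Usage sketch, not part of the library: wait for a single completion for at
 * most one second, treating -ETIME as "the timeout fired before a CQE
 * arrived" rather than as a hard error.
 *
 *	struct __kernel_timespec ts = { .tv_sec = 1, .tv_nsec = 0 };
 *	struct io_uring_cqe *cqe;
 *	int ret;
 *
 *	ret = io_uring_wait_cqes(&ring, &cqe, 1, &ts, NULL);
 *	if (ret == -ETIME) {
 *		// no completion within the timeout
 *	} else if (ret == 0) {
 *		// process cqe->res / cqe->user_data, then mark it seen
 *		io_uring_cqe_seen(&ring, cqe);
 *	}
 */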

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
				     struct io_uring_cqe **cqe_ptr,
				     unsigned wait_nr,
				     struct __kernel_timespec *ts,
				     sigset_t *sigmask)
{
	int to_submit;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG) {
			struct io_uring_getevents_arg arg = {
				.sigmask = (unsigned long) sigmask,
				.sigmask_sz = _NSIG / 8,
				.ts = (unsigned long) ts
			};
			struct get_data data = {
				.submit = __io_uring_flush_sq(ring),
				.wait_nr = wait_nr,
				.get_flags = IORING_ENTER_EXT_ARG,
				.sz = sizeof(arg),
				.has_ts = ts != NULL,
				.arg = &arg
			};

			return _io_uring_get_cqe(ring, cqe_ptr, &data);
		}
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	} else
		to_submit = __io_uring_flush_sq(ring);

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
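
/*
 * Usage sketch, not part of the library: combine submission and a bounded
 * wait in one call, saving a separate io_uring_submit() when the caller
 * wants completions anyway. The 100ms value is an arbitrary example.
 *
 *	struct __kernel_timespec ts = { .tv_nsec = 100 * 1000 * 1000 };
 *	struct io_uring_cqe *cqe;
 *	int ret;
 *
 *	// SQEs have already been prepared via io_uring_get_sqe() and the
 *	// io_uring_prep_*() helpers
 *	ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &ts, NULL);
 *	if (ret >= 0 && cqe)
 *		io_uring_cqe_seen(&ring, cqe);
 */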

/*
 * See io_uring_wait_cqes() - this function is the same, it just always uses
 * '1' as the wait_nr.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	return io_uring_wait_cqes(ring, cqe_ptr, 1, ts, NULL);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
			     unsigned wait_nr, bool getevents)
{
	bool cq_needs_enter = getevents || wait_nr || cq_ring_needs_enter(ring);
	unsigned flags;
	int ret;

	flags = 0;
	if (sq_ring_needs_enter(ring, submitted, &flags) || cq_needs_enter) {
		if (cq_needs_enter)
			flags |= IORING_ENTER_GETEVENTS;
		if (ring->int_flags & INT_FLAG_REG_RING)
			flags |= IORING_ENTER_REGISTERED_RING;

		ret = __sys_io_uring_enter(ring->enter_ring_fd, submitted,
					   wait_nr, flags, NULL);
	} else
		ret = submitted;

	return ret;
}

static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr, false);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit(struct io_uring *ring)
{
	return __io_uring_submit_and_wait(ring, 0);
}

/*
 * Like io_uring_submit(), but allows waiting for events as well.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit_and_wait(ring, wait_nr);
}
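
/*
 * Usage sketch, not part of the library: the typical prepare/submit/reap
 * cycle. A NOP request is used here purely as a placeholder operation.
 *
 *	struct io_uring_sqe *sqe;
 *	struct io_uring_cqe *cqe;
 *
 *	sqe = io_uring_get_sqe(&ring);
 *	if (sqe) {
 *		io_uring_prep_nop(sqe);
 *		sqe->user_data = 0x1234;
 *		if (io_uring_submit_and_wait(&ring, 1) >= 1 &&
 *		    io_uring_peek_cqe(&ring, &cqe) == 0) {
 *			// cqe->res holds the result, cqe->user_data is 0x1234
 *			io_uring_cqe_seen(&ring, cqe);
 *		}
 *	}
 */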

int io_uring_submit_and_get_events(struct io_uring *ring)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), 0, true);
}

#ifdef LIBURING_INTERNAL
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
	return _io_uring_get_sqe(ring);
}
#endif

int __io_uring_sqring_wait(struct io_uring *ring)
{
	int flags = IORING_ENTER_SQ_WAIT;

	if (ring->int_flags & INT_FLAG_REG_RING)
		flags |= IORING_ENTER_REGISTERED_RING;

	return __sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}
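
/*
 * Usage sketch, not part of the library: with an SQPOLL ring,
 * io_uring_get_sqe() can return NULL while the kernel SQ thread drains the
 * ring. The public io_uring_sqring_wait() helper, which falls through to this
 * function for a full SQPOLL ring, lets the submitter block until space is
 * available instead of spinning:
 *
 *	struct io_uring_sqe *sqe;
 *
 *	do {
 *		sqe = io_uring_get_sqe(&ring);
 *		if (!sqe && io_uring_sqring_wait(&ring) < 0)
 *			break;	// hard error waiting for SQ space
 *	} while (!sqe);
 */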