/* SPDX-License-Identifier: MIT */
#define _POSIX_C_SOURCE 200112L

#include "lib.h"
#include "syscall.h"
#include "liburing.h"
#include "int_flags.h"
#include "liburing/sanitize.h"
#include "liburing/compat.h"
#include "liburing/io_uring.h"

/*
 * Returns true if we're not using an SQ thread (thus nobody submits but us),
 * or if IORING_SQ_NEED_WAKEUP is set and the submit thread must be
 * explicitly awakened. For the latter case, we set the thread wakeup flag.
 * If no SQEs are ready for submission, returns false.
 */
static inline bool sq_ring_needs_enter(struct io_uring *ring,
				       unsigned submit,
				       unsigned *flags)
{
	if (!submit)
		return false;

	if (!(ring->flags & IORING_SETUP_SQPOLL))
		return true;

	/*
	 * Ensure the kernel can see the store to the SQ tail before we read
	 * the flags.
	 */
	io_uring_smp_mb();

	if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
			   IORING_SQ_NEED_WAKEUP)) {
		*flags |= IORING_ENTER_SQ_WAKEUP;
		return true;
	}

	return false;
}

static inline bool cq_ring_needs_flush(struct io_uring *ring)
{
	return IO_URING_READ_ONCE(*ring->sq.kflags) &
				 (IORING_SQ_CQ_OVERFLOW | IORING_SQ_TASKRUN);
}

static inline bool cq_ring_needs_enter(struct io_uring *ring)
{
	return (ring->flags & IORING_SETUP_IOPOLL) || cq_ring_needs_flush(ring);
}

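/*
 * Arguments for a single pass of _io_uring_get_cqe(): how many SQEs to
 * submit, how many completions to wait for, extra io_uring_enter() flags,
 * and the size of and pointer to the enter argument (either a sigset_t or
 * an io_uring_getevents_arg; has_ts records whether a timeout was given).
 */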
struct get_data {
	unsigned submit;
	unsigned wait_nr;
	unsigned get_flags;
	int sz;
	int has_ts;
	void *arg;
};

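/*
 * Core completion wait loop: peek for an available CQE and only call
 * io_uring_enter() when there is something to submit, an SQPOLL thread to
 * wake, or completions to wait for/flush. The 'looped' flag limits us to a
 * single retry after entering the kernel, so a timed wait that comes back
 * without a CQE reports -ETIME rather than looping again.
 */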
static int _io_uring_get_cqe(struct io_uring *ring,
			     struct io_uring_cqe **cqe_ptr,
			     struct get_data *data)
{
	struct io_uring_cqe *cqe = NULL;
	bool looped = false;
	int err = 0;

	do {
		bool need_enter = false;
		unsigned flags = ring_enter_flags(ring);
		unsigned nr_available;
		int ret;

		ret = __io_uring_peek_cqe(ring, &cqe, &nr_available);
		if (ret) {
			if (!err)
				err = ret;
			break;
		}
		if (!cqe && !data->wait_nr && !data->submit) {
			/*
			 * If we already looped once, we already entered
			 * the kernel. Since there's nothing to submit or
			 * wait for, don't keep retrying.
			 */
			if (looped || !cq_ring_needs_enter(ring)) {
				if (!err)
					err = -EAGAIN;
				break;
			}
			need_enter = true;
		}
		if (data->wait_nr > nr_available || need_enter) {
			flags |= IORING_ENTER_GETEVENTS | data->get_flags;
			need_enter = true;
		}
		if (sq_ring_needs_enter(ring, data->submit, &flags))
			need_enter = true;
		if (!need_enter)
			break;
		if (looped && data->has_ts) {
			struct io_uring_getevents_arg *arg = data->arg;

			if (!cqe && arg->ts && !err)
				err = -ETIME;
			break;
		}

		ret = __sys_io_uring_enter2(ring->enter_ring_fd, data->submit,
					    data->wait_nr, flags, data->arg,
					    data->sz);
		if (ret < 0) {
			if (!err)
				err = ret;
			break;
		}

		data->submit -= ret;
		if (cqe)
			break;
		if (!looped) {
			looped = true;
			err = ret;
		}
	} while (1);

	*cqe_ptr = cqe;
	return err;
}

int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct get_data data = {
		.submit		= submit,
		.wait_nr	= wait_nr,
		.get_flags	= 0,
		.sz		= _NSIG / 8,
		.arg		= sigmask,
	};

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

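/*
 * Run any pending completion work without submitting anything: enter the
 * kernel with IORING_ENTER_GETEVENTS so overflowed CQEs get flushed and
 * deferred task work gets processed.
 */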
int io_uring_get_events(struct io_uring *ring)
{
	int flags = IORING_ENTER_GETEVENTS | ring_enter_flags(ring);

	return __sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}

/*
 * Fill in an array of IO completions up to count, if any are available.
 * Returns the number of IO completions filled.
 */
unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
				 struct io_uring_cqe **cqes, unsigned count)
{
	unsigned ready;
	bool overflow_checked = false;
	int shift = 0;

	if (ring->flags & IORING_SETUP_CQE32)
		shift = 1;

again:
	ready = io_uring_cq_ready(ring);
	if (ready) {
		unsigned head = *ring->cq.khead;
		unsigned mask = ring->cq.ring_mask;
		unsigned last;
		int i = 0;

		count = count > ready ? ready : count;
		last = head + count;
		for (; head != last; head++, i++)
			cqes[i] = &ring->cq.cqes[(head & mask) << shift];

		return count;
	}

	if (overflow_checked)
		return 0;

	if (cq_ring_needs_flush(ring)) {
		io_uring_get_events(ring);
		overflow_checked = true;
		goto again;
	}

	return 0;
}
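
/*
 * Illustrative only (not part of this file): a consumer might drain the
 * CQ ring in batches like the sketch below, which assumes an initialized
 * 'ring' and a hypothetical handle_completion() callback:
 *
 *	struct io_uring_cqe *cqes[16];
 *	unsigned i, nr;
 *
 *	nr = io_uring_peek_batch_cqe(&ring, cqes, 16);
 *	for (i = 0; i < nr; i++)
 *		handle_completion(cqes[i]);
 *	io_uring_cq_advance(&ring, nr);
 */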

/*
 * Sync internal state with kernel ring state on the SQ side. Returns the
 * number of pending items in the SQ ring, for the shared ring.
 */
static unsigned __io_uring_flush_sq(struct io_uring *ring)
{
	struct io_uring_sq *sq = &ring->sq;
	unsigned tail = sq->sqe_tail;

	if (sq->sqe_head != tail) {
		sq->sqe_head = tail;
		/*
		 * Ensure kernel sees the SQE updates before the tail update.
		 */
		if (!(ring->flags & IORING_SETUP_SQPOLL))
			*sq->ktail = tail;
		else
			io_uring_smp_store_release(sq->ktail, tail);
	}
	/*
	 * This load needs to be atomic, since sq->khead is written concurrently
	 * by the kernel, but it doesn't need to be load_acquire, since the
	 * kernel doesn't store to the submission queue; it advances khead just
	 * to indicate that it's finished reading the submission queue entries
	 * so they're available for us to write to.
	 */
	return tail - IO_URING_READ_ONCE(*sq->khead);
}

/*
 * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
 * more efficiently than queueing an internal timeout command.
 */
static int io_uring_wait_cqes_new(struct io_uring *ring,
				  struct io_uring_cqe **cqe_ptr,
				  unsigned wait_nr,
				  struct __kernel_timespec *ts,
				  unsigned int min_wait_usec,
				  sigset_t *sigmask)
{
	struct io_uring_getevents_arg arg = {
		.sigmask	= (unsigned long) sigmask,
		.sigmask_sz	= _NSIG / 8,
		.ts		= (unsigned long) ts
	};
	struct get_data data = {
		.wait_nr	= wait_nr,
		.get_flags	= IORING_ENTER_EXT_ARG,
		.sz		= sizeof(arg),
		.has_ts		= ts != NULL,
		.arg		= &arg
	};

	if (min_wait_usec && ring->features & IORING_FEAT_MIN_TIMEOUT)
		arg.min_wait_usec = min_wait_usec;

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}

/*
 * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
 * that an sqe is used internally to handle the timeout. On kernels that don't
 * support IORING_FEAT_EXT_ARG, applications using this function must never
 * set sqe->user_data to LIBURING_UDATA_TIMEOUT!
 *
 * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
 * specified, the application need not call io_uring_submit() before
 * calling this function, as we will do that on its behalf. From this it also
 * follows that this function isn't safe to use for applications that split SQ
 * and CQ handling between two threads and expect that to work without
 * synchronization, as this function manipulates both the SQ and CQ side.
 *
 * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
 * hence this function is safe to use for applications that split SQ and CQ
 * handling between two threads.
 */
static int __io_uring_submit_timeout(struct io_uring *ring, unsigned wait_nr,
				     struct __kernel_timespec *ts)
{
	struct io_uring_sqe *sqe;
	int ret;

	/*
	 * If the SQ ring is full, we may need to submit IO first
	 */
	sqe = io_uring_get_sqe(ring);
	if (!sqe) {
		ret = io_uring_submit(ring);
		if (ret < 0)
			return ret;
		sqe = io_uring_get_sqe(ring);
		if (!sqe)
			return -EAGAIN;
	}
	io_uring_prep_timeout(sqe, ts, wait_nr, 0);
	sqe->user_data = LIBURING_UDATA_TIMEOUT;
	return __io_uring_flush_sq(ring);
}

int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned wait_nr, struct __kernel_timespec *ts,
		       sigset_t *sigmask)
{
	int to_submit = 0;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG)
			return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
							ts, 0, sigmask);
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	}

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}
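
/*
 * Illustrative only (not part of this file): waiting for a completion with
 * a 100ms cap, assuming an initialized 'ring':
 *
 *	struct io_uring_cqe *cqe;
 *	struct __kernel_timespec ts = { .tv_nsec = 100000000 };
 *	int ret = io_uring_wait_cqes(&ring, &cqe, 1, &ts, NULL);
 *
 * A return of -ETIME means the timeout expired first; 0 means 'cqe' points
 * at an available completion, which should be marked seen once consumed.
 */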

int io_uring_wait_cqes_min_timeout(struct io_uring *ring,
				   struct io_uring_cqe **cqe_ptr,
				   unsigned wait_nr,
				   struct __kernel_timespec *ts,
				   unsigned int min_wait_usec, sigset_t *sigmask)
{
	return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr, ts, min_wait_usec,
					sigmask);
}

static int __io_uring_submit_and_wait_timeout(struct io_uring *ring,
			struct io_uring_cqe **cqe_ptr, unsigned wait_nr,
			struct __kernel_timespec *ts,
			unsigned int min_wait, sigset_t *sigmask)
{
	int to_submit;

	if (ts) {
		if (ring->features & IORING_FEAT_EXT_ARG) {
			struct io_uring_getevents_arg arg = {
				.sigmask	= (unsigned long) sigmask,
				.sigmask_sz	= _NSIG / 8,
				.min_wait_usec	= min_wait,
				.ts		= (unsigned long) ts
			};
			struct get_data data = {
				.submit		= __io_uring_flush_sq(ring),
				.wait_nr	= wait_nr,
				.get_flags	= IORING_ENTER_EXT_ARG,
				.sz		= sizeof(arg),
				.has_ts		= ts != NULL,
				.arg		= &arg
			};

			return _io_uring_get_cqe(ring, cqe_ptr, &data);
		}
		to_submit = __io_uring_submit_timeout(ring, wait_nr, ts);
		if (to_submit < 0)
			return to_submit;
	} else
		to_submit = __io_uring_flush_sq(ring);

	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
}

int io_uring_submit_and_wait_min_timeout(struct io_uring *ring,
					 struct io_uring_cqe **cqe_ptr,
					 unsigned wait_nr,
					 struct __kernel_timespec *ts,
					 unsigned min_wait,
					 sigset_t *sigmask)
{
	if (!(ring->features & IORING_FEAT_MIN_TIMEOUT))
		return -EINVAL;
	return __io_uring_submit_and_wait_timeout(ring, cqe_ptr, wait_nr, ts,
						  min_wait, sigmask);
}

int io_uring_submit_and_wait_timeout(struct io_uring *ring,
				     struct io_uring_cqe **cqe_ptr,
				     unsigned wait_nr,
				     struct __kernel_timespec *ts,
				     sigset_t *sigmask)
{
	return __io_uring_submit_and_wait_timeout(ring, cqe_ptr, wait_nr, ts, 0,
						  sigmask);
}
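
/*
 * Illustrative only (not part of this file): submitting queued SQEs and
 * waiting up to one second for a completion in a single call, assuming an
 * initialized 'ring' with SQEs already prepared:
 *
 *	struct io_uring_cqe *cqe;
 *	struct __kernel_timespec ts = { .tv_sec = 1 };
 *	int ret = io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &ts, NULL);
 */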

/*
 * See io_uring_wait_cqes() - this function is the same, it just always uses
 * '1' as the wait_nr.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	return io_uring_wait_cqes(ring, cqe_ptr, 1, ts, NULL);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
			     unsigned wait_nr, bool getevents)
{
	bool cq_needs_enter = getevents || wait_nr || cq_ring_needs_enter(ring);
	unsigned flags = ring_enter_flags(ring);
	int ret;

	liburing_sanitize_ring(ring);

	if (sq_ring_needs_enter(ring, submitted, &flags) || cq_needs_enter) {
		if (cq_needs_enter)
			flags |= IORING_ENTER_GETEVENTS;

		ret = __sys_io_uring_enter(ring->enter_ring_fd, submitted,
					   wait_nr, flags, NULL);
	} else
		ret = submitted;

	return ret;
}

static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), wait_nr, false);
}

/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit(struct io_uring *ring)
{
	return __io_uring_submit_and_wait(ring, 0);
}

/*
 * Like io_uring_submit(), but allows waiting for events as well.
 *
 * Returns number of sqes submitted
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	return __io_uring_submit_and_wait(ring, wait_nr);
}
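
/*
 * Illustrative only (not part of this file): the common prepare/submit/reap
 * pattern, assuming an initialized 'ring' (NULL checks elided for brevity):
 *
 *	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);
 *	struct io_uring_cqe *cqe;
 *
 *	io_uring_prep_nop(sqe);
 *	io_uring_submit_and_wait(&ring, 1);
 *	if (!io_uring_peek_cqe(&ring, &cqe))
 *		io_uring_cqe_seen(&ring, cqe);
 */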

int io_uring_submit_and_get_events(struct io_uring *ring)
{
	return __io_uring_submit(ring, __io_uring_flush_sq(ring), 0, true);
}

#ifdef LIBURING_INTERNAL
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
{
	return _io_uring_get_sqe(ring);
}
#endif

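/*
 * Wait until the SQ ring has room for a new entry (IORING_ENTER_SQ_WAIT);
 * the public io_uring_sqring_wait() helper uses this when an SQPOLL ring
 * is currently full.
 */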
int __io_uring_sqring_wait(struct io_uring *ring)
{
	int flags = IORING_ENTER_SQ_WAIT | ring_enter_flags(ring);

	return __sys_io_uring_enter(ring->enter_ring_fd, 0, 0, flags, NULL);
}