• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /* SPDX-License-Identifier: MIT */
2 #define _POSIX_C_SOURCE 200112L
3 
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <sys/mman.h>
7 #include <unistd.h>
8 #include <errno.h>
9 #include <string.h>
10 #include <stdbool.h>
11 
12 #include "liburing/compat.h"
13 #include "liburing/io_uring.h"
14 #include "liburing.h"
15 #include "liburing/barrier.h"
16 
17 #include "syscall.h"
18 
19 /*
20  * Returns true if we're not using SQ thread (thus nobody submits but us)
21  * or if IORING_SQ_NEED_WAKEUP is set, so submit thread must be explicitly
22  * awakened. For the latter case, we set the thread wakeup flag.
23  */
sq_ring_needs_enter(struct io_uring * ring,unsigned * flags)24 static inline bool sq_ring_needs_enter(struct io_uring *ring, unsigned *flags)
25 {
26 	if (!(ring->flags & IORING_SETUP_SQPOLL))
27 		return true;
28 
29 	if (uring_unlikely(IO_URING_READ_ONCE(*ring->sq.kflags) &
30 			   IORING_SQ_NEED_WAKEUP)) {
31 		*flags |= IORING_ENTER_SQ_WAKEUP;
32 		return true;
33 	}
34 
35 	return false;
36 }
37 
cq_ring_needs_flush(struct io_uring * ring)38 static inline bool cq_ring_needs_flush(struct io_uring *ring)
39 {
40 	return IO_URING_READ_ONCE(*ring->sq.kflags) & IORING_SQ_CQ_OVERFLOW;
41 }
42 
__io_uring_peek_cqe(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,unsigned * nr_available)43 static int __io_uring_peek_cqe(struct io_uring *ring,
44 			       struct io_uring_cqe **cqe_ptr,
45 			       unsigned *nr_available)
46 {
47 	struct io_uring_cqe *cqe;
48 	int err = 0;
49 	unsigned available;
50 	unsigned mask = *ring->cq.kring_mask;
51 
52 	do {
53 		unsigned tail = io_uring_smp_load_acquire(ring->cq.ktail);
54 		unsigned head = *ring->cq.khead;
55 
56 		cqe = NULL;
57 		available = tail - head;
58 		if (!available)
59 			break;
60 
61 		cqe = &ring->cq.cqes[head & mask];
62 		if (!(ring->features & IORING_FEAT_EXT_ARG) &&
63 				cqe->user_data == LIBURING_UDATA_TIMEOUT) {
64 			if (cqe->res < 0)
65 				err = cqe->res;
66 			io_uring_cq_advance(ring, 1);
67 			if (!err)
68 				continue;
69 			cqe = NULL;
70 		}
71 
72 		break;
73 	} while (1);
74 
75 	*cqe_ptr = cqe;
76 	*nr_available = available;
77 	return err;
78 }
79 
/*
 * Arguments bundle consumed by _io_uring_get_cqe().
 */
struct get_data {
	unsigned int submit;	/* submit count passed to the enter call */
	unsigned int wait_nr;	/* number of completions to wait for */
	unsigned int get_flags;	/* OR'ed into the flags with IORING_ENTER_GETEVENTS */
	int sz;			/* size argument passed along with 'arg' */
	void *arg;		/* opaque argument passed to the enter call */
};
87 
_io_uring_get_cqe(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,struct get_data * data)88 static int _io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
89 			     struct get_data *data)
90 {
91 	struct io_uring_cqe *cqe = NULL;
92 	int err;
93 
94 	do {
95 		bool need_enter = false;
96 		bool cq_overflow_flush = false;
97 		unsigned flags = 0;
98 		unsigned nr_available;
99 		int ret;
100 
101 		err = __io_uring_peek_cqe(ring, &cqe, &nr_available);
102 		if (err)
103 			break;
104 		if (!cqe && !data->wait_nr && !data->submit) {
105 			if (!cq_ring_needs_flush(ring)) {
106 				err = -EAGAIN;
107 				break;
108 			}
109 			cq_overflow_flush = true;
110 		}
111 		if (data->wait_nr > nr_available || cq_overflow_flush) {
112 			flags = IORING_ENTER_GETEVENTS | data->get_flags;
113 			need_enter = true;
114 		}
115 		if (data->submit) {
116 			sq_ring_needs_enter(ring, &flags);
117 			need_enter = true;
118 		}
119 		if (!need_enter)
120 			break;
121 
122 		ret = __sys_io_uring_enter2(ring->ring_fd, data->submit,
123 				data->wait_nr, flags, data->arg,
124 				data->sz);
125 		if (ret < 0) {
126 			err = -errno;
127 			break;
128 		}
129 
130 		data->submit -= ret;
131 		if (cqe)
132 			break;
133 	} while (1);
134 
135 	*cqe_ptr = cqe;
136 	return err;
137 }
138 
/*
 * Submit 'submit' entries and/or wait for 'wait_nr' completions, passing
 * 'sigmask' (with size _NSIG / 8) as the argument of the enter call. No
 * extra get flags are used. Thin wrapper around _io_uring_get_cqe().
 */
int __io_uring_get_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
		       unsigned submit, unsigned wait_nr, sigset_t *sigmask)
{
	struct get_data data;

	data.submit = submit;
	data.wait_nr = wait_nr;
	data.get_flags = 0;
	data.sz = _NSIG / 8;
	data.arg = sigmask;

	return _io_uring_get_cqe(ring, cqe_ptr, &data);
}
152 
153 /*
154  * Fill in an array of IO completions up to count, if any are available.
155  * Returns the amount of IO completions filled.
156  */
io_uring_peek_batch_cqe(struct io_uring * ring,struct io_uring_cqe ** cqes,unsigned count)157 unsigned io_uring_peek_batch_cqe(struct io_uring *ring,
158 				 struct io_uring_cqe **cqes, unsigned count)
159 {
160 	unsigned ready;
161 	bool overflow_checked = false;
162 
163 again:
164 	ready = io_uring_cq_ready(ring);
165 	if (ready) {
166 		unsigned head = *ring->cq.khead;
167 		unsigned mask = *ring->cq.kring_mask;
168 		unsigned last;
169 		int i = 0;
170 
171 		count = count > ready ? ready : count;
172 		last = head + count;
173 		for (;head != last; head++, i++)
174 			cqes[i] = &ring->cq.cqes[head & mask];
175 
176 		return count;
177 	}
178 
179 	if (overflow_checked)
180 		goto done;
181 
182 	if (cq_ring_needs_flush(ring)) {
183 		__sys_io_uring_enter(ring->ring_fd, 0, 0,
184 				     IORING_ENTER_GETEVENTS, NULL);
185 		overflow_checked = true;
186 		goto again;
187 	}
188 
189 done:
190 	return 0;
191 }
192 
193 /*
194  * Sync internal state with kernel ring state on the SQ side. Returns the
195  * number of pending items in the SQ ring, for the shared ring.
196  */
__io_uring_flush_sq(struct io_uring * ring)197 int __io_uring_flush_sq(struct io_uring *ring)
198 {
199 	struct io_uring_sq *sq = &ring->sq;
200 	const unsigned mask = *sq->kring_mask;
201 	unsigned ktail = *sq->ktail;
202 	unsigned to_submit = sq->sqe_tail - sq->sqe_head;
203 
204 	if (!to_submit)
205 		goto out;
206 
207 	/*
208 	 * Fill in sqes that we have queued up, adding them to the kernel ring
209 	 */
210 	do {
211 		sq->array[ktail & mask] = sq->sqe_head & mask;
212 		ktail++;
213 		sq->sqe_head++;
214 	} while (--to_submit);
215 
216 	/*
217 	 * Ensure that the kernel sees the SQE updates before it sees the tail
218 	 * update.
219 	 */
220 	io_uring_smp_store_release(sq->ktail, ktail);
221 out:
222 	/*
223 	 * This _may_ look problematic, as we're not supposed to be reading
224 	 * SQ->head without acquire semantics. When we're in SQPOLL mode, the
225 	 * kernel submitter could be updating this right now. For non-SQPOLL,
226 	 * task itself does it, and there's no potential race. But even for
227 	 * SQPOLL, the load is going to be potentially out-of-date the very
228 	 * instant it's done, regardless or whether or not it's done
229 	 * atomically. Worst case, we're going to be over-estimating what
230 	 * we can submit. The point is, we need to be able to deal with this
231 	 * situation regardless of any perceived atomicity.
232 	 */
233 	return ktail - *sq->khead;
234 }
235 
236 /*
237  * If we have kernel support for IORING_ENTER_EXT_ARG, then we can use that
238  * more efficiently than queueing an internal timeout command.
239  */
io_uring_wait_cqes_new(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,unsigned wait_nr,struct __kernel_timespec * ts,sigset_t * sigmask)240 static int io_uring_wait_cqes_new(struct io_uring *ring,
241 				  struct io_uring_cqe **cqe_ptr,
242 				  unsigned wait_nr, struct __kernel_timespec *ts,
243 				  sigset_t *sigmask)
244 {
245 	struct io_uring_getevents_arg arg = {
246 		.sigmask	= (unsigned long) sigmask,
247 		.sigmask_sz	= _NSIG / 8,
248 		.ts		= (unsigned long) ts
249 	};
250 	struct get_data data = {
251 		.submit		= __io_uring_flush_sq(ring),
252 		.wait_nr	= wait_nr,
253 		.get_flags	= IORING_ENTER_EXT_ARG,
254 		.sz		= sizeof(arg),
255 		.arg		= &arg
256 	};
257 
258 	return _io_uring_get_cqe(ring, cqe_ptr, &data);
259 }
260 
261 /*
262  * Like io_uring_wait_cqe(), except it accepts a timeout value as well. Note
263  * that an sqe is used internally to handle the timeout. For kernel doesn't
264  * support IORING_FEAT_EXT_ARG, applications using this function must never
265  * set sqe->user_data to LIBURING_UDATA_TIMEOUT!
266  *
267  * For kernels without IORING_FEAT_EXT_ARG (5.10 and older), if 'ts' is
268  * specified, the application need not call io_uring_submit() before
269  * calling this function, as we will do that on its behalf. From this it also
270  * follows that this function isn't safe to use for applications that split SQ
271  * and CQ handling between two threads and expect that to work without
272  * synchronization, as this function manipulates both the SQ and CQ side.
273  *
274  * For kernels with IORING_FEAT_EXT_ARG, no implicit submission is done and
275  * hence this function is safe to use for applications that split SQ and CQ
276  * handling between two threads.
277  */
io_uring_wait_cqes(struct io_uring * ring,struct io_uring_cqe ** cqe_ptr,unsigned wait_nr,struct __kernel_timespec * ts,sigset_t * sigmask)278 int io_uring_wait_cqes(struct io_uring *ring, struct io_uring_cqe **cqe_ptr,
279 		       unsigned wait_nr, struct __kernel_timespec *ts,
280 		       sigset_t *sigmask)
281 {
282 	unsigned to_submit = 0;
283 
284 	if (ts) {
285 		struct io_uring_sqe *sqe;
286 		int ret;
287 
288 		if (ring->features & IORING_FEAT_EXT_ARG)
289 			return io_uring_wait_cqes_new(ring, cqe_ptr, wait_nr,
290 							ts, sigmask);
291 
292 		/*
293 		 * If the SQ ring is full, we may need to submit IO first
294 		 */
295 		sqe = io_uring_get_sqe(ring);
296 		if (!sqe) {
297 			ret = io_uring_submit(ring);
298 			if (ret < 0)
299 				return ret;
300 			sqe = io_uring_get_sqe(ring);
301 			if (!sqe)
302 				return -EAGAIN;
303 		}
304 		io_uring_prep_timeout(sqe, ts, wait_nr, 0);
305 		sqe->user_data = LIBURING_UDATA_TIMEOUT;
306 		to_submit = __io_uring_flush_sq(ring);
307 	}
308 
309 	return __io_uring_get_cqe(ring, cqe_ptr, to_submit, wait_nr, sigmask);
310 }
311 
/*
 * Wait for a single completion with an optional timeout. Equivalent to
 * io_uring_wait_cqes() with a wait_nr of 1 and no signal mask; see that
 * function for the details.
 */
int io_uring_wait_cqe_timeout(struct io_uring *ring,
			      struct io_uring_cqe **cqe_ptr,
			      struct __kernel_timespec *ts)
{
	const unsigned single = 1;

	return io_uring_wait_cqes(ring, cqe_ptr, single, ts, NULL);
}
322 
323 /*
324  * Submit sqes acquired from io_uring_get_sqe() to the kernel.
325  *
326  * Returns number of sqes submitted
327  */
__io_uring_submit(struct io_uring * ring,unsigned submitted,unsigned wait_nr)328 static int __io_uring_submit(struct io_uring *ring, unsigned submitted,
329 			     unsigned wait_nr)
330 {
331 	unsigned flags;
332 	int ret;
333 
334 	flags = 0;
335 	if (sq_ring_needs_enter(ring, &flags) || wait_nr) {
336 		if (wait_nr || (ring->flags & IORING_SETUP_IOPOLL))
337 			flags |= IORING_ENTER_GETEVENTS;
338 
339 		ret = __sys_io_uring_enter(ring->ring_fd, submitted, wait_nr,
340 						flags, NULL);
341 		if (ret < 0)
342 			return -errno;
343 	} else
344 		ret = submitted;
345 
346 	return ret;
347 }
348 
/*
 * Flush the locally queued sqes to the SQ ring, then submit them and
 * optionally wait for 'wait_nr' completions.
 */
static int __io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	unsigned pending = __io_uring_flush_sq(ring);

	return __io_uring_submit(ring, pending, wait_nr);
}
353 
/*
 * Submit sqes acquired from io_uring_get_sqe() to the kernel, without
 * waiting for any completion.
 *
 * Returns number of sqes submitted, or a negative error code.
 */
int io_uring_submit(struct io_uring *ring)
{
	const unsigned no_wait = 0;

	return __io_uring_submit_and_wait(ring, no_wait);
}
363 
/*
 * Like io_uring_submit(), but allows waiting for 'wait_nr' events as well.
 *
 * Returns number of sqes submitted, or a negative error code.
 */
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr)
{
	int submitted = __io_uring_submit_and_wait(ring, wait_nr);

	return submitted;
}
373 
374 /*
375  * Return an sqe to fill. Application must later call io_uring_submit()
376  * when it's ready to tell the kernel about it. The caller may call this
377  * function multiple times before calling io_uring_submit().
378  *
379  * Returns a vacant sqe, or NULL if we're full.
380  */
io_uring_get_sqe(struct io_uring * ring)381 struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)
382 {
383 	struct io_uring_sq *sq = &ring->sq;
384 	unsigned int head = io_uring_smp_load_acquire(sq->khead);
385 	unsigned int next = sq->sqe_tail + 1;
386 	struct io_uring_sqe *sqe = NULL;
387 
388 	if (next - head <= *sq->kring_entries) {
389 		sqe = &sq->sqes[sq->sqe_tail & *sq->kring_mask];
390 		sq->sqe_tail = next;
391 	}
392 	return sqe;
393 }
394 
__io_uring_sqring_wait(struct io_uring * ring)395 int __io_uring_sqring_wait(struct io_uring *ring)
396 {
397 	int ret;
398 
399 	ret = __sys_io_uring_enter(ring->ring_fd, 0, 0, IORING_ENTER_SQ_WAIT,
400 					NULL);
401 	if (ret < 0)
402 		ret = -errno;
403 	return ret;
404 }
405