// SPDX-License-Identifier: MIT or LGPL-2.1-only

#include <config.h>
#include <sys/mman.h>
#include <sys/time.h>
#include <sys/resource.h>

#include "ublksrv_priv.h"
#include "ublksrv_aio.h"

bool ublksrv_is_recovering(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	return ctrl_dev->tgt_argc == -1;
}

static inline struct ublksrv_io_desc *ublksrv_get_iod(
		const struct _ublksrv_queue *q, int tag)
{
	return (struct ublksrv_io_desc *)
		&(q->io_cmd_buf[tag * sizeof(struct ublksrv_io_desc)]);
}

/*
 * /dev/ublkbN shares the same lifetime with the ublk io daemon:
 *
 * 1) IO from /dev/ublkbN is handled by the io daemon directly
 *
 * 2) io cmd buffer is allocated from ublk driver, mapped to
 * io daemon vm space via mmap, and each hw queue has its own
 * io cmd buffer
 *
 * 3) io buffers are pre-allocated from the io daemon and passed
 * to ublk driver via io command, meantime ublk driver may choose
 * to pin these user pages before starting device
 *
 * Each /dev/ublkcN is owned by only one io daemon, and can't be
 * opened by other daemon. And the io daemon uses its allocated
 * io_uring to communicate with ublk driver.
 *
 * For each request of /dev/ublkbN, the io daemon submits one
 * sqe for both fetching IO from ublk driver and committing IO result
 * to ublk driver, and the io daemon has to issue all sqes
 * to /dev/ublkcN before sending START_DEV to /dev/ublk-control.
 *
 * After STOP_DEV is sent to /dev/ublk-control, the ublk driver needs
 * to freeze the request queue, and completes all pending sqes,
 * meantime tells the io daemon via cqe->res not to issue sqes
 * any more, and also deletes /dev/ublkbN.  After the io daemon figures
 * out that all sqes have been freed, it exits. Then STOP_DEV returns.
 */

/*
 * If the ublksrv queue has been idle for the past 20 seconds, start to
 * discard pages mapped to io buffers via madvise(MADV_DONTNEED), so these
 * pages can be made available to others without needing to be swapped out
 */
#define UBLKSRV_IO_IDLE_SECS    20
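
/*
 * Illustrative sketch (editor's addition, not part of the library): the
 * daemon-side lifecycle described above is normally driven by one pthread
 * per queue, roughly as below.  The function name is hypothetical; only the
 * public ublksrv APIs it calls are real.
 */
#if 0	/* example only, never compiled */
static void example_queue_loop(const struct ublksrv_dev *dev,
			       unsigned short q_id)
{
	/* maps the io cmd buffer and issues all FETCH_REQ sqes */
	const struct ublksrv_queue *q = ublksrv_queue_init(dev, q_id, NULL);

	if (!q)
		return;

	/* ublksrv_process_io() returns -ENODEV once the queue is fully stopped */
	while (ublksrv_process_io(q) >= 0)
		;

	ublksrv_queue_deinit(q);
}
#endif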

static int __ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops, int type,
		int argc, char *argv[])
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;
	int ret;

	if (!ops)
		return -EINVAL;

	if (strcmp(ops->name, type_name))
		return -EINVAL;

	if (!ops->handle_io_async)
		return -EINVAL;
	if (!ops->alloc_io_buf ^ !ops->free_io_buf)
		return -EINVAL;

	optind = 0;     /* so that we can parse our arguments */
	tgt->ops = ops;

	if (!ublksrv_is_recovering(dev->ctrl_dev)) {
		if (ops->init_tgt)
			ret = ops->init_tgt(local_to_tdev(dev), type, argc, argv);
		else
			ret = 0;
	} else {
		if (ops->recovery_tgt)
			ret = ops->recovery_tgt(local_to_tdev(dev), type);
		else
			ret = -ENOTSUP;
	}
	if (ret) {
		tgt->ops = NULL;
		return ret;
	}
	return 0;
}

static int ublksrv_tgt_init(struct _ublksrv_dev *dev, const char *type_name,
		const struct ublksrv_tgt_type *ops,
		int argc, char *argv[])
{
	if (type_name == NULL)
		return -EINVAL;

	if (ops)
		return __ublksrv_tgt_init(dev, type_name, ops,
				ops->type, argc, argv);

	return -EINVAL;
}

static inline void ublksrv_tgt_exit(struct ublksrv_tgt_info *tgt)
{
	int i;

	for (i = 1; i < tgt->nr_fds; i++)
		close(tgt->fds[i]);
}

static void ublksrv_tgt_deinit(struct _ublksrv_dev *dev)
{
	struct ublksrv_tgt_info *tgt = &dev->tgt;

	ublksrv_tgt_exit(tgt);

	if (tgt->ops && tgt->ops->deinit_tgt)
		tgt->ops->deinit_tgt(local_to_tdev(dev));
}

static inline int ublksrv_queue_io_cmd(struct _ublksrv_queue *q,
		struct ublk_io *io, unsigned tag)
{
	struct ublksrv_io_cmd *cmd;
	struct io_uring_sqe *sqe;
	unsigned int cmd_op = 0;
	__u64 user_data;

	/* only a freed io can be issued */
	if (!(io->flags & UBLKSRV_IO_FREE))
		return 0;

	/* we issue because we need either fetching or committing */
	if (!(io->flags &
		(UBLKSRV_NEED_FETCH_RQ | UBLKSRV_NEED_GET_DATA |
		 UBLKSRV_NEED_COMMIT_RQ_COMP)))
		return 0;

	if (io->flags & UBLKSRV_NEED_GET_DATA)
		cmd_op = UBLK_IO_NEED_GET_DATA;
	else if (io->flags & UBLKSRV_NEED_COMMIT_RQ_COMP)
		cmd_op = UBLK_IO_COMMIT_AND_FETCH_REQ;
	else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
		cmd_op = UBLK_IO_FETCH_REQ;

	sqe = io_uring_get_sqe(&q->ring);
	if (!sqe) {
		ublk_err("%s: run out of sqe %d, tag %d\n",
				__func__, q->q_id, tag);
		return -1;
	}

	cmd = (struct ublksrv_io_cmd *)ublksrv_get_sqe_cmd(sqe);

	if (cmd_op == UBLK_IO_COMMIT_AND_FETCH_REQ)
		cmd->result = io->result;

	if (q->state & UBLKSRV_QUEUE_IOCTL_OP)
		cmd_op = _IOWR('u', _IOC_NR(cmd_op), struct ublksrv_io_cmd);

	/* These fields should be written once and never changed */
	ublksrv_set_sqe_cmd_op(sqe, cmd_op);
	sqe->fd		= 0;	/* fixed file index 0 == dev->cdev_fd */
	sqe->opcode	= IORING_OP_URING_CMD;
	sqe->flags	= IOSQE_FIXED_FILE;
	sqe->rw_flags	= 0;
	cmd->tag	= tag;
	if (!(q->state & UBLKSRV_USER_COPY))
		cmd->addr	= (__u64)io->buf_addr;
	else
		cmd->addr	= 0;
	cmd->q_id	= q->q_id;

	user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, 0);
	io_uring_sqe_set_data64(sqe, user_data);

	io->flags = 0;

	q->cmd_inflight += 1;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
			__func__, q->q_id, tag, cmd_op,
			io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
	return 1;
}

int ublksrv_complete_io(const struct ublksrv_queue *tq, unsigned tag, int res)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	struct ublk_io *io = &q->ios[tag];

	ublksrv_mark_io_done(io, res);

	return ublksrv_queue_io_cmd(q, io, tag);
}
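
/*
 * Illustrative sketch (editor's addition, not part of the library): a
 * target's ->handle_io_async() typically ends by committing the result via
 * ublksrv_complete_io().  The helper below is hypothetical; the iod field
 * used here (nr_sectors) comes from the ublk kernel UAPI.
 */
#if 0	/* example only, never compiled */
static void example_commit_io(const struct ublksrv_queue *q,
			      const struct ublk_io_data *data, int err)
{
	/* report either the error or the number of transferred bytes */
	int res = err ? err : (int)(data->iod->nr_sectors << 9);

	ublksrv_complete_io(q, data->tag, res);
}
#endif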

/*
 * eventfd is treated as a special target IO which has to be queued
 * when the queue is set up
 */
static inline int __ublksrv_queue_event(struct _ublksrv_queue *q)
{
	if (q->efd >= 0) {
		struct io_uring_sqe *sqe;
		__u64 user_data = build_eventfd_data();

		if (q->state & UBLKSRV_QUEUE_STOPPING)
			return -EINVAL;

		sqe = io_uring_get_sqe(&q->ring);
		if (!sqe) {
			ublk_err("%s: queue %d run out of sqe\n",
				__func__, q->q_id);
			return -1;
		}

		io_uring_prep_poll_add(sqe, q->efd, POLLIN);
		io_uring_sqe_set_data64(sqe, user_data);
	}
	return 0;
}

/*
 * This API is supposed to be called in ->handle_event() after current
 * events are handled.
 */
int ublksrv_queue_handled_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data;
		const int cnt = sizeof(uint64_t);

		/* the read has to be done, otherwise the poll event won't be stopped */
		if (read(q->efd, &data, cnt) != cnt)
			ublk_err("%s: read wrong bytes from eventfd\n",
					__func__);
		/*
		 * the event needs to be issued immediately, since other io may
		 * rely on it
		 */
		if (!__ublksrv_queue_event(q))
			io_uring_submit_and_wait(&q->ring, 0);
	}
	return 0;
}

/*
 * Send an event to the io command uring context, so that the queue pthread
 * can be woken up for handling io, then ->handle_event() will be
 * called to notify target code.
 *
 * This API is usually called from another context.
 */
int ublksrv_queue_send_event(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (q->efd >= 0) {
		uint64_t data = 1;
		const int cnt = sizeof(uint64_t);

		if (write(q->efd, &data, cnt) != cnt) {
			ublk_err("%s: wrote wrong bytes to eventfd\n",
					__func__);
			return -EPIPE;
		}
	}
	return 0;
}
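
/*
 * Illustrative sketch (editor's addition, not part of the library) of the
 * eventfd handshake built on the two APIs above.  The "pending list" and the
 * function names are hypothetical; only ublksrv_queue_send_event() and
 * ublksrv_queue_handled_event() are real.
 */
#if 0	/* example only, never compiled */
/* runs in some offload thread, outside the queue context */
static void example_offload_done(const struct ublksrv_queue *q)
{
	/* ...move finished requests onto a per-queue pending list... */

	/* wake up the queue pthread so ->handle_event() gets called */
	ublksrv_queue_send_event(q);
}

/* the target's ->handle_event() callback, runs in the queue pthread */
static void example_handle_event(const struct ublksrv_queue *q)
{
	/* ...commit each pending request via ublksrv_complete_io()... */

	/* drain the eventfd and re-arm its poll sqe */
	ublksrv_queue_handled_event(q);
}
#endif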

/*
 * Issue all available commands to /dev/ublkcN; the exact cmd is figured
 * out in ublksrv_queue_io_cmd() with the help of each io->flags.
 *
 * todo: queue io commands with batching
 */
static void ublksrv_submit_fetch_commands(struct _ublksrv_queue *q)
{
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		ublksrv_queue_io_cmd(q, &q->ios[i], i);

	__ublksrv_queue_event(q);
}

static int ublksrv_queue_is_done(struct _ublksrv_queue *q)
{
	return (q->state & UBLKSRV_QUEUE_STOPPING) &&
		!io_uring_sq_ready(&q->ring);
}

/* used for allocating zero copy vma space */
static inline int ublk_queue_single_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned max_io_sz = dev->ctrl_dev->dev_info.max_io_buf_bytes;
	unsigned int page_sz = getpagesize();

	return round_up(max_io_sz, page_sz);
}

static inline int ublk_queue_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned depth = dev->ctrl_dev->dev_info.queue_depth;

	return ublk_queue_single_io_buf_size(dev) * depth;
}

static inline int ublk_io_buf_size(struct _ublksrv_dev *dev)
{
	unsigned nr_queues = dev->ctrl_dev->dev_info.nr_hw_queues;

	return ublk_queue_io_buf_size(dev) * nr_queues;
}

static int ublksrv_queue_cmd_buf_sz(struct _ublksrv_queue *q)
{
	int size = q->q_depth * sizeof(struct ublksrv_io_desc);
	unsigned int page_sz = getpagesize();

	return round_up(size, page_sz);
}

static int queue_max_cmd_buf_sz(void)
{
	unsigned int page_sz = getpagesize();

	return round_up(UBLK_MAX_QUEUE_DEPTH * sizeof(struct ublksrv_io_desc),
			page_sz);
}

int ublksrv_queue_unconsumed_cqes(const struct ublksrv_queue *tq)
{
	if (tq->ring_ptr)
		return io_uring_cq_ready(tq->ring_ptr);

	return -1;
}

void ublksrv_queue_deinit(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int i;
	int nr_ios = q->dev->tgt.extra_ios + q->q_depth;

	if (q->dev->tgt.ops->deinit_queue)
		q->dev->tgt.ops->deinit_queue(tq);

	if (q->efd >= 0)
		close(q->efd);

	io_uring_unregister_ring_fd(&q->ring);

	if (q->ring.ring_fd > 0) {
		io_uring_unregister_files(&q->ring);
		close(q->ring.ring_fd);
		q->ring.ring_fd = -1;
	}
	if (q->io_cmd_buf) {
		munmap(q->io_cmd_buf, ublksrv_queue_cmd_buf_sz(q));
		q->io_cmd_buf = NULL;
	}
	for (i = 0; i < nr_ios; i++) {
		if (q->ios[i].buf_addr) {
			if (q->dev->tgt.ops->free_io_buf)
				q->dev->tgt.ops->free_io_buf(tq,
						q->ios[i].buf_addr, i);
			else
				free(q->ios[i].buf_addr);
			q->ios[i].buf_addr = NULL;
		}
		free(q->ios[i].data.private_data);
	}
	q->dev->__queues[q->q_id] = NULL;
	free(q);
}

void ublksrv_build_cpu_str(char *buf, int len, const cpu_set_t *cpuset)
{
	int nr_cores = sysconf(_SC_NPROCESSORS_ONLN);
	int i, offset = 0;

	for (i = 0; i < nr_cores; i++) {
		int n;

		if (!CPU_ISSET(i, cpuset))
			continue;
		n = snprintf(&buf[offset], len - offset, "%d ", i);
		if (n < 0 || n >= len - offset)
			break;
		offset += n;
	}
}

static void ublksrv_set_sched_affinity(struct _ublksrv_dev *dev,
		unsigned short q_id)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;
	unsigned dev_id = cdev->dev_info.dev_id;
	cpu_set_t *cpuset = ublksrv_get_queue_affinity(cdev, q_id);

	if (sched_setaffinity(0, sizeof(cpu_set_t), cpuset) < 0)
		ublk_err("ublk dev %u queue %u set affinity failed",
				dev_id, q_id);
}

static void ublksrv_kill_eventfd(struct _ublksrv_queue *q)
{
	if ((q->state & UBLKSRV_QUEUE_STOPPING) && q->efd >= 0) {
		uint64_t data = 1;
		int ret;

		ret = write(q->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail %d/%zu\n",
					__func__, __LINE__, ret, sizeof(uint64_t));
	}
}

/*
 * Return 0 on success, or a negative errno
 */
static int ublksrv_setup_eventfd(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev_info *info = &q->dev->ctrl_dev->dev_info;

	if (!(info->ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)) {
		q->efd = -1;
		return 0;
	}

	if (q->dev->tgt.tgt_ring_depth == 0) {
		ublk_err("ublk dev %d queue %d zero tgt queue depth",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	if (!q->dev->tgt.ops->handle_event) {
		ublk_err("ublk dev %d/%d doesn't define ->handle_event",
			info->dev_id, q->q_id);
		return -EINVAL;
	}

	q->efd = eventfd(0, 0);
	if (q->efd < 0)
		return -errno;
	return 0;
}

static void ublksrv_queue_adjust_uring_io_wq_workers(struct _ublksrv_queue *q)
{
	struct _ublksrv_dev *dev = q->dev;
	unsigned int val[2] = {0, 0};
	int ret;

	if (!dev->tgt.iowq_max_workers[0] && !dev->tgt.iowq_max_workers[1])
		return;

	ret = io_uring_register_iowq_max_workers(&q->ring, val);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);

	if (!dev->tgt.iowq_max_workers[0])
		dev->tgt.iowq_max_workers[0] = val[0];
	if (!dev->tgt.iowq_max_workers[1])
		dev->tgt.iowq_max_workers[1] = val[1];

	ret = io_uring_register_iowq_max_workers(&q->ring,
			dev->tgt.iowq_max_workers);
	if (ret)
		ublk_err("%s: register iowq max workers failed %d\n",
				__func__, ret);
}

static void ublksrv_calculate_depths(const struct _ublksrv_dev *dev,
		int *ring_depth, int *cq_depth, int *nr_ios)
{
	const struct ublksrv_ctrl_dev *cdev = dev->ctrl_dev;

	/*
	 * eventfd consumes one extra sqe, and it can be thought of as one
	 * extra unit of target depth
	 */
	int aio_depth = (cdev->dev_info.ublksrv_flags & UBLKSRV_F_NEED_EVENTFD)
		? 1 : 0;
	int depth = cdev->dev_info.queue_depth;
	int tgt_depth = dev->tgt.tgt_ring_depth + aio_depth;

	*nr_ios = depth + dev->tgt.extra_ios;

	/*
	 * queue_depth represents the max count of io commands issued from the
	 * ublk driver.
	 *
	 * After an io command is fetched from the ublk driver, the sqe consumed
	 * for fetching it becomes available for target usage, so the uring
	 * depth can be set as max(queue_depth, tgt_depth).
	 */
	depth = depth > tgt_depth ? depth : tgt_depth;
	*ring_depth = depth;
	*cq_depth = dev->cq_depth ? dev->cq_depth : depth;
}
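
/*
 * Worked example (editor's addition): with queue_depth = 128,
 * tgt_ring_depth = 32 and UBLKSRV_F_NEED_EVENTFD set, tgt_depth is
 * 32 + 1 = 33, so ring_depth = max(128, 33) = 128 and cq_depth defaults to
 * 128 unless the target overrode it via ublksrv_dev_set_cq_depth().
 */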

const struct ublksrv_queue *ublksrv_queue_init(const struct ublksrv_dev *tdev,
		unsigned short q_id, void *queue_data)
{
	struct io_uring_params p;
	struct _ublksrv_dev *dev = tdev_to_local(tdev);
	struct _ublksrv_queue *q;
	const struct ublksrv_ctrl_dev *ctrl_dev = dev->ctrl_dev;
	int depth = ctrl_dev->dev_info.queue_depth;
	int i, ret = -1;
	int cmd_buf_size, io_buf_size;
	unsigned long off;
	int io_data_size = round_up(dev->tgt.io_data_size,
			sizeof(unsigned long));
	int ring_depth, cq_depth, nr_ios;

	ublksrv_calculate_depths(dev, &ring_depth, &cq_depth, &nr_ios);

	/*
	 * Too many extra ios
	 */
	if (nr_ios > depth * 3)
		return NULL;

	q = (struct _ublksrv_queue *)malloc(sizeof(struct _ublksrv_queue) +
			sizeof(struct ublk_io) * nr_ios);
	if (!q)
		return NULL;
	dev->__queues[q_id] = q;

	q->tgt_ops = dev->tgt.ops;	//cache ops for fast path
	q->dev = dev;
	if (ctrl_dev->dev_info.flags & UBLK_F_CMD_IOCTL_ENCODE)
		q->state = UBLKSRV_QUEUE_IOCTL_OP;
	else
		q->state = 0;
	if (ctrl_dev->dev_info.flags & UBLK_F_USER_COPY)
		q->state |= UBLKSRV_USER_COPY;
	q->q_id = q_id;
	/* FIXME: depth has to be a power of 2 */
	q->q_depth = depth;
	q->io_cmd_buf = NULL;
	q->cmd_inflight = 0;
	q->efd = -1;	/* not set up yet; ublksrv_queue_deinit() checks >= 0 */
	q->tid = ublksrv_gettid();

	cmd_buf_size = ublksrv_queue_cmd_buf_sz(q);
	off = UBLKSRV_CMD_BUF_OFFSET + q_id * queue_max_cmd_buf_sz();
	q->io_cmd_buf = (char *)mmap(0, cmd_buf_size, PROT_READ,
			MAP_SHARED | MAP_POPULATE, dev->cdev_fd, off);
	if (q->io_cmd_buf == MAP_FAILED) {
		ublk_err("ublk dev %d queue %d map io_cmd_buf failed",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
		goto fail;
	}

	io_buf_size = ctrl_dev->dev_info.max_io_buf_bytes;
	for (i = 0; i < nr_ios; i++) {
		q->ios[i].buf_addr = NULL;

		/* extra ios don't need an io buffer */
		if (i >= q->q_depth)
			goto skip_alloc_buf;

		if (dev->tgt.ops->alloc_io_buf)
			q->ios[i].buf_addr =
				dev->tgt.ops->alloc_io_buf(local_to_tq(q),
					i, io_buf_size);
		else
			if (posix_memalign((void **)&q->ios[i].buf_addr,
						getpagesize(), io_buf_size)) {
				ublk_err("ublk dev %d queue %d io %d posix_memalign failed",
						q->dev->ctrl_dev->dev_info.dev_id, q->q_id, i);
				goto fail;
			}
		//q->ios[i].buf_addr = malloc(io_buf_size);
		if (!q->ios[i].buf_addr) {
			ublk_err("ublk dev %d queue %d io %d alloc io_buf failed",
					q->dev->ctrl_dev->dev_info.dev_id, q->q_id, i);
			goto fail;
		}
skip_alloc_buf:
		q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
		q->ios[i].data.private_data = malloc(io_data_size);
		q->ios[i].data.tag = i;
		if (i < q->q_depth)
			q->ios[i].data.iod = ublksrv_get_iod(q, i);
		else
			q->ios[i].data.iod = NULL;

		//ublk_assert(io_data_size ^ (unsigned long)q->ios[i].data.private_data);
	}

	ublksrv_setup_ring_params(&p, cq_depth,
			IORING_SETUP_SQE128 | IORING_SETUP_COOP_TASKRUN);
	ret = io_uring_queue_init_params(ring_depth, &q->ring, &p);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup io_uring failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}

	q->ring_ptr = &q->ring;

	ret = io_uring_register_files(&q->ring, dev->tgt.fds,
			dev->tgt.nr_fds + 1);
	if (ret) {
		ublk_err("ublk dev %d queue %d register files failed %d",
				q->dev->ctrl_dev->dev_info.dev_id, q->q_id, ret);
		goto fail;
	}

	io_uring_register_ring_fd(&q->ring);

	/*
	 * N.B. PR_SET_IO_FLUSHER was added in Linux 5.6.
	 */
#if defined(PR_SET_IO_FLUSHER)
	if (prctl(PR_SET_IO_FLUSHER, 0, 0, 0, 0) != 0)
		ublk_err("ublk dev %d queue %d set_io_flusher failed",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id);
#endif

	ublksrv_queue_adjust_uring_io_wq_workers(q);

	q->private_data = queue_data;

	if (ctrl_dev->tgt_ops->init_queue) {
		if (ctrl_dev->tgt_ops->init_queue(local_to_tq(q),
					&q->private_data))
			goto fail;
	}

	if (ctrl_dev->queues_cpuset)
		ublksrv_set_sched_affinity(dev, q_id);

	setpriority(PRIO_PROCESS, getpid(), -20);

	ret = ublksrv_setup_eventfd(q);
	if (ret < 0) {
		ublk_err("ublk dev %d queue %d setup eventfd failed: %s",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id,
			strerror(-ret));
		goto fail;
	}

	/* submit all io commands to ublk driver */
	ublksrv_submit_fetch_commands(q);

	return (struct ublksrv_queue *)q;
 fail:
	ublksrv_queue_deinit(local_to_tq(q));
	ublk_err("ublk dev %d queue %d failed",
			ctrl_dev->dev_info.dev_id, q_id);
	return NULL;
}

static int ublksrv_create_pid_file(struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];
	int ret, pid_fd;

	if (!dev->ctrl_dev->run_dir)
		return 0;

	/* create pid file and lock it, so that others can't */
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);

	ret = create_pid_file(pid_file, &pid_fd);
	if (ret < 0) {
		/* -1 means the file is locked, and we need to remove it */
		if (ret == -1) {
			close(pid_fd);
			unlink(pid_file);
		}
		return ret;
	}
	dev->pid_file_fd = pid_fd;
	return 0;
}

static void ublksrv_remove_pid_file(const struct _ublksrv_dev *dev)
{
	int dev_id = dev->ctrl_dev->dev_info.dev_id;
	char pid_file[64];

	if (!dev->ctrl_dev->run_dir)
		return;

	close(dev->pid_file_fd);
	snprintf(pid_file, 64, "%s/%d.pid", dev->ctrl_dev->run_dir, dev_id);
	unlink(pid_file);
}

void ublksrv_dev_deinit(const struct ublksrv_dev *tdev)
{
	struct _ublksrv_dev *dev = tdev_to_local(tdev);

	ublksrv_remove_pid_file(dev);

	ublksrv_tgt_deinit(dev);
	free(dev->thread);

	if (dev->cdev_fd >= 0) {
		close(dev->cdev_fd);
		dev->cdev_fd = -1;
	}
	free(dev);
}

const struct ublksrv_dev *ublksrv_dev_init(const struct ublksrv_ctrl_dev *ctrl_dev)
{
	int dev_id = ctrl_dev->dev_info.dev_id;
	char buf[64];
	int ret = -1;
	struct _ublksrv_dev *dev = (struct _ublksrv_dev *)calloc(1, sizeof(*dev));
	struct ublksrv_tgt_info *tgt;

	if (!dev)
		return local_to_tdev(dev);

	tgt = &dev->tgt;
	dev->ctrl_dev = ctrl_dev;
	dev->cdev_fd = -1;

	snprintf(buf, 64, "%s%d", UBLKC_DEV, dev_id);
	dev->cdev_fd = open(buf, O_RDWR | O_NONBLOCK);
	if (dev->cdev_fd < 0) {
		ublk_err("can't open %s, ret %d\n", buf, dev->cdev_fd);
		goto fail;
	}

	tgt->fds[0] = dev->cdev_fd;

	ret = ublksrv_tgt_init(dev, ctrl_dev->tgt_type, ctrl_dev->tgt_ops,
			ctrl_dev->tgt_argc, ctrl_dev->tgt_argv);
	if (ret) {
		ublk_err("can't init tgt %d/%s/%d, ret %d\n",
				dev_id, ctrl_dev->tgt_type, ctrl_dev->tgt_argc,
				ret);
		goto fail;
	}

	ret = ublksrv_create_pid_file(dev);
	if (ret) {
		ublk_err("can't create pid file for dev %d, ret %d\n",
				dev_id, ret);
		goto fail;
	}

	return local_to_tdev(dev);
fail:
	ublksrv_dev_deinit(local_to_tdev(dev));
	return NULL;
}

/* Be careful: a target io may not have a ublk_io associated with it */
static inline void ublksrv_handle_tgt_cqe(struct _ublksrv_queue *q,
		struct io_uring_cqe *cqe)
{
	unsigned tag = user_data_to_tag(cqe->user_data);

	if (cqe->res < 0 && cqe->res != -EAGAIN) {
		ublk_err("%s: failed tgt io: res %d qid %u tag %u, cmd_op %u\n",
			__func__, cqe->res, q->q_id,
			user_data_to_tag(cqe->user_data),
			user_data_to_op(cqe->user_data));
	}

	if (is_eventfd_io(cqe->user_data)) {
		if (q->tgt_ops->handle_event)
			q->tgt_ops->handle_event(local_to_tq(q));
	} else {
		if (q->tgt_ops->tgt_io_done)
			q->tgt_ops->tgt_io_done(local_to_tq(q),
					&q->ios[tag].data, cqe);
	}
}

static void ublksrv_handle_cqe(struct io_uring *r,
		struct io_uring_cqe *cqe, void *data)
{
	struct _ublksrv_queue *q = container_of(r, struct _ublksrv_queue, ring);
	unsigned tag = user_data_to_tag(cqe->user_data);
	unsigned cmd_op = user_data_to_op(cqe->user_data);
	int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
		!(q->state & UBLKSRV_QUEUE_STOPPING);
	struct ublk_io *io;

	ublk_dbg(UBLK_DBG_IO_CMD, "%s: res %d (qid %d tag %u cmd_op %u target %d event %d) stopping %d\n",
			__func__, cqe->res, q->q_id, tag, cmd_op,
			is_target_io(cqe->user_data),
			is_eventfd_io(cqe->user_data),
			(q->state & UBLKSRV_QUEUE_STOPPING));

	/* Don't retrieve the io in case of a target io */
	if (is_target_io(cqe->user_data)) {
		ublksrv_handle_tgt_cqe(q, cqe);
		return;
	}

	io = &q->ios[tag];
	q->cmd_inflight--;

	if (!fetch) {
		q->state |= UBLKSRV_QUEUE_STOPPING;
		io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
	}

	/*
	 * So far, only the sync tgt io handling path is implemented.
	 *
	 * todo: support async tgt io handling via io_uring, so that the
	 * ublksrv daemon can poll on both rings.
	 */
	if (cqe->res == UBLK_IO_RES_OK) {
		//ublk_assert(tag < q->q_depth);
		q->tgt_ops->handle_io_async(local_to_tq(q), &io->data);
	} else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
		io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
		ublksrv_queue_io_cmd(q, io, tag);
	} else {
		/*
		 * COMMIT_REQ will be completed immediately since no fetching
		 * piggyback is required.
		 *
		 * Marking IO_FREE only, then this io won't be issued since
		 * we only issue an io with (UBLKSRV_IO_FREE | UBLKSRV_NEED_*)
		 */
		io->flags = UBLKSRV_IO_FREE;
	}
}

static int ublksrv_reap_events_uring(struct io_uring *r)
{
	struct io_uring_cqe *cqe;
	unsigned head;
	int count = 0;

	io_uring_for_each_cqe(r, head, cqe) {
		ublksrv_handle_cqe(r, cqe, NULL);
		count += 1;
	}
	io_uring_cq_advance(r, count);

	return count;
}

static void ublksrv_queue_discard_io_pages(struct _ublksrv_queue *q)
{
	const struct ublksrv_ctrl_dev *cdev = q->dev->ctrl_dev;
	unsigned int io_buf_size = cdev->dev_info.max_io_buf_bytes;
	int i = 0;

	for (i = 0; i < q->q_depth; i++)
		madvise(q->ios[i].buf_addr, io_buf_size, MADV_DONTNEED);
}

static void ublksrv_queue_idle_enter(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE)
		return;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: enter idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
	ublksrv_queue_discard_io_pages(q);
	q->state |= UBLKSRV_QUEUE_IDLE;

	if (q->tgt_ops->idle_fn)
		q->tgt_ops->idle_fn(local_to_tq(q), true);
}

static inline void ublksrv_queue_idle_exit(struct _ublksrv_queue *q)
{
	if (q->state & UBLKSRV_QUEUE_IDLE) {
		ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: exit idle %x\n",
			q->dev->ctrl_dev->dev_info.dev_id, q->q_id, q->state);
		q->state &= ~UBLKSRV_QUEUE_IDLE;
		if (q->tgt_ops->idle_fn)
			q->tgt_ops->idle_fn(local_to_tq(q), false);
	}
}

static void ublksrv_reset_aio_batch(struct _ublksrv_queue *q)
{
	q->nr_ctxs = 0;
}

static void ublksrv_submit_aio_batch(struct _ublksrv_queue *q)
{
	int i;

	for (i = 0; i < q->nr_ctxs; i++) {
		struct ublksrv_aio_ctx *ctx = q->ctxs[i];
		uint64_t data = 1;
		int ret;

		ret = write(ctx->efd, &data, sizeof(uint64_t));
		if (ret != sizeof(uint64_t))
			ublk_err("%s:%d write fail ctx[%d]: %d/%zu\n",
					__func__, __LINE__, i, ret, sizeof(uint64_t));
	}
}

int ublksrv_process_io(const struct ublksrv_queue *tq)
{
	struct _ublksrv_queue *q = tq_to_local(tq);
	int ret, reapped;
	struct __kernel_timespec ts = {
		.tv_sec = UBLKSRV_IO_IDLE_SECS,
		.tv_nsec = 0
	};
	struct __kernel_timespec *tsp = (q->state & UBLKSRV_QUEUE_IDLE) ?
		NULL : &ts;
	struct io_uring_cqe *cqe;

	ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight %u/%u stopping %d\n",
				q->dev->ctrl_dev->dev_info.dev_id,
				q->q_id, io_uring_sq_ready(&q->ring),
				q->cmd_inflight, q->tgt_io_inflight,
				(q->state & UBLKSRV_QUEUE_STOPPING));

	if (ublksrv_queue_is_done(q))
		return -ENODEV;

	ret = io_uring_submit_and_wait_timeout(&q->ring, &cqe, 1, tsp, NULL);

	ublksrv_reset_aio_batch(q);
	reapped = ublksrv_reap_events_uring(&q->ring);
	ublksrv_submit_aio_batch(q);

	if (q->tgt_ops->handle_io_background)
		q->tgt_ops->handle_io_background(local_to_tq(q),
				io_uring_sq_ready(&q->ring));

	ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d",
			ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
			(q->state & UBLKSRV_QUEUE_IDLE));

	if ((q->state & UBLKSRV_QUEUE_STOPPING))
		ublksrv_kill_eventfd(q);
	else {
		if (ret == -ETIME && reapped == 0 &&
				!io_uring_sq_ready(&q->ring))
			ublksrv_queue_idle_enter(q);
		else
			ublksrv_queue_idle_exit(q);
	}

	return reapped;
}

const struct ublksrv_queue *ublksrv_get_queue(const struct ublksrv_dev *dev,
		int q_id)
{
	return (const struct ublksrv_queue *)tdev_to_local(dev)->__queues[q_id];
}

/* called in ublksrv process context */
void ublksrv_apply_oom_protection()
{
	char oom_score_adj_path[64];
	pid_t pid = getpid();
	int fd;

	snprintf(oom_score_adj_path, 64, "/proc/%d/oom_score_adj", pid);

	fd = open(oom_score_adj_path, O_RDWR);
	if (fd > 0) {
		char val[32];
		int len, ret;

		len = snprintf(val, 32, "%d", -1000);
		ret = write(fd, val, len);
		if (ret != len)
			ublk_err("%s:%d write fail %d/%d\n",
					__func__, __LINE__, ret, len);
		close(fd);
	}
}

const struct ublksrv_ctrl_dev *ublksrv_get_ctrl_dev(
		const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->ctrl_dev;
}

int ublksrv_get_pidfile_fd(const struct ublksrv_dev *dev)
{
	return tdev_to_local(dev)->pid_file_fd;
}

void *ublksrv_io_private_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return q->ios[tag].data.private_data;
}

unsigned int ublksrv_queue_state(const struct ublksrv_queue *q)
{
	return tq_to_local(q)->state;
}

const struct ublk_io_data *
ublksrv_queue_get_io_data(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	return &q->ios[tag].data;
}

void *ublksrv_queue_get_io_buf(const struct ublksrv_queue *tq, int tag)
{
	struct _ublksrv_queue *q = tq_to_local(tq);

	if (tag < q->q_depth)
		return q->ios[tag].buf_addr;
	return NULL;
}

/*
 * The default io_uring cq depth equals the queue depth plus
 * .tgt_ring_depth, which is usually enough for typical ublk targets,
 * such as loop and qcow2, but it may not be enough for nbd with send_zc,
 * which needs extra cqes for buffer notification.
 *
 * So add an API to allow the target to override the default io_uring cq
 * depth.
 */
void ublksrv_dev_set_cq_depth(struct ublksrv_dev *tdev, int cq_depth)
{
	tdev_to_local(tdev)->cq_depth = cq_depth;
}

int ublksrv_dev_get_cq_depth(struct ublksrv_dev *tdev)
{
	return tdev_to_local(tdev)->cq_depth;
}
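
/*
 * Illustrative sketch (editor's addition, not part of the library): a target
 * such as nbd with send_zc can bump the cq depth before its queues are
 * created.  The function name and the factor of two are hypothetical choices,
 * not library requirements.
 */
#if 0	/* example only, never compiled */
static void example_override_cq_depth(struct ublksrv_dev *dev,
				      int queue_depth, int tgt_ring_depth)
{
	/* allow one extra cqe per inflight sqe, e.g. zero-copy notifications */
	ublksrv_dev_set_cq_depth(dev, 2 * (queue_depth + tgt_ring_depth));
}
#endif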