1 // SPDX-License-Identifier: GPL-2.0+
2 /*
3  * Copyright (C) 2020 Google, Inc
4  * Copyright (C) 2020 Palmer Dabbelt <palmerdabbelt@google.com>
5  */
6 
7 #include <linux/device-mapper.h>
8 #include <uapi/linux/dm-user.h>
9 
10 #include <linux/bio.h>
11 #include <linux/init.h>
12 #include <linux/mempool.h>
13 #include <linux/miscdevice.h>
14 #include <linux/module.h>
15 #include <linux/poll.h>
16 #include <linux/uio.h>
17 #include <linux/wait.h>
18 #include <linux/workqueue.h>
19 
20 #define DM_MSG_PREFIX "user"
21 
22 #define MAX_OUTSTANDING_MESSAGES 128
23 
24 static unsigned int daemon_timeout_msec = 4000;
25 module_param_named(dm_user_daemon_timeout_msec, daemon_timeout_msec, uint,
26 		   0644);
27 MODULE_PARM_DESC(dm_user_daemon_timeout_msec,
28 		 "IO timeout in msec if the daemon does not process the IO");
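/*
 * Note: since this is exposed through module_param_named() with mode 0644,
 * the timeout should also be adjustable at runtime via
 * /sys/module/dm_user/parameters/dm_user_daemon_timeout_msec (path assumed
 * from the usual module_param sysfs layout).
 */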
29 
30 /*
31  * dm-user uses four structures:
32  *
33  *  - "struct target", the outermost structure, corresponds to a single device
34  *    mapper target.  This contains the set of outstanding BIOs that have been
35  *    provided by DM and are not actively being processed by the user, along
36  *    with a misc device that userspace can open to communicate with the
37  *    kernel.  Each time userspace opens the misc device a new channel is
38  *    created.
39  *  - "struct channel", which represents a single active communication channel
40  *    with userspace.  Userspace may choose arbitrary read/write sizes to use
41  *    when processing messages; channels form these into logical accesses.
42  *    When userspace responds to a full message the channel completes the BIO
43  *    and obtains a new message to process from the target.
44  *  - "struct message", which wraps a BIO with the additional information
45  *    required by the kernel to sort out what to do with BIOs when they return
46  *    from userspace.
47  *  - "struct dm_user_message", which is the exact message format that
48  *    userspace sees.
49  *
50  * The hot path contains three distinct operations:
51  *
52  *  - user_map(), which is provided a BIO from device mapper that is queued
53  *    into the target.  This allocates and enqueues a new message.
54  *  - dev_read(), which dequeues a message and copies it to userspace.
55  *  - dev_write(), which looks up a message (keyed by sequence number) and
56  *    completes the corresponding BIO.
57  *
58  * Lock ordering (outer to inner)
59  *
60  * 1) miscdevice's global lock.  This is held around dev_open, so it has to be
61  *    the outermost lock.
62  * 2) target->lock
63  * 3) channel->lock
64  */
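/*
 * For orientation, a minimal sketch of the userspace side of this protocol.
 * This is an illustrative example only, not part of the driver: the control
 * device name, MAX_IO_BYTES and service_request() are placeholders, the
 * header layout comes from <uapi/linux/dm-user.h>, and error/short-transfer
 * handling is elided.  Only message fields and codes referenced in this file
 * are used.
 *
 *	int fd = open("/dev/dm-user/example", O_RDWR);
 *	struct dm_user_message hdr;
 *	char data[MAX_IO_BYTES];
 *
 *	for (;;) {
 *		// One request: the header first, then the payload for writes.
 *		read(fd, &hdr, sizeof(hdr));
 *		if (hdr.type == DM_USER_REQ_MAP_WRITE)
 *			read(fd, data, hdr.len);
 *
 *		int was_read = (hdr.type == DM_USER_REQ_MAP_READ);
 *		service_request(&hdr, data);
 *
 *		// Respond with the same seq; type carries the result, and a
 *		// read response carries its data after the header.
 *		hdr.type = DM_USER_RESP_SUCCESS;
 *		write(fd, &hdr, sizeof(hdr));
 *		if (was_read)
 *			write(fd, data, hdr.len);
 *	}
 */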
65 
66 struct message {
67 	/*
68 	 * Messages themselves do not need a lock; they're protected by either
69 	 * the target's or the channel's lock, depending on which can reference them
70 	 * directly.
71 	 */
72 	struct dm_user_message msg;
73 	struct bio *bio;
74 	size_t posn_to_user;
75 	size_t total_to_user;
76 	size_t posn_from_user;
77 	size_t total_from_user;
78 
79 	struct list_head from_user;
80 	struct list_head to_user;
81 
82 	/*
83 	 * These are written back from the user.  They live in the same spot in
84 	 * the message, but we need to either keep the old values around or
85 	 * call a bunch more BIO helpers.  These are only valid after write has
86 	 * adopted the message.
87 	 */
88 	u64 return_type;
89 	u64 return_flags;
90 
91 	struct delayed_work work;
92 	bool delayed;
93 	struct target *t;
94 };
95 
96 struct target {
97 	/*
98 	 * A target has a single lock, which protects everything in the target
99 	 * (but does not protect the channels associated with a target).
100 	 */
101 	struct mutex lock;
102 
103 	/*
104 	 * There is only one point at which anything blocks: userspace blocks
105 	 * reading a new message, which is woken up by device mapper providing
106 	 * a new BIO to process (or tearing down the target).  The
107 	 * corresponding write side doesn't block; instead we treat userspace's
108 	 * response containing a message that has yet to be mapped as an
109 	 * invalid operation.
110 	 */
111 	struct wait_queue_head wq;
112 
113 	/*
114 	 * Messages are delivered to userspace in order, but may be returned
115 	 * out of order.  This allows userspace to schedule IO if it wants to.
116 	 */
117 	mempool_t message_pool;
118 	u64 next_seq_to_map;
119 	u64 next_seq_to_user;
120 	struct list_head to_user;
121 
122 	/*
123 	 * There is a misc device per target.  The name is selected by
124 	 * userspace (via a DM create ioctl argument), and each ends up in
125 	 * /dev/dm-user/.  A better way to do this might be to have a
126 	 * filesystem manage these, but this was more expedient.  The
127 	 * current mechanism is functional, but does result in an arbitrary
128 	 * number of dynamically created misc devices.
129 	 */
130 	struct miscdevice miscdev;
131 
132 	/*
133 	 * Device mapper's target destructor triggers tearing this all down,
134 	 * but we can't actually free until every channel associated with this
135 	 * target has been destroyed.  Channels each have a reference to their
136 	 * target, and there is an additional single reference that corresponds
137 	 * to both DM and the misc device (both of which are destroyed by DM).
138 	 *
139 	 * In the common case userspace will be asleep waiting for a new
140 	 * message when device mapper decides to destroy the target, which
141 	 * means no new messages will appear.  The destroyed flag triggers a
142 	 * wakeup, which will end up removing the reference.
143 	 */
144 	struct kref references;
145 	int dm_destroyed;
146 	bool daemon_terminated;
147 };
148 
149 struct channel {
150 	struct target *target;
151 
152 	/*
153 	 * A channel has a single lock, which prevents multiple reads (or
154 	 * multiple writes) from conflicting with each other.
155 	 */
156 	struct mutex lock;
157 
158 	struct message *cur_to_user;
159 	struct message *cur_from_user;
160 	ssize_t to_user_error;
161 	ssize_t from_user_error;
162 
163 	/*
164 	 * Once a message has been forwarded to userspace on a channel it must
165 	 * be responded to on the same channel.  This allows us to error out
166 	 * the messages that have not yet been responded to by a channel when
167 	 * that channel closes, which makes handling errors more reasonable for
168 	 * fault-tolerant userspace daemons.  It also happens to make avoiding
169 	 * shared locks between user_map() and dev_read() a lot easier.
170 	 *
171 	 * This does preclude a multi-threaded work stealing userspace
172 	 * implementation (or at least, force a degree of head-of-line blocking
173 	 * on the response path).
174 	 */
175 	struct list_head from_user;
176 
177 	/*
178 	 * Responses from userspace can arrive in arbitrarily small chunks.
179 	 * We need some place to buffer one up until we can find the
180 	 * corresponding kernel-side message to continue processing, so instead
181 	 * of allocating them we just keep one off to the side here.  This can
182 	 * only ever be pointed to by cur_from_user, and will never have a BIO.
183 	 */
184 	struct message scratch_message_from_user;
185 };
186 
187 static void message_kill(struct message *m, mempool_t *pool)
188 {
189 	m->bio->bi_status = BLK_STS_IOERR;
190 	bio_endio(m->bio);
191 	mempool_free(m, pool);
192 }
193 
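/*
 * The target holds one base reference on behalf of DM and the misc device
 * (see kref_init() in user_ctr()), and every open channel takes another in
 * channel_alloc().  A reference count above one therefore means at least one
 * user-space daemon thread currently has the control device open.
 */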
194 static inline bool is_user_space_thread_present(struct target *t)
195 {
196 	lockdep_assert_held(&t->lock);
197 	return (kref_read(&t->references) > 1);
198 }
199 
200 static void process_delayed_work(struct work_struct *work)
201 {
202 	struct delayed_work *del_work = to_delayed_work(work);
203 	struct message *msg = container_of(del_work, struct message, work);
204 
205 	struct target *t = msg->t;
206 
207 	mutex_lock(&t->lock);
208 
209 	/*
210 	 * There is at least one thread to process the IO.
211 	 */
212 	if (is_user_space_thread_present(t)) {
213 		mutex_unlock(&t->lock);
214 		return;
215 	}
216 
217 	/*
218 	 * Terminate the IO with an error
219 	 */
220 	list_del(&msg->to_user);
221 	pr_err("I/O error: sector %llu: no user-space daemon for %s target\n",
222 	       msg->bio->bi_iter.bi_sector,
223 	       t->miscdev.name);
224 	message_kill(msg, &t->message_pool);
225 	mutex_unlock(&t->lock);
226 }
227 
228 static void enqueue_delayed_work(struct message *m, bool is_delay)
229 {
230 	unsigned long delay = 0;
231 
232 	m->delayed = true;
233 	INIT_DELAYED_WORK(&m->work, process_delayed_work);
234 
235 	/*
236 	 * The snapuserd daemon is the user-space process
237 	 * that processes IO requests from dm-user
238 	 * when an OTA is applied. Per the current design,
239 	 * when a dm-user target is created, the daemon
240 	 * attaches to the target and starts processing
241 	 * the IOs. The daemon is terminated only when the
242 	 * dm-user target is destroyed.
243 	 *
244 	 * If for some reason the daemon crashes or terminates early
245 	 * without destroying the dm-user target, then
246 	 * there is no mechanism to restart the daemon
247 	 * and resume processing IOs for the same target.
248 	 * Theoretically it is possible, but that infrastructure
249 	 * doesn't exist in the Android ecosystem.
250 	 *
251 	 * Thus, when the daemon terminates, there is no way the IOs
252 	 * issued on that target will be processed. Hence,
253 	 * we set the delay to 0 and fail the IOs immediately.
254 	 *
255 	 * On the other hand, when a new dm-user target is created,
256 	 * we wait for the daemon to attach for the first time.
257 	 * This primarily happens when first-stage init spins up
258 	 * the daemon. At this point, since the snapshot device is mounted
259 	 * as the root filesystem, the dm-user target may receive IO requests
260 	 * even though the daemon is not fully launched. We don't want
261 	 * to fail those IO requests immediately. Thus, we queue these
262 	 * requests with a timeout so that the daemon has time to come up
263 	 * and process them. Again, if the daemon fails to launch within
264 	 * the timeout period, the IOs will be failed.
265 	 */
266 	if (is_delay)
267 		delay = msecs_to_jiffies(daemon_timeout_msec);
268 
269 	queue_delayed_work(system_wq, &m->work, delay);
270 }
271 
272 static inline struct target *target_from_target(struct dm_target *target)
273 {
274 	WARN_ON(target->private == NULL);
275 	return target->private;
276 }
277 
278 static inline struct target *target_from_miscdev(struct miscdevice *miscdev)
279 {
280 	return container_of(miscdev, struct target, miscdev);
281 }
282 
283 static inline struct channel *channel_from_file(struct file *file)
284 {
285 	WARN_ON(file->private_data == NULL);
286 	return file->private_data;
287 }
288 
289 static inline struct target *target_from_channel(struct channel *c)
290 {
291 	WARN_ON(c->target == NULL);
292 	return c->target;
293 }
294 
295 static inline size_t bio_size(struct bio *bio)
296 {
297 	struct bio_vec bvec;
298 	struct bvec_iter iter;
299 	size_t out = 0;
300 
301 	bio_for_each_segment (bvec, bio, iter)
302 		out += bio_iter_len(bio, iter);
303 	return out;
304 }
305 
306 static inline size_t bio_bytes_needed_to_user(struct bio *bio)
307 {
308 	switch (bio_op(bio)) {
309 	case REQ_OP_WRITE:
310 		return sizeof(struct dm_user_message) + bio_size(bio);
311 	case REQ_OP_READ:
312 	case REQ_OP_FLUSH:
313 	case REQ_OP_DISCARD:
314 	case REQ_OP_SECURE_ERASE:
315 	case REQ_OP_WRITE_SAME:
316 	case REQ_OP_WRITE_ZEROES:
317 		return sizeof(struct dm_user_message);
318 
319 	/*
320 	 * These ops are not passed to userspace under the assumption that
321 	 * they're not going to be particularly useful in that context.
322 	 */
323 	default:
324 		return -EOPNOTSUPP;
325 	}
326 }
327 
328 static inline size_t bio_bytes_needed_from_user(struct bio *bio)
329 {
330 	switch (bio_op(bio)) {
331 	case REQ_OP_READ:
332 		return sizeof(struct dm_user_message) + bio_size(bio);
333 	case REQ_OP_WRITE:
334 	case REQ_OP_FLUSH:
335 	case REQ_OP_DISCARD:
336 	case REQ_OP_SECURE_ERASE:
337 	case REQ_OP_WRITE_SAME:
338 	case REQ_OP_WRITE_ZEROES:
339 		return sizeof(struct dm_user_message);
340 
341 	/*
342 	 * These ops are not passed to userspace under the assumption that
343 	 * they're not going to be particularly useful in that context.
344 	 */
345 	default:
346 		return -EOPNOTSUPP;
347 	}
348 }
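/*
 * Worked example (sizes illustrative): for a 4096-byte REQ_OP_WRITE,
 * userspace reads sizeof(struct dm_user_message) + 4096 bytes (header plus
 * payload) and writes back only the header as its response.  For a 4096-byte
 * REQ_OP_READ the directions flip: the request is header-only and the
 * response is the header followed by 4096 bytes of data.
 */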
349 
350 static inline long bio_type_to_user_type(struct bio *bio)
351 {
352 	switch (bio_op(bio)) {
353 	case REQ_OP_READ:
354 		return DM_USER_REQ_MAP_READ;
355 	case REQ_OP_WRITE:
356 		return DM_USER_REQ_MAP_WRITE;
357 	case REQ_OP_FLUSH:
358 		return DM_USER_REQ_MAP_FLUSH;
359 	case REQ_OP_DISCARD:
360 		return DM_USER_REQ_MAP_DISCARD;
361 	case REQ_OP_SECURE_ERASE:
362 		return DM_USER_REQ_MAP_SECURE_ERASE;
363 	case REQ_OP_WRITE_SAME:
364 		return DM_USER_REQ_MAP_WRITE_SAME;
365 	case REQ_OP_WRITE_ZEROES:
366 		return DM_USER_REQ_MAP_WRITE_ZEROES;
367 
368 	/*
369 	 * These ops are not passed to userspace under the assumption that
370 	 * they're not going to be particularly useful in that context.
371 	 */
372 	default:
373 		return -EOPNOTSUPP;
374 	}
375 }
376 
377 static inline long bio_flags_to_user_flags(struct bio *bio)
378 {
379 	u64 out = 0;
380 	typeof(bio->bi_opf) opf = bio->bi_opf & ~REQ_OP_MASK;
381 
382 	if (opf & REQ_FAILFAST_DEV) {
383 		opf &= ~REQ_FAILFAST_DEV;
384 		out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DEV;
385 	}
386 
387 	if (opf & REQ_FAILFAST_TRANSPORT) {
388 		opf &= ~REQ_FAILFAST_TRANSPORT;
389 		out |= DM_USER_REQ_MAP_FLAG_FAILFAST_TRANSPORT;
390 	}
391 
392 	if (opf & REQ_FAILFAST_DRIVER) {
393 		opf &= ~REQ_FAILFAST_DRIVER;
394 		out |= DM_USER_REQ_MAP_FLAG_FAILFAST_DRIVER;
395 	}
396 
397 	if (opf & REQ_SYNC) {
398 		opf &= ~REQ_SYNC;
399 		out |= DM_USER_REQ_MAP_FLAG_SYNC;
400 	}
401 
402 	if (opf & REQ_META) {
403 		opf &= ~REQ_META;
404 		out |= DM_USER_REQ_MAP_FLAG_META;
405 	}
406 
407 	if (opf & REQ_PRIO) {
408 		opf &= ~REQ_PRIO;
409 		out |= DM_USER_REQ_MAP_FLAG_PRIO;
410 	}
411 
412 	if (opf & REQ_NOMERGE) {
413 		opf &= ~REQ_NOMERGE;
414 		out |= DM_USER_REQ_MAP_FLAG_NOMERGE;
415 	}
416 
417 	if (opf & REQ_IDLE) {
418 		opf &= ~REQ_IDLE;
419 		out |= DM_USER_REQ_MAP_FLAG_IDLE;
420 	}
421 
422 	if (opf & REQ_INTEGRITY) {
423 		opf &= ~REQ_INTEGRITY;
424 		out |= DM_USER_REQ_MAP_FLAG_INTEGRITY;
425 	}
426 
427 	if (opf & REQ_FUA) {
428 		opf &= ~REQ_FUA;
429 		out |= DM_USER_REQ_MAP_FLAG_FUA;
430 	}
431 
432 	if (opf & REQ_PREFLUSH) {
433 		opf &= ~REQ_PREFLUSH;
434 		out |= DM_USER_REQ_MAP_FLAG_PREFLUSH;
435 	}
436 
437 	if (opf & REQ_RAHEAD) {
438 		opf &= ~REQ_RAHEAD;
439 		out |= DM_USER_REQ_MAP_FLAG_RAHEAD;
440 	}
441 
442 	if (opf & REQ_BACKGROUND) {
443 		opf &= ~REQ_BACKGROUND;
444 		out |= DM_USER_REQ_MAP_FLAG_BACKGROUND;
445 	}
446 
447 	if (opf & REQ_NOWAIT) {
448 		opf &= ~REQ_NOWAIT;
449 		out |= DM_USER_REQ_MAP_FLAG_NOWAIT;
450 	}
451 
452 	if (opf & REQ_NOUNMAP) {
453 		opf &= ~REQ_NOUNMAP;
454 		out |= DM_USER_REQ_MAP_FLAG_NOUNMAP;
455 	}
456 
457 	if (unlikely(opf)) {
458 		pr_warn("unsupported BIO type %x\n", opf);
459 		return -EOPNOTSUPP;
460 	}
461 	WARN_ON(out < 0);
462 	return out;
463 }
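/*
 * Any opf bit not translated above is treated as unsupported; user_map()
 * turns the resulting -EOPNOTSUPP into DM_MAPIO_KILL rather than forwarding a
 * request that userspace could not interpret.
 */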
464 
465 /*
466  * Not quite what's in blk-map.c, but instead what I thought the functions in
467  * blk-map did.  This one seems more generally useful and I think we could
468  * write the blk-map version in terms of this one.  The differences are that
469  * this has a return value that counts, and blk-map uses the BIO _all iters.
470  * Neither advances the BIO iter, but both do advance the IOV iter, which is
471  * a bit odd here.
472  */
473 static ssize_t bio_copy_from_iter(struct bio *bio, struct iov_iter *iter)
474 {
475 	struct bio_vec bvec;
476 	struct bvec_iter biter;
477 	ssize_t out = 0;
478 
479 	bio_for_each_segment (bvec, bio, biter) {
480 		ssize_t ret;
481 
482 		ret = copy_page_from_iter(bvec.bv_page, bvec.bv_offset,
483 					  bvec.bv_len, iter);
484 
485 		/*
486 		 * FIXME: I thought that IOV copies had a mechanism for
487 		 * terminating early, if for example a signal came in while
488 		 * sleeping waiting for a page to be mapped, but I don't see
489 		 * where that would happen.
490 		 */
491 		WARN_ON(ret < 0);
492 		out += ret;
493 
494 		if (!iov_iter_count(iter))
495 			break;
496 
497 		if (ret < bvec.bv_len)
498 			return ret;
499 	}
500 
501 	return out;
502 }
503 
504 static ssize_t bio_copy_to_iter(struct bio *bio, struct iov_iter *iter)
505 {
506 	struct bio_vec bvec;
507 	struct bvec_iter biter;
508 	ssize_t out = 0;
509 
510 	bio_for_each_segment (bvec, bio, biter) {
511 		ssize_t ret;
512 
513 		ret = copy_page_to_iter(bvec.bv_page, bvec.bv_offset,
514 					bvec.bv_len, iter);
515 
516 		/* as above */
517 		WARN_ON(ret < 0);
518 		out += ret;
519 
520 		if (!iov_iter_count(iter))
521 			break;
522 
523 		if (ret < bvec.bv_len)
524 			return ret;
525 	}
526 
527 	return out;
528 }
529 
530 static ssize_t msg_copy_to_iov(struct message *msg, struct iov_iter *to)
531 {
532 	ssize_t copied = 0;
533 
534 	if (!iov_iter_count(to))
535 		return 0;
536 
537 	if (msg->posn_to_user < sizeof(msg->msg)) {
538 		copied = copy_to_iter((char *)(&msg->msg) + msg->posn_to_user,
539 				      sizeof(msg->msg) - msg->posn_to_user, to);
540 	} else {
541 		copied = bio_copy_to_iter(msg->bio, to);
542 		if (copied > 0)
543 			bio_advance(msg->bio, copied);
544 	}
545 
546 	if (copied < 0)
547 		return copied;
548 
549 	msg->posn_to_user += copied;
550 	return copied;
551 }
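/*
 * Example of the resume-at-position behaviour (sizes illustrative): if
 * userspace reads a message in 16-byte chunks, successive calls copy header
 * bytes [0,16), [16,32), and so on, and BIO payload copying only starts once
 * posn_to_user reaches sizeof(msg->msg).  msg_copy_from_iov() below mirrors
 * this for the write side via posn_from_user.
 */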
552 
553 static ssize_t msg_copy_from_iov(struct message *msg, struct iov_iter *from)
554 {
555 	ssize_t copied = 0;
556 
557 	if (!iov_iter_count(from))
558 		return 0;
559 
560 	if (msg->posn_from_user < sizeof(msg->msg)) {
561 		copied = copy_from_iter(
562 			(char *)(&msg->msg) + msg->posn_from_user,
563 			sizeof(msg->msg) - msg->posn_from_user, from);
564 	} else {
565 		copied = bio_copy_from_iter(msg->bio, from);
566 		if (copied > 0)
567 			bio_advance(msg->bio, copied);
568 	}
569 
570 	if (copied < 0)
571 		return copied;
572 
573 	msg->posn_from_user += copied;
574 	return copied;
575 }
576 
577 static struct message *msg_get_map(struct target *t)
578 {
579 	struct message *m;
580 
581 	lockdep_assert_held(&t->lock);
582 
583 	m = mempool_alloc(&t->message_pool, GFP_NOIO);
584 	m->msg.seq = t->next_seq_to_map++;
585 	INIT_LIST_HEAD(&m->to_user);
586 	INIT_LIST_HEAD(&m->from_user);
587 	return m;
588 }
589 
590 static struct message *msg_get_to_user(struct target *t)
591 {
592 	struct message *m;
593 
594 	lockdep_assert_held(&t->lock);
595 
596 	if (list_empty(&t->to_user))
597 		return NULL;
598 
599 	m = list_first_entry(&t->to_user, struct message, to_user);
600 
601 	list_del(&m->to_user);
602 
603 	/*
604 	 * If the IO was queued to the workqueue because there
605 	 * was no daemon to service it, then we
606 	 * will have to cancel the delayed work as the
607 	 * IO will be processed by this user-space thread.
608 	 *
609 	 * If the delayed work was already picked up for
610 	 * processing, then wait for it to complete. Note
611 	 * that the IO will not be terminated by the work
612 	 * queue thread.
613 	 */
614 	if (unlikely(m->delayed)) {
615 		mutex_unlock(&t->lock);
616 		cancel_delayed_work_sync(&m->work);
617 		mutex_lock(&t->lock);
618 	}
619 	return m;
620 }
621 
622 static struct message *msg_get_from_user(struct channel *c, u64 seq)
623 {
624 	struct message *m;
625 	struct list_head *cur, *tmp;
626 
627 	lockdep_assert_held(&c->lock);
628 
629 	list_for_each_safe (cur, tmp, &c->from_user) {
630 		m = list_entry(cur, struct message, from_user);
631 		if (m->msg.seq == seq) {
632 			list_del(&m->from_user);
633 			return m;
634 		}
635 	}
636 
637 	return NULL;
638 }
639 
640 /*
641  * Returns 0 when there is no work left to do.  This must be callable without
642  * holding the target lock, as it is part of the waitqueue's check expression.
643  * When called without the lock it may spuriously indicate there is remaining
644  * work, but when called with the lock it must be accurate.
645  */
646 int target_poll(struct target *t)
647 {
648 	return !list_empty(&t->to_user) || t->dm_destroyed;
649 }
650 
651 void target_release(struct kref *ref)
652 {
653 	struct target *t = container_of(ref, struct target, references);
654 	struct list_head *cur, *tmp;
655 
656 	/*
657 	 * There may be outstanding BIOs that have not yet been given to
658 	 * userspace.  At this point there's nothing we can do about them, as
659 	 * there are no channels and never will be.
660 	 */
661 	list_for_each_safe (cur, tmp, &t->to_user) {
662 		struct message *m = list_entry(cur, struct message, to_user);
663 
664 		if (unlikely(m->delayed)) {
665 			bool ret;
666 
667 			mutex_unlock(&t->lock);
668 			ret = cancel_delayed_work_sync(&m->work);
669 			mutex_lock(&t->lock);
670 			if (!ret)
671 				continue;
672 		}
673 		message_kill(m, &t->message_pool);
674 	}
675 
676 	mempool_exit(&t->message_pool);
677 	mutex_unlock(&t->lock);
678 	mutex_destroy(&t->lock);
679 	kfree(t);
680 }
681 
682 void target_put(struct target *t)
683 {
684 	/*
685 	 * This both releases a reference to the target and the lock.  We leave
686 	 * it up to the caller to hold the lock, as they probably needed it for
687 	 * something else.
688 	 */
689 	lockdep_assert_held(&t->lock);
690 
691 	if (!kref_put(&t->references, target_release)) {
692 		/*
693 		 * User-space thread is getting terminated.
694 		 * We need to scan the list for all the
695 		 * pending IOs which were not processed yet
696 		 * and queue them on the workqueue for delayed
697 		 * processing.
698 		 */
699 		if (!is_user_space_thread_present(t)) {
700 			struct list_head *cur, *tmp;
701 
702 			list_for_each_safe(cur, tmp, &t->to_user) {
703 				struct message *m = list_entry(cur,
704 							       struct message,
705 							       to_user);
706 				if (!m->delayed)
707 					enqueue_delayed_work(m, false);
708 			}
709 			/*
710 			 * The daemon attached to this target has terminated.
711 			 */
712 			t->daemon_terminated = true;
713 		}
714 		mutex_unlock(&t->lock);
715 	}
716 }
717 
718 static struct channel *channel_alloc(struct target *t)
719 {
720 	struct channel *c;
721 
722 	lockdep_assert_held(&t->lock);
723 
724 	c = kzalloc(sizeof(*c), GFP_KERNEL);
725 	if (c == NULL)
726 		return NULL;
727 
728 	kref_get(&t->references);
729 	c->target = t;
730 	c->cur_from_user = &c->scratch_message_from_user;
731 	mutex_init(&c->lock);
732 	INIT_LIST_HEAD(&c->from_user);
733 	return c;
734 }
735 
736 void channel_free(struct channel *c)
737 {
738 	struct list_head *cur, *tmp;
739 
740 	lockdep_assert_held(&c->lock);
741 
742 	/*
743 	 * There may be outstanding BIOs that have been given to userspace but
744 	 * have not yet been completed.  The channel has been shut down so
745 	 * there's no way to process the rest of those messages, so we just go
746 	 * ahead and error out the BIOs.  Hopefully whatever's on the other end
747 	 * can handle the errors.  One could imagine splitting the BIOs and
748 	 * completing as much as we got, but that seems like overkill here.
749 	 *
750 	 * Our only other options would be to let the BIO hang around (which
751 	 * seems way worse) or to resubmit it to userspace in the hope there's
752 	 * another channel.  I don't really like the idea of submitting a
753 	 * message twice.
754 	 */
755 	if (c->cur_to_user != NULL)
756 		message_kill(c->cur_to_user, &c->target->message_pool);
757 	if (c->cur_from_user != &c->scratch_message_from_user)
758 		message_kill(c->cur_from_user, &c->target->message_pool);
759 	list_for_each_safe (cur, tmp, &c->from_user)
760 		message_kill(list_entry(cur, struct message, from_user),
761 			     &c->target->message_pool);
762 
763 	mutex_lock(&c->target->lock);
764 	target_put(c->target);
765 	mutex_unlock(&c->lock);
766 	mutex_destroy(&c->lock);
767 	kfree(c);
768 }
769 
770 static int dev_open(struct inode *inode, struct file *file)
771 {
772 	struct channel *c;
773 	struct target *t;
774 
775 	/*
776 	 * This is called by miscdev, which sets private_data to point to the
777 	 * struct miscdevice that was opened.  The rest of our file operations
778 	 * want to refer to the channel that's been opened, so we swap that
779 	 * pointer out with a fresh channel.
780 	 *
781 	 * This is called with the miscdev lock held, which is also held while
782 	 * registering/unregistering the miscdev.  The miscdev must be
783 	 * registered for this to get called, which means there must be an
784 	 * outstanding reference to the target, which means it cannot be freed
785 	 * out from under us despite us not holding a reference yet.
786 	 */
787 	t = container_of(file->private_data, struct target, miscdev);
788 	mutex_lock(&t->lock);
789 	file->private_data = c = channel_alloc(t);
790 
791 	if (c == NULL) {
792 		mutex_unlock(&t->lock);
793 		return -ENOMEM;
794 	}
795 
796 	mutex_unlock(&t->lock);
797 	return 0;
798 }
799 
800 static ssize_t dev_read(struct kiocb *iocb, struct iov_iter *to)
801 {
802 	struct channel *c = channel_from_file(iocb->ki_filp);
803 	ssize_t total_processed = 0;
804 	ssize_t processed;
805 
806 	mutex_lock(&c->lock);
807 
808 	if (unlikely(c->to_user_error)) {
809 		total_processed = c->to_user_error;
810 		goto cleanup_unlock;
811 	}
812 
813 	if (c->cur_to_user == NULL) {
814 		struct target *t = target_from_channel(c);
815 
816 		mutex_lock(&t->lock);
817 
818 		while (!target_poll(t)) {
819 			int e;
820 
821 			mutex_unlock(&t->lock);
822 			mutex_unlock(&c->lock);
823 			e = wait_event_interruptible(t->wq, target_poll(t));
824 			mutex_lock(&c->lock);
825 			mutex_lock(&t->lock);
826 
827 			if (unlikely(e != 0)) {
828 				/*
829 				 * We haven't processed any bytes in either the
830 				 * BIO or the IOV, so we can just terminate
831 				 * right now.  Code elsewhere in the kernel handles
832 				 * restarting the syscall when appropriate.
833 				 */
834 				total_processed = e;
835 				mutex_unlock(&t->lock);
836 				goto cleanup_unlock;
837 			}
838 		}
839 
840 		if (unlikely(t->dm_destroyed)) {
841 			/*
842 			 * DM has destroyed this target, so just lock
843 			 * the user out.  There's really nothing else
844 			 * we can do here.  Note that we don't actually
845 			 * tear anything down until userspace has
846 			 * closed the FD, as there may still be
847 			 * outstanding BIOs.
848 			 *
849 			 * This is kind of a wacky error code to
850 			 * return.  My goal was really just to try and
851 			 * find something that wasn't likely to be
852 			 * returned by anything else in the miscdev
853 			 * path.  The message "block device required"
854 			 * seems like a somewhat reasonable thing to
855 			 * say when the target has disappeared out from
856 			 * under us, but "not block" isn't sensible.
857 			 */
858 			c->to_user_error = total_processed = -ENOTBLK;
859 			mutex_unlock(&t->lock);
860 			goto cleanup_unlock;
861 		}
862 
863 		/*
864 		 * Ensures that accesses to the message data are not ordered
865 		 * before the remote accesses that produce that message data.
866 		 *
867 		 * This pairs with the barrier in user_map(), via the
868 		 * conditional within the while loop above. Also see the lack
869 		 * of barrier in user_dtr(), which is why this can be after the
870 		 * destroyed check.
871 		 */
872 		smp_rmb();
873 
874 		c->cur_to_user = msg_get_to_user(t);
875 		WARN_ON(c->cur_to_user == NULL);
876 		mutex_unlock(&t->lock);
877 	}
878 
879 	processed = msg_copy_to_iov(c->cur_to_user, to);
880 	total_processed += processed;
881 
882 	WARN_ON(c->cur_to_user->posn_to_user > c->cur_to_user->total_to_user);
883 	if (c->cur_to_user->posn_to_user == c->cur_to_user->total_to_user) {
884 		struct message *m = c->cur_to_user;
885 
886 		c->cur_to_user = NULL;
887 		list_add_tail(&m->from_user, &c->from_user);
888 	}
889 
890 cleanup_unlock:
891 	mutex_unlock(&c->lock);
892 	return total_processed;
893 }
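/*
 * Note that a single read() returns at most the rest of the message header or
 * a chunk of BIO payload, never both (msg_copy_to_iov() copies one or the
 * other), so a daemon will typically read the header first and fetch any
 * payload with a follow-up read(), as in the sketch near the top of the file.
 */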
894 
895 static ssize_t dev_write(struct kiocb *iocb, struct iov_iter *from)
896 {
897 	struct channel *c = channel_from_file(iocb->ki_filp);
898 	ssize_t total_processed = 0;
899 	ssize_t processed;
900 
901 	mutex_lock(&c->lock);
902 
903 	if (unlikely(c->from_user_error)) {
904 		total_processed = c->from_user_error;
905 		goto cleanup_unlock;
906 	}
907 
908 	/*
909 	 * cur_from_user can never be NULL.  If there's no real message it must
910 	 * point to the scratch space.
911 	 */
912 	WARN_ON(c->cur_from_user == NULL);
913 	if (c->cur_from_user->posn_from_user < sizeof(struct dm_user_message)) {
914 		struct message *msg, *old;
915 
916 		processed = msg_copy_from_iov(c->cur_from_user, from);
917 		if (processed <= 0) {
918 			pr_warn("msg_copy_from_iov() returned %zd\n",
919 				processed);
920 			c->from_user_error = -EINVAL;
921 			goto cleanup_unlock;
922 		}
923 		total_processed += processed;
924 
925 		/*
926 		 * In the unlikely event the user has provided us a very short
927 		 * write, not even big enough to fill a message, just succeed.
928 		 * We'll eventually build up enough bytes to do something.
929 		 */
930 		if (unlikely(c->cur_from_user->posn_from_user <
931 			     sizeof(struct dm_user_message)))
932 			goto cleanup_unlock;
933 
934 		old = c->cur_from_user;
935 		mutex_lock(&c->target->lock);
936 		msg = msg_get_from_user(c, c->cur_from_user->msg.seq);
937 		if (msg == NULL) {
938 			pr_info("user provided an invalid message seq of %llx\n",
939 				old->msg.seq);
940 			mutex_unlock(&c->target->lock);
941 			c->from_user_error = -EINVAL;
942 			goto cleanup_unlock;
943 		}
944 		mutex_unlock(&c->target->lock);
945 
946 		WARN_ON(old->posn_from_user != sizeof(struct dm_user_message));
947 		msg->posn_from_user = sizeof(struct dm_user_message);
948 		msg->return_type = old->msg.type;
949 		msg->return_flags = old->msg.flags;
950 		WARN_ON(msg->posn_from_user > msg->total_from_user);
951 		c->cur_from_user = msg;
952 		WARN_ON(old != &c->scratch_message_from_user);
953 	}
954 
955 	/*
956 	 * Userspace can signal an error for single requests by overwriting the
957 	 * type field.
958 	 */
959 	switch (c->cur_from_user->return_type) {
960 	case DM_USER_RESP_SUCCESS:
961 		c->cur_from_user->bio->bi_status = BLK_STS_OK;
962 		break;
963 	case DM_USER_RESP_ERROR:
964 	case DM_USER_RESP_UNSUPPORTED:
965 	default:
966 		c->cur_from_user->bio->bi_status = BLK_STS_IOERR;
967 		goto finish_bio;
968 	}
969 
970 	/*
971 	 * The op was a success as far as userspace is concerned, so process
972 	 * whatever data may come along with it.  The user may provide the BIO
973 	 * data in multiple chunks, in which case we don't need to finish the
974 	 * BIO.
975 	 */
976 	processed = msg_copy_from_iov(c->cur_from_user, from);
977 	total_processed += processed;
978 
979 	if (c->cur_from_user->posn_from_user <
980 	    c->cur_from_user->total_from_user)
981 		goto cleanup_unlock;
982 
983 finish_bio:
984 	/*
985 	 * When we set up this message the BIO's size matched the
986 	 * message size; if that's not still the case then something
987 	 * has gone off the rails.
988 	 */
989 	WARN_ON(bio_size(c->cur_from_user->bio) != 0);
990 	bio_endio(c->cur_from_user->bio);
991 
992 	/*
993 	 * We don't actually need to take the target lock here, as all
994 	 * we're doing is freeing the message and mempools have their
995 	 * own lock.  Each channel has its own scratch message.
996 	 */
997 	WARN_ON(c->cur_from_user == &c->scratch_message_from_user);
998 	mempool_free(c->cur_from_user, &c->target->message_pool);
999 	c->scratch_message_from_user.posn_from_user = 0;
1000 	c->cur_from_user = &c->scratch_message_from_user;
1001 
1002 cleanup_unlock:
1003 	mutex_unlock(&c->lock);
1004 	return total_processed;
1005 }
1006 
1007 static int dev_release(struct inode *inode, struct file *file)
1008 {
1009 	struct channel *c;
1010 
1011 	c = channel_from_file(file);
1012 	mutex_lock(&c->lock);
1013 	channel_free(c);
1014 
1015 	return 0;
1016 }
1017 
1018 static const struct file_operations file_operations = {
1019 	.owner = THIS_MODULE,
1020 	.open = dev_open,
1021 	.llseek = no_llseek,
1022 	.read_iter = dev_read,
1023 	.write_iter = dev_write,
1024 	.release = dev_release,
1025 };
1026 
1027 static int user_ctr(struct dm_target *ti, unsigned int argc, char **argv)
1028 {
1029 	struct target *t;
1030 	int r;
1031 
1032 	if (argc != 3) {
1033 		ti->error = "Invalid argument count";
1034 		r = -EINVAL;
1035 		goto cleanup_none;
1036 	}
1037 
1038 	t = kzalloc(sizeof(*t), GFP_KERNEL);
1039 	if (t == NULL) {
1040 		r = -ENOMEM;
1041 		goto cleanup_none;
1042 	}
1043 	ti->private = t;
1044 
1045 	/* Enable more BIO types. */
1046 	ti->num_discard_bios = 1;
1047 	ti->discards_supported = true;
1048 	ti->num_flush_bios = 1;
1049 	ti->flush_supported = true;
1050 
1051 	/*
1052 	 * We begin with a single reference to the target, which is miscdev's
1053 	 * reference.  This ensures that the target won't be freed
1054 	 * until after the miscdev has been unregistered and all extant
1055 	 * channels have been closed.
1056 	 */
1057 	kref_init(&t->references);
1058 
1059 	t->daemon_terminated = false;
1060 	mutex_init(&t->lock);
1061 	init_waitqueue_head(&t->wq);
1062 	INIT_LIST_HEAD(&t->to_user);
1063 	mempool_init_kmalloc_pool(&t->message_pool, MAX_OUTSTANDING_MESSAGES,
1064 				  sizeof(struct message));
1065 
1066 	t->miscdev.minor = MISC_DYNAMIC_MINOR;
1067 	t->miscdev.fops = &file_operations;
1068 	t->miscdev.name = kasprintf(GFP_KERNEL, "dm-user/%s", argv[2]);
1069 	if (t->miscdev.name == NULL) {
1070 		r = -ENOMEM;
1071 		goto cleanup_message_pool;
1072 	}
1073 
1074 	/*
1075 	 * Once the miscdev is registered it can be opened and therefore
1076 	 * concurrent references to the channel can happen.  Holding the target
1077 	 * lock during misc_register() could deadlock.  If registration
1078 	 * succeeds then we will not access the target again so we just stick a
1079 	 * barrier here, which pairs with taking the target lock everywhere
1080 	 * else the target is accessed.
1081 	 *
1082 	 * I forgot where we ended up on the RCpc/RCsc locks.  IIUC RCsc locks
1083 	 * would mean that we could take the target lock earlier and release it
1084 	 * here instead of the memory barrier.  I'm not sure that's any better,
1085 	 * though, and this isn't on a hot path so it probably doesn't matter
1086 	 * either way.
1087 	 */
1088 	smp_mb();
1089 
1090 	r = misc_register(&t->miscdev);
1091 	if (r) {
1092 		DMERR("Unable to register miscdev %s for dm-user",
1093 		      t->miscdev.name);
1094 		r = -ENOMEM;
1095 		goto cleanup_misc_name;
1096 	}
1097 
1098 	return 0;
1099 
1100 cleanup_misc_name:
1101 	kfree(t->miscdev.name);
1102 cleanup_message_pool:
1103 	mempool_exit(&t->message_pool);
1104 	kfree(t);
1105 cleanup_none:
1106 	return r;
1107 }
1108 
1109 static void user_dtr(struct dm_target *ti)
1110 {
1111 	struct target *t = target_from_target(ti);
1112 
1113 	/*
1114 	 * Removes the miscdev.  This must be called without the target lock
1115 	 * held to avoid a possible deadlock because our open implementation is
1116 	 * called holding the miscdev lock and must later take the target lock.
1117 	 *
1118 	 * There is no race here because only DM can register/unregister the
1119 	 * miscdev, and DM ensures that doesn't happen twice.  The internal
1120 	 * miscdev lock is sufficient to ensure there are no races between
1121 	 * deregistering the miscdev and open.
1122 	 */
1123 	misc_deregister(&t->miscdev);
1124 
1125 	/*
1126 	 * We are now free to take the target's lock and drop our reference to
1127 	 * the target.  There are almost certainly tasks sleeping in read on at
1128 	 * least one of the channels associated with this target; this
1129 	 * explicitly wakes them up and terminates the read.
1130 	 */
1131 	mutex_lock(&t->lock);
1132 	/*
1133 	 * No barrier here, as wait/wake ensures that the flag visibility is
1134 	 * correct WRT the wake/sleep state of the target tasks.
1135 	 */
1136 	t->dm_destroyed = true;
1137 	wake_up_all(&t->wq);
1138 	target_put(t);
1139 }
1140 
1141 /*
1142  * Consumes a BIO from device mapper, queueing it up for userspace.
1143  */
1144 static int user_map(struct dm_target *ti, struct bio *bio)
1145 {
1146 	struct target *t;
1147 	struct message *entry;
1148 
1149 	t = target_from_target(ti);
1150 	/*
1151 	 * FIXME
1152 	 *
1153 	 * This seems like a bad idea.  Specifically, here we're
1154 	 * directly on the IO path when we take the target lock, which may also
1155 	 * be taken from a user context.  The user context doesn't actively
1156 	 * trigger anything that may sleep while holding the lock, but this
1157 	 * still seems like a bad idea.
1158 	 *
1159 	 * The obvious way to fix this would be to use a proper queue, which
1160 	 * would result in no shared locks between the direct IO path and user
1161 	 * tasks.  I had a version that did this, but the head-of-line blocking
1162 	 * from the circular buffer resulted in us needing a fairly large
1163 	 * allocation in order to avoid situations in which the queue fills up
1164 	 * and everything goes off the rails.
1165 	 *
1166 	 * I could jump through some hoops to avoid a shared lock while still
1167 	 * allowing for a large queue, but I'm not actually sure that allowing
1168 	 * for very large queues is the right thing to do here.  Intuitively it
1169 	 * seems better to keep the queues small in here (essentially sized to
1170 	 * the user latency for performance reasons only) and rely on returning
1171 	 * DM_MAPIO_REQUEUE regularly, as that would give the rest of the
1172 	 * kernel more information.
1173 	 *
1174 	 * I'll spend some time trying to figure out what's going on with
1175 	 * DM_MAPIO_REQUEUE, but if someone has a better idea of how to fix
1176 	 * this I'm all ears.
1177 	 */
1178 	mutex_lock(&t->lock);
1179 
1180 	/*
1181 	 * FIXME
1182 	 *
1183 	 * The assumption here is that there's no benefit to returning
1184 	 * DM_MAPIO_KILL as opposed to just erroring out the BIO, but I'm not
1185 	 * sure that's actually true -- for example, I could imagine users
1186 	 * expecting that submitted BIOs are unlikely to fail and therefore
1187 	 * relying on submission failure to indicate an unsupported type.
1188 	 *
1189 	 * There are two ways I can think of to fix this:
1190 	 *   - Add DM arguments that are parsed during the constructor that
1191 	 *     allow various dm_target flags to be set that indicate the op
1192 	 *     types supported by this target.  This may make sense for things
1193 	 *     like discard, where DM can already transform the BIOs to a form
1194 	 *     that's likely to be supported.
1195 	 *   - Some sort of pre-filter that allows userspace to hook in here
1196 	 *     and kill BIOs before marking them as submitted.  My guess would
1197 	 *     be that a userspace round trip is a bad idea here, but a BPF
1198 	 *     call seems reasonable.
1199 	 *
1200 	 * My guess is that we'd likely want to do both.  The first one is easy
1201 	 * and gives DM the proper info, so it seems better.  The BPF call
1202 	 * seems overly complex for just this, but one could imagine wanting to
1203 	 * sometimes return _MAPPED and a BPF filter would be the way to do
1204 	 * that.
1205 	 *
1206 	 * For example, in Android we have an in-kernel DM device called
1207 	 * "dm-bow" that takes advantage of some portion of the space that has
1208 	 * been discarded on a device to provide opportunistic block-level
1209 	 * backups.  While one could imagine just implementing this entirely in
1210 	 * userspace, that would come with an appreciable performance penalty.
1211 	 * Instead one could keep a BPF program that forwards most accesses
1212 	 * directly to the backing block device while informing a userspace
1213 	 * daemon of any discarded space and on writes to blocks that are to be
1214 	 * backed up.
1215 	 */
1216 	if (unlikely((bio_type_to_user_type(bio) < 0) ||
1217 		     (bio_flags_to_user_flags(bio) < 0))) {
1218 		mutex_unlock(&t->lock);
1219 		return DM_MAPIO_KILL;
1220 	}
1221 
1222 	entry = msg_get_map(t);
1223 	if (unlikely(entry == NULL)) {
1224 		mutex_unlock(&t->lock);
1225 		return DM_MAPIO_REQUEUE;
1226 	}
1227 
1228 	entry->msg.type = bio_type_to_user_type(bio);
1229 	entry->msg.flags = bio_flags_to_user_flags(bio);
1230 	entry->msg.sector = bio->bi_iter.bi_sector;
1231 	entry->msg.len = bio_size(bio);
1232 	entry->bio = bio;
1233 	entry->posn_to_user = 0;
1234 	entry->total_to_user = bio_bytes_needed_to_user(bio);
1235 	entry->posn_from_user = 0;
1236 	entry->total_from_user = bio_bytes_needed_from_user(bio);
1237 	entry->delayed = false;
1238 	entry->t = t;
1239 	/* Pairs with the barrier in dev_read() */
1240 	smp_wmb();
1241 	list_add_tail(&entry->to_user, &t->to_user);
1242 
1243 	/*
1244 	 * If there is no daemon to process the IOs,
1245 	 * queue these messages on a workqueue with
1246 	 * a timeout.
1247 	 */
1248 	if (!is_user_space_thread_present(t))
1249 		enqueue_delayed_work(entry, !t->daemon_terminated);
1250 
1251 	wake_up_interruptible(&t->wq);
1252 	mutex_unlock(&t->lock);
1253 	return DM_MAPIO_SUBMITTED;
1254 }
1255 
1256 static struct target_type user_target = {
1257 	.name = "user",
1258 	.version = { 1, 0, 0 },
1259 	.module = THIS_MODULE,
1260 	.ctr = user_ctr,
1261 	.dtr = user_dtr,
1262 	.map = user_map,
1263 };
1264 
1265 static int __init dm_user_init(void)
1266 {
1267 	int r;
1268 
1269 	r = dm_register_target(&user_target);
1270 	if (r) {
1271 		DMERR("register failed %d", r);
1272 		goto error;
1273 	}
1274 
1275 	return 0;
1276 
1277 error:
1278 	return r;
1279 }
1280 
1281 static void __exit dm_user_exit(void)
1282 {
1283 	dm_unregister_target(&user_target);
1284 }
1285 
1286 module_init(dm_user_init);
1287 module_exit(dm_user_exit);
1288 MODULE_AUTHOR("Palmer Dabbelt <palmerdabbelt@google.com>");
1289 MODULE_DESCRIPTION(DM_NAME " target returning blocks from userspace");
1290 MODULE_LICENSE("GPL");
1291