/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */

#include <linux/types.h>
#include <linux/atomic.h>
#include <linux/blkdev.h>
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/list.h>
#include <linux/mempool.h>
#include <linux/module.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
#include <linux/workqueue.h>
#include <linux/mutex.h>
#include <linux/delay.h>
#include <linux/device-mapper.h>
#include <linux/dm-kcopyd.h>

#include "dm.h"

#define SUB_JOB_SIZE	128
#define SPLIT_COUNT	8
#define MIN_JOBS	8
#define RESERVE_PAGES	(DIV_ROUND_UP(SUB_JOB_SIZE << SECTOR_SHIFT, PAGE_SIZE))

/*-----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *---------------------------------------------------------------*/
struct dm_kcopyd_client {
	struct page_list *pages;
	unsigned nr_reserved_pages;
	unsigned nr_free_pages;

	struct dm_io_client *io_client;

	wait_queue_head_t destroyq;
	atomic_t nr_jobs;

	mempool_t *job_pool;

	struct workqueue_struct *kcopyd_wq;
	struct work_struct kcopyd_work;

	struct dm_kcopyd_throttle *throttle;

/*
 * We maintain four lists of jobs:
 *
 * i)   jobs waiting for pages
 * ii)  jobs that have pages, and are waiting for the io to be issued.
 * iii) jobs that don't need to do any IO and just run a callback
 * iv)  jobs that have completed.
 *
 * All four of these are protected by job_lock.
 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;
};

static struct page_list zero_page_list;

static DEFINE_SPINLOCK(throttle_spinlock);

/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT		SHIFT_HZ

/*
 * Sleep this number of milliseconds.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_MSEC			100

/*
 * Maximum number of sleep events. There is a theoretical livelock if many
 * kcopyd clients do work simultaneously; this limit avoids it.
 */
#define MAX_SLEEPS			10

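/*
 * A worked example of the accounting done by io_job_start() below, with a
 * hypothetical throttle value of 20 (io allowed for roughly 20% of elapsed
 * time; the numbers here are illustrative only).  If, within one accounting
 * window, total_period = 100 jiffies and io_period = 30 jiffies, then
 *
 *	skew = io_period - throttle * total_period / 100
 *	     = 30 - 20 * 100 / 100 = 10 > 0
 *
 * so the submitting thread sleeps SLEEP_MSEC and re-evaluates, at most
 * MAX_SLEEPS times.  Once io_period is no more than 20% of total_period
 * (idle time has accumulated, or both counters have been shifted down),
 * skew <= 0 and the io job is allowed to start.
 */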
static void io_job_start(struct dm_kcopyd_throttle *t)
{
	unsigned throttle, now, difference;
	int slept = 0, skew;

	if (unlikely(!t))
		return;

try_again:
	spin_lock_irq(&throttle_spinlock);

	throttle = ACCESS_ONCE(t->throttle);

	if (likely(throttle >= 100))
		goto skip_limit;

	now = jiffies;
	difference = now - t->last_jiffies;
	t->last_jiffies = now;
	if (t->num_io_jobs)
		t->io_period += difference;
	t->total_period += difference;

	/*
	 * Maintain sane values if we got a temporary overflow.
	 */
	if (unlikely(t->io_period > t->total_period))
		t->io_period = t->total_period;

	if (unlikely(t->total_period >= (1 << ACCOUNT_INTERVAL_SHIFT))) {
		int shift = fls(t->total_period >> ACCOUNT_INTERVAL_SHIFT);
		t->total_period >>= shift;
		t->io_period >>= shift;
	}

	skew = t->io_period - throttle * t->total_period / 100;

	if (unlikely(skew > 0) && slept < MAX_SLEEPS) {
		slept++;
		spin_unlock_irq(&throttle_spinlock);
		msleep(SLEEP_MSEC);
		goto try_again;
	}

skip_limit:
	t->num_io_jobs++;

	spin_unlock_irq(&throttle_spinlock);
}

static void io_job_finish(struct dm_kcopyd_throttle *t)
{
	unsigned long flags;

	if (unlikely(!t))
		return;

	spin_lock_irqsave(&throttle_spinlock, flags);

	t->num_io_jobs--;

	if (likely(ACCESS_ONCE(t->throttle) >= 100))
		goto skip_limit;

	if (!t->num_io_jobs) {
		unsigned now, difference;

		now = jiffies;
		difference = now - t->last_jiffies;
		t->last_jiffies = now;

		t->io_period += difference;
		t->total_period += difference;

		/*
		 * Maintain sane values if we got a temporary overflow.
		 */
		if (unlikely(t->io_period > t->total_period))
			t->io_period = t->total_period;
	}

skip_limit:
	spin_unlock_irqrestore(&throttle_spinlock, flags);
}


static void wake(struct dm_kcopyd_client *kc)
{
	queue_work(kc->kcopyd_wq, &kc->kcopyd_work);
}

/*
 * Obtain one page for the use of kcopyd.
 */
static struct page_list *alloc_pl(gfp_t gfp)
{
	struct page_list *pl;

	pl = kmalloc(sizeof(*pl), gfp);
	if (!pl)
		return NULL;

	pl->page = alloc_page(gfp);
	if (!pl->page) {
		kfree(pl);
		return NULL;
	}

	return pl;
}

static void free_pl(struct page_list *pl)
{
	__free_page(pl->page);
	kfree(pl);
}

/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			free_pl(pl);
		else {
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}

static int kcopyd_get_pages(struct dm_kcopyd_client *kc,
			    unsigned int nr, struct page_list **pages)
{
	struct page_list *pl;

	*pages = NULL;

	do {
		pl = alloc_pl(__GFP_NOWARN | __GFP_NORETRY | __GFP_KSWAPD_RECLAIM);
		if (unlikely(!pl)) {
			/* Use reserved pages */
			pl = kc->pages;
			if (unlikely(!pl))
				goto out_of_memory;
			kc->pages = pl->next;
			kc->nr_free_pages--;
		}
		pl->next = *pages;
		*pages = pl;
	} while (--nr);

	return 0;

out_of_memory:
	if (*pages)
		kcopyd_put_pages(kc, *pages);
	return -ENOMEM;
}

/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}

/*
 * Allocate and reserve nr_pages for the use of a specific client.
 */
static int client_reserve_pages(struct dm_kcopyd_client *kc, unsigned nr_pages)
{
	unsigned i;
	struct page_list *pl = NULL, *next;

	for (i = 0; i < nr_pages; i++) {
		next = alloc_pl(GFP_KERNEL);
		if (!next) {
			if (pl)
				drop_pages(pl);
			return -ENOMEM;
		}
		next->next = pl;
		pl = next;
	}

	kc->nr_reserved_pages += nr_pages;
	kcopyd_put_pages(kc, pl);

	return 0;
}

static void client_free_pages(struct dm_kcopyd_client *kc)
{
	BUG_ON(kc->nr_free_pages != kc->nr_reserved_pages);
	drop_pages(kc->pages);
	kc->pages = NULL;
	kc->nr_free_pages = kc->nr_reserved_pages = 0;
}

/*-----------------------------------------------------------------
 * kcopyd_jobs need to be allocated by the *clients* of kcopyd,
 * for this reason we use a mempool to prevent the client from
 * ever having to do io (which could cause a deadlock).
 *---------------------------------------------------------------*/
struct kcopyd_job {
	struct dm_kcopyd_client *kc;
	struct list_head list;
	unsigned long flags;

	/*
	 * Error state of the job.
	 */
	int read_err;
	unsigned long write_err;

	/*
	 * Either READ or WRITE
	 */
	int rw;
	struct dm_io_region source;

	/*
	 * The destinations for the transfer.
	 */
	unsigned int num_dests;
	struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];

	struct page_list *pages;

	/*
	 * Set this to ensure you are notified when the job has
	 * completed.  'context' is for callback to use.
	 */
	dm_kcopyd_notify_fn fn;
	void *context;

	/*
	 * These fields are only used if the job has been split
	 * into more manageable parts.
	 */
	struct mutex lock;
	atomic_t sub_jobs;
	sector_t progress;

	struct kcopyd_job *master_job;
};

static struct kmem_cache *_job_cache;

int __init dm_kcopyd_init(void)
{
	_job_cache = kmem_cache_create("kcopyd_job",
				sizeof(struct kcopyd_job) * (SPLIT_COUNT + 1),
				__alignof__(struct kcopyd_job), 0, NULL);
	if (!_job_cache)
		return -ENOMEM;

	zero_page_list.next = &zero_page_list;
	zero_page_list.page = ZERO_PAGE(0);

	return 0;
}

void dm_kcopyd_exit(void)
{
	kmem_cache_destroy(_job_cache);
	_job_cache = NULL;
}

/*
 * Functions to push and pop a job on a client's job lists: pop() takes a
 * job from the head of a list, push() adds to the tail and push_head()
 * returns a job to the head.
 */
static struct kcopyd_job *pop(struct list_head *jobs,
			      struct dm_kcopyd_client *kc)
{
	struct kcopyd_job *job = NULL;
	unsigned long flags;

	spin_lock_irqsave(&kc->job_lock, flags);

	if (!list_empty(jobs)) {
		job = list_entry(jobs->next, struct kcopyd_job, list);
		list_del(&job->list);
	}
	spin_unlock_irqrestore(&kc->job_lock, flags);

	return job;
}

static void push(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add_tail(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}


static void push_head(struct list_head *jobs, struct kcopyd_job *job)
{
	unsigned long flags;
	struct dm_kcopyd_client *kc = job->kc;

	spin_lock_irqsave(&kc->job_lock, flags);
	list_add(&job->list, jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);
}

/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);
	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job)
		mempool_free(job, kc->job_pool);
	fn(read_err, write_err, context);

	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	cond_resched();

	return 0;
}

static void complete_io(unsigned long error, void *context)
{
	struct kcopyd_job *job = (struct kcopyd_job *) context;
	struct dm_kcopyd_client *kc = job->kc;

	io_job_finish(kc->throttle);

	if (error) {
		if (job->rw & WRITE)
			job->write_err |= error;
		else
			job->read_err = 1;

		if (!test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
			push(&kc->complete_jobs, job);
			wake(kc);
			return;
		}
	}

	if (job->rw & WRITE)
		push(&kc->complete_jobs, job);

	else {
		job->rw = WRITE;
		push(&kc->io_jobs, job);
	}

	wake(kc);
}

/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_rw = job->rw,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	io_job_start(job->kc->throttle);

	if (job->rw == READ)
		r = dm_io(&io_req, 1, &job->source, NULL);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL);

	return r;
}

static int run_pages_job(struct kcopyd_job *job)
{
	int r;
	unsigned nr_pages = dm_div_up(job->dests[0].count, PAGE_SIZE >> 9);

	r = kcopyd_get_pages(job->kc, nr_pages, &job->pages);
	if (!r) {
		/* this job is ready for io */
		push(&job->kc->io_jobs, job);
		return 0;
	}

	if (r == -ENOMEM)
		/* can't complete now */
		return 1;

	return r;
}

/*
 * Run through a list for as long as possible.  Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn) (struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {

		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (job->rw & WRITE)
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}

/*
 * kcopyd does this every time it's woken up.
 */
static void do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
					struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;
	unsigned long flags;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list.  io jobs call wake when they complete and it all
	 * starts again.
	 */
	spin_lock_irqsave(&kc->job_lock, flags);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irqrestore(&kc->job_lock, flags);

	blk_start_plug(&plug);
	process_jobs(&kc->complete_jobs, kc, run_complete_job);
	process_jobs(&kc->pages_jobs, kc, run_pages_job);
	process_jobs(&kc->io_jobs, kc, run_io_job);
	blk_finish_plug(&plug);
}

/*
 * If we are copying a small region we just dispatch a single job
 * to do the copy, otherwise the io has to be split up into many
 * jobs.
 */
static void dispatch_job(struct kcopyd_job *job)
{
	struct dm_kcopyd_client *kc = job->kc;
	atomic_inc(&kc->nr_jobs);
	if (unlikely(!job->source.count))
		push(&kc->callback_jobs, job);
	else if (job->pages == &zero_page_list)
		push(&kc->io_jobs, job);
	else
		push(&kc->pages_jobs, job);
	wake(kc);
}

static void segment_complete(int read_err, unsigned long write_err,
			     void *context)
{
	/* FIXME: tidy this function */
	sector_t progress = 0;
	sector_t count = 0;
	struct kcopyd_job *sub_job = (struct kcopyd_job *) context;
	struct kcopyd_job *job = sub_job->master_job;
	struct dm_kcopyd_client *kc = job->kc;

	mutex_lock(&job->lock);

	/* update the error */
	if (read_err)
		job->read_err = 1;

	if (write_err)
		job->write_err |= write_err;

	/*
	 * Only dispatch more work if there hasn't been an error.
	 */
	if ((!job->read_err && !job->write_err) ||
	    test_bit(DM_KCOPYD_IGNORE_ERROR, &job->flags)) {
		/* get the next chunk of work */
		progress = job->progress;
		count = job->source.count - progress;
		if (count) {
			if (count > SUB_JOB_SIZE)
				count = SUB_JOB_SIZE;

			job->progress += count;
		}
	}
	mutex_unlock(&job->lock);

	if (count) {
		int i;

		*sub_job = *job;
		sub_job->source.sector += progress;
		sub_job->source.count = count;

		for (i = 0; i < job->num_dests; i++) {
			sub_job->dests[i].sector += progress;
			sub_job->dests[i].count = count;
		}

		sub_job->fn = segment_complete;
		sub_job->context = sub_job;
		dispatch_job(sub_job);

	} else if (atomic_dec_and_test(&job->sub_jobs)) {

		/*
		 * Queue the completion callback to the kcopyd thread.
		 *
		 * Some callers assume that all the completions are called
		 * from a single thread and don't race with each other.
		 *
		 * We must not call the callback directly here because this
		 * code may not be executing in the thread.
		 */
		push(&kc->complete_jobs, job);
		wake(kc);
	}
}

/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		master_job[i + 1].master_job = master_job;
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}

int dm_kcopyd_copy(struct dm_kcopyd_client *kc, struct dm_io_region *from,
		   unsigned int num_dests, struct dm_io_region *dests,
		   unsigned int flags, dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;
	int i;

	/*
	 * Allocate an array of jobs consisting of one master job
	 * followed by SPLIT_COUNT sub jobs.
	 */
	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	/*
	 * set up for the read.
	 */
	job->kc = kc;
	job->flags = flags;
	job->read_err = 0;
	job->write_err = 0;

	job->num_dests = num_dests;
	memcpy(&job->dests, dests, sizeof(*dests) * num_dests);

	if (from) {
		job->source = *from;
		job->pages = NULL;
		job->rw = READ;
	} else {
		memset(&job->source, 0, sizeof job->source);
		job->source.count = job->dests[0].count;
		job->pages = &zero_page_list;

		/*
		 * Use WRITE SAME to optimize zeroing if all dests support it.
		 */
		job->rw = WRITE | REQ_WRITE_SAME;
		for (i = 0; i < job->num_dests; i++)
			if (!bdev_write_same(job->dests[i].bdev)) {
				job->rw = WRITE;
				break;
			}
	}

	job->fn = fn;
	job->context = context;
	job->master_job = job;

	if (job->source.count <= SUB_JOB_SIZE)
		dispatch_job(job);
	else {
		mutex_init(&job->lock);
		job->progress = 0;
		split_job(job);
	}

	return 0;
}
EXPORT_SYMBOL(dm_kcopyd_copy);
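
/*
 * Illustrative usage sketch only (not part of the original source): shows
 * how a hypothetical caller might submit an asynchronous copy of one region
 * to two destinations with an existing client.  The block devices, sector
 * ranges and the example_copy_done()/example_submit_copy() names are
 * assumptions made for illustration.
 */
#if 0
static void example_copy_done(int read_err, unsigned long write_err,
			      void *context)
{
	/* Runs from the kcopyd workqueue once every destination is written. */
	struct completion *done = context;

	if (read_err || write_err)
		pr_err("example copy failed (read_err=%d, write_err=0x%lx)\n",
		       read_err, write_err);
	complete(done);
}

static int example_submit_copy(struct dm_kcopyd_client *kc,
			       struct block_device *src_bdev,
			       struct block_device *dst0_bdev,
			       struct block_device *dst1_bdev,
			       struct completion *done)
{
	struct dm_io_region from, to[2];

	from.bdev = src_bdev;
	from.sector = 0;
	from.count = 2048;		/* 1MiB in 512-byte sectors */

	to[0].bdev = dst0_bdev;
	to[0].sector = 0;
	to[0].count = from.count;

	to[1].bdev = dst1_bdev;
	to[1].sector = 4096;
	to[1].count = from.count;

	/* Asynchronous: example_copy_done() is called on completion. */
	return dm_kcopyd_copy(kc, &from, 2, to, 0, example_copy_done, done);
}
#endif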

int dm_kcopyd_zero(struct dm_kcopyd_client *kc,
		   unsigned num_dests, struct dm_io_region *dests,
		   unsigned flags, dm_kcopyd_notify_fn fn, void *context)
{
	return dm_kcopyd_copy(kc, NULL, num_dests, dests, flags, fn, context);
}
EXPORT_SYMBOL(dm_kcopyd_zero);

void *dm_kcopyd_prepare_callback(struct dm_kcopyd_client *kc,
				 dm_kcopyd_notify_fn fn, void *context)
{
	struct kcopyd_job *job;

	job = mempool_alloc(kc->job_pool, GFP_NOIO);

	memset(job, 0, sizeof(struct kcopyd_job));
	job->kc = kc;
	job->fn = fn;
	job->context = context;
	job->master_job = job;

	atomic_inc(&kc->nr_jobs);

	return job;
}
EXPORT_SYMBOL(dm_kcopyd_prepare_callback);

void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
{
	struct kcopyd_job *job = j;
	struct dm_kcopyd_client *kc = job->kc;

	job->read_err = read_err;
	job->write_err = write_err;

	push(&kc->callback_jobs, job);
	wake(kc);
}
EXPORT_SYMBOL(dm_kcopyd_do_callback);
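
/*
 * Illustrative usage sketch only (not part of the original source): pairs
 * dm_kcopyd_prepare_callback() with dm_kcopyd_do_callback() so that a
 * caller-supplied notify function runs from the kcopyd workqueue, serialized
 * with the client's other completion callbacks, rather than from the
 * caller's own context.  The example_notify()/example_defer_notify() names
 * are assumptions made for illustration.
 */
#if 0
static void example_notify(int read_err, unsigned long write_err,
			   void *context)
{
	/* Runs from the kcopyd workqueue like any other job completion. */
}

static void example_defer_notify(struct dm_kcopyd_client *kc, int err)
{
	void *cb;

	/* May sleep (GFP_NOIO mempool allocation); call from process context. */
	cb = dm_kcopyd_prepare_callback(kc, example_notify, NULL);

	/* Later, possibly from a different context, hand it to kcopyd. */
	dm_kcopyd_do_callback(cb, 0, err ? 1 : 0);
}
#endif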

/*
 * Cancels a kcopyd job, e.g. someone might be deactivating a
 * mirror.
 */
#if 0
int kcopyd_cancel(struct kcopyd_job *job, int block)
{
	/* FIXME: finish */
	return -1;
}
#endif  /*  0  */

/*-----------------------------------------------------------------
 * Client setup
 *---------------------------------------------------------------*/
struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
{
	int r = -ENOMEM;
	struct dm_kcopyd_client *kc;

	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
	if (!kc)
		return ERR_PTR(-ENOMEM);

	spin_lock_init(&kc->job_lock);
	INIT_LIST_HEAD(&kc->callback_jobs);
	INIT_LIST_HEAD(&kc->complete_jobs);
	INIT_LIST_HEAD(&kc->io_jobs);
	INIT_LIST_HEAD(&kc->pages_jobs);
	kc->throttle = throttle;

	kc->job_pool = mempool_create_slab_pool(MIN_JOBS, _job_cache);
	if (!kc->job_pool)
		goto bad_slab;

	INIT_WORK(&kc->kcopyd_work, do_work);
	kc->kcopyd_wq = alloc_workqueue("kcopyd", WQ_MEM_RECLAIM, 0);
	if (!kc->kcopyd_wq)
		goto bad_workqueue;

	kc->pages = NULL;
	kc->nr_reserved_pages = kc->nr_free_pages = 0;
	r = client_reserve_pages(kc, RESERVE_PAGES);
	if (r)
		goto bad_client_pages;

	kc->io_client = dm_io_client_create();
	if (IS_ERR(kc->io_client)) {
		r = PTR_ERR(kc->io_client);
		goto bad_io_client;
	}

	init_waitqueue_head(&kc->destroyq);
	atomic_set(&kc->nr_jobs, 0);

	return kc;

bad_io_client:
	client_free_pages(kc);
bad_client_pages:
	destroy_workqueue(kc->kcopyd_wq);
bad_workqueue:
	mempool_destroy(kc->job_pool);
bad_slab:
	kfree(kc);

	return ERR_PTR(r);
}
EXPORT_SYMBOL(dm_kcopyd_client_create);

void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));

	BUG_ON(!list_empty(&kc->callback_jobs));
	BUG_ON(!list_empty(&kc->complete_jobs));
	BUG_ON(!list_empty(&kc->io_jobs));
	BUG_ON(!list_empty(&kc->pages_jobs));
	destroy_workqueue(kc->kcopyd_wq);
	dm_io_client_destroy(kc->io_client);
	client_free_pages(kc);
	mempool_destroy(kc->job_pool);
	kfree(kc);
}
EXPORT_SYMBOL(dm_kcopyd_client_destroy);

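/*
 * Illustrative lifecycle sketch only (not part of the original source):
 * creates an unthrottled client (NULL throttle), zeroes one region with
 * dm_kcopyd_zero() and tears the client down again.  The block device,
 * sector range and example_* names are assumptions made for illustration.
 */
#if 0
static void example_zero_done(int read_err, unsigned long write_err,
			      void *context)
{
	complete((struct completion *) context);
}

static int example_zero_region(struct block_device *bdev)
{
	struct dm_kcopyd_client *kc;
	struct dm_io_region dest;
	DECLARE_COMPLETION_ONSTACK(done);
	int r;

	kc = dm_kcopyd_client_create(NULL);	/* NULL => no throttling */
	if (IS_ERR(kc))
		return PTR_ERR(kc);

	dest.bdev = bdev;
	dest.sector = 0;
	dest.count = 1024;	/* 512KiB in 512-byte sectors */

	r = dm_kcopyd_zero(kc, 1, &dest, 0, example_zero_done, &done);
	if (!r)
		wait_for_completion(&done);

	/* Waits for any jobs still owned by this client, then frees it. */
	dm_kcopyd_client_destroy(kc);
	return r;
}
#endif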