1 /*
2    drbd_receiver.c
3 
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5 
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9 
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14 
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24 
25 
26 #include <linux/module.h>
27 
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30 
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52 
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
54 
55 struct packet_info {
56 	enum drbd_packet cmd;
57 	unsigned int size;
58 	unsigned int vnr;
59 	void *data;
60 };
61 
62 enum finish_epoch {
63 	FE_STILL_LIVE,
64 	FE_DESTROYED,
65 	FE_RECYCLED,
66 };
67 
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74 
75 
76 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
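/* Note on GFP_TRY: it allows highmem pages and suppresses allocation failure
 * warnings, but carries no reclaim flags; drbd_alloc_pages() below retries
 * and throttles instead of forcing write-out (see the "criss-cross" comment
 * in __drbd_alloc_pages()). */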
77 
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
82 
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89 	struct page *page;
90 	struct page *tmp;
91 
92 	BUG_ON(!n);
93 	BUG_ON(!head);
94 
95 	page = *head;
96 
97 	if (!page)
98 		return NULL;
99 
100 	while (page) {
101 		tmp = page_chain_next(page);
102 		if (--n == 0)
103 			break; /* found sufficient pages */
104 		if (tmp == NULL)
105 			/* insufficient pages, don't use any of them. */
106 			return NULL;
107 		page = tmp;
108 	}
109 
110 	/* add end of list marker for the returned list */
111 	set_page_private(page, 0);
112 	/* actual return value, and adjustment of head */
113 	page = *head;
114 	*head = tmp;
115 	return page;
116 }
117 
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123 	struct page *tmp;
124 	int i = 1;
125 	while ((tmp = page_chain_next(page)))
126 		++i, page = tmp;
127 	if (len)
128 		*len = i;
129 	return page;
130 }
131 
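/* Drop the reference on every page of the chain and return how many pages
 * were released, so the caller can adjust its accounting. */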
132 static int page_chain_free(struct page *page)
133 {
134 	struct page *tmp;
135 	int i = 0;
136 	page_chain_for_each_safe(page, tmp) {
137 		put_page(page);
138 		++i;
139 	}
140 	return i;
141 }
142 
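/* Splice the chain chain_first..chain_last in front of *head.  The "#if 1"
 * block pays for a full chain walk to verify that chain_last really is the
 * tail of the chain starting at chain_first. */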
143 static void page_chain_add(struct page **head,
144 		struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147 	struct page *tmp;
148 	tmp = page_chain_tail(chain_first, NULL);
149 	BUG_ON(tmp != chain_last);
150 #endif
151 
152 	/* add chain to head */
153 	set_page_private(chain_last, (unsigned long)*head);
154 	*head = chain_first;
155 }
156 
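/* Grab @number pages: preferably from the pre-allocated drbd_pp_pool,
 * otherwise page by page via alloc_page(GFP_TRY).  If not all pages can be
 * allocated, the partial chain is handed back to the pool and NULL is
 * returned; drbd_alloc_pages() takes care of retrying. */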
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158 				       unsigned int number)
159 {
160 	struct page *page = NULL;
161 	struct page *tmp = NULL;
162 	unsigned int i = 0;
163 
164 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
165 	 * So what. It saves a spin_lock. */
166 	if (drbd_pp_vacant >= number) {
167 		spin_lock(&drbd_pp_lock);
168 		page = page_chain_del(&drbd_pp_pool, number);
169 		if (page)
170 			drbd_pp_vacant -= number;
171 		spin_unlock(&drbd_pp_lock);
172 		if (page)
173 			return page;
174 	}
175 
176 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
178 	 * which in turn might block on the other node at this very place.  */
179 	for (i = 0; i < number; i++) {
180 		tmp = alloc_page(GFP_TRY);
181 		if (!tmp)
182 			break;
183 		set_page_private(tmp, (unsigned long)page);
184 		page = tmp;
185 	}
186 
187 	if (i == number)
188 		return page;
189 
190 	/* Not enough pages immediately available this time.
191 	 * No need to jump around here, drbd_alloc_pages will retry this
192 	 * function "soon". */
193 	if (page) {
194 		tmp = page_chain_tail(page, NULL);
195 		spin_lock(&drbd_pp_lock);
196 		page_chain_add(&drbd_pp_pool, page, tmp);
197 		drbd_pp_vacant += i;
198 		spin_unlock(&drbd_pp_lock);
199 	}
200 	return NULL;
201 }
202 
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204 					   struct list_head *to_be_freed)
205 {
206 	struct drbd_peer_request *peer_req, *tmp;
207 
208 	/* The EEs are always appended to the end of the list. Since
209 	   they are sent in order over the wire, they have to finish
210    in order. As soon as we see the first unfinished one, we can
211    stop examining the list... */
212 
213 	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214 		if (drbd_peer_req_has_active_page(peer_req))
215 			break;
216 		list_move(&peer_req->w.list, to_be_freed);
217 	}
218 }
219 
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222 	LIST_HEAD(reclaimed);
223 	struct drbd_peer_request *peer_req, *t;
224 
225 	spin_lock_irq(&device->resource->req_lock);
226 	reclaim_finished_net_peer_reqs(device, &reclaimed);
227 	spin_unlock_irq(&device->resource->req_lock);
228 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229 		drbd_free_net_peer_req(device, peer_req);
230 }
231 
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234 	struct drbd_peer_device *peer_device;
235 	int vnr;
236 
237 	rcu_read_lock();
238 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239 		struct drbd_device *device = peer_device->device;
240 		if (!atomic_read(&device->pp_in_use_by_net))
241 			continue;
242 
243 		kref_get(&device->kref);
244 		rcu_read_unlock();
245 		drbd_reclaim_net_peer_reqs(device);
246 		kref_put(&device->kref, drbd_destroy_device);
247 		rcu_read_lock();
248 	}
249 	rcu_read_unlock();
250 }
251 
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:	DRBD device.
255  * @number:	number of pages requested
256  * @retry:	whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273 			      bool retry)
274 {
275 	struct drbd_device *device = peer_device->device;
276 	struct page *page = NULL;
277 	struct net_conf *nc;
278 	DEFINE_WAIT(wait);
279 	unsigned int mxb;
280 
281 	rcu_read_lock();
282 	nc = rcu_dereference(peer_device->connection->net_conf);
283 	mxb = nc ? nc->max_buffers : 1000000;
284 	rcu_read_unlock();
285 
286 	if (atomic_read(&device->pp_in_use) < mxb)
287 		page = __drbd_alloc_pages(device, number);
288 
289 	/* Try to keep the fast path fast, but occasionally we need
290 	 * to reclaim the pages we lent to the network stack. */
291 	if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292 		drbd_reclaim_net_peer_reqs(device);
293 
294 	while (page == NULL) {
295 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296 
297 		drbd_reclaim_net_peer_reqs(device);
298 
299 		if (atomic_read(&device->pp_in_use) < mxb) {
300 			page = __drbd_alloc_pages(device, number);
301 			if (page)
302 				break;
303 		}
304 
305 		if (!retry)
306 			break;
307 
308 		if (signal_pending(current)) {
309 			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310 			break;
311 		}
312 
313 		if (schedule_timeout(HZ/10) == 0)
314 			mxb = UINT_MAX;
315 	}
316 	finish_wait(&drbd_pp_wait, &wait);
317 
318 	if (page)
319 		atomic_add(number, &device->pp_in_use);
320 	return page;
321 }
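
/*
 * Typical caller pattern, as used in drbd_alloc_peer_req() further down:
 *
 *	page = drbd_alloc_pages(peer_device, nr_pages,
 *				gfpflags_allow_blocking(gfp_mask));
 *	if (!page)
 *		goto fail;
 *
 * i.e. retrying (and thus potentially sleeping) is only requested when the
 * caller's gfp mask allows blocking.
 */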
322 
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * It is also used from inside another spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329 	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330 	int i;
331 
332 	if (page == NULL)
333 		return;
334 
335 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336 		i = page_chain_free(page);
337 	else {
338 		struct page *tmp;
339 		tmp = page_chain_tail(page, &i);
340 		spin_lock(&drbd_pp_lock);
341 		page_chain_add(&drbd_pp_pool, page, tmp);
342 		drbd_pp_vacant += i;
343 		spin_unlock(&drbd_pp_lock);
344 	}
345 	i = atomic_sub_return(i, a);
346 	if (i < 0)
347 		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348 			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349 	wake_up(&drbd_pp_wait);
350 }
351 
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355 
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
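/* For example: _drbd_wait_ee_list_empty() below is on the "hold the
 * req_lock" side; it temporarily drops the lock around io_schedule() and
 * re-acquires it, while the drbd_wait_ee_list_empty() wrapper takes the
 * lock itself. */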
365 
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371 		    unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373 	struct drbd_device *device = peer_device->device;
374 	struct drbd_peer_request *peer_req;
375 	struct page *page = NULL;
376 	unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377 
378 	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379 		return NULL;
380 
381 	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382 	if (!peer_req) {
383 		if (!(gfp_mask & __GFP_NOWARN))
384 			drbd_err(device, "%s: allocation failed\n", __func__);
385 		return NULL;
386 	}
387 
388 	if (nr_pages) {
389 		page = drbd_alloc_pages(peer_device, nr_pages,
390 					gfpflags_allow_blocking(gfp_mask));
391 		if (!page)
392 			goto fail;
393 	}
394 
395 	memset(peer_req, 0, sizeof(*peer_req));
396 	INIT_LIST_HEAD(&peer_req->w.list);
397 	drbd_clear_interval(&peer_req->i);
398 	peer_req->i.size = request_size;
399 	peer_req->i.sector = sector;
400 	peer_req->submit_jif = jiffies;
401 	peer_req->peer_device = peer_device;
402 	peer_req->pages = page;
403 	/*
404 	 * The block_id is opaque to the receiver.  It is not endianness
405 	 * converted, and sent back to the sender unchanged.
406 	 */
407 	peer_req->block_id = id;
408 
409 	return peer_req;
410 
411  fail:
412 	mempool_free(peer_req, drbd_ee_mempool);
413 	return NULL;
414 }
415 
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417 		       int is_net)
418 {
419 	might_sleep();
420 	if (peer_req->flags & EE_HAS_DIGEST)
421 		kfree(peer_req->digest);
422 	drbd_free_pages(device, peer_req->pages, is_net);
423 	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425 	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427 		drbd_al_complete_io(device, &peer_req->i);
428 	}
429 	mempool_free(peer_req, drbd_ee_mempool);
430 }
431 
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434 	LIST_HEAD(work_list);
435 	struct drbd_peer_request *peer_req, *t;
436 	int count = 0;
437 	int is_net = list == &device->net_ee;
438 
439 	spin_lock_irq(&device->resource->req_lock);
440 	list_splice_init(list, &work_list);
441 	spin_unlock_irq(&device->resource->req_lock);
442 
443 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444 		__drbd_free_peer_req(device, peer_req, is_net);
445 		count++;
446 	}
447 	return count;
448 }
449 
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
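/* Reclaim finished net_ee entries, then run the completion callbacks of
 * everything on done_ee (e_end_block and friends) and free those peer
 * requests.  Returns 0, or the first error reported by a callback. */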
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455 	LIST_HEAD(work_list);
456 	LIST_HEAD(reclaimed);
457 	struct drbd_peer_request *peer_req, *t;
458 	int err = 0;
459 
460 	spin_lock_irq(&device->resource->req_lock);
461 	reclaim_finished_net_peer_reqs(device, &reclaimed);
462 	list_splice_init(&device->done_ee, &work_list);
463 	spin_unlock_irq(&device->resource->req_lock);
464 
465 	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466 		drbd_free_net_peer_req(device, peer_req);
467 
468 	/* possible callbacks here:
469 	 * e_end_block, and e_end_resync_block, e_send_superseded.
470 	 * all ignore the last argument.
471 	 */
472 	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473 		int err2;
474 
475 		/* list_del not necessary, next/prev members not touched */
476 		err2 = peer_req->w.cb(&peer_req->w, !!err);
477 		if (!err)
478 			err = err2;
479 		drbd_free_peer_req(device, peer_req);
480 	}
481 	wake_up(&device->ee_wait);
482 
483 	return err;
484 }
485 
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487 				     struct list_head *head)
488 {
489 	DEFINE_WAIT(wait);
490 
491 	/* avoids spin_lock/unlock
492 	 * and calling prepare_to_wait in the fast path */
493 	while (!list_empty(head)) {
494 		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495 		spin_unlock_irq(&device->resource->req_lock);
496 		io_schedule();
497 		finish_wait(&device->ee_wait, &wait);
498 		spin_lock_irq(&device->resource->req_lock);
499 	}
500 }
501 
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503 				    struct list_head *head)
504 {
505 	spin_lock_irq(&device->resource->req_lock);
506 	_drbd_wait_ee_list_empty(device, head);
507 	spin_unlock_irq(&device->resource->req_lock);
508 }
509 
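/* Receive into a plain buffer via kernel_recvmsg().  With @flags == 0 this
 * blocks until all @size bytes arrived, or an error or signal occurred
 * (MSG_WAITALL), and suppresses SIGPIPE (MSG_NOSIGNAL).  Returns the number
 * of bytes received, or a negative errno. */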
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512 	struct kvec iov = {
513 		.iov_base = buf,
514 		.iov_len = size,
515 	};
516 	struct msghdr msg = {
517 		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518 	};
519 	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
520 }
521 
522 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
523 {
524 	int rv;
525 
526 	rv = drbd_recv_short(connection->data.socket, buf, size, 0);
527 
528 	if (rv < 0) {
529 		if (rv == -ECONNRESET)
530 			drbd_info(connection, "sock was reset by peer\n");
531 		else if (rv != -ERESTARTSYS)
532 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
533 	} else if (rv == 0) {
534 		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
535 			long t;
536 			rcu_read_lock();
537 			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
538 			rcu_read_unlock();
539 
540 			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
541 
542 			if (t)
543 				goto out;
544 		}
545 		drbd_info(connection, "sock was shut down by peer\n");
546 	}
547 
548 	if (rv != size)
549 		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
550 
551 out:
552 	return rv;
553 }
554 
555 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
556 {
557 	int err;
558 
559 	err = drbd_recv(connection, buf, size);
560 	if (err != size) {
561 		if (err >= 0)
562 			err = -EIO;
563 	} else
564 		err = 0;
565 	return err;
566 }
567 
568 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
569 {
570 	int err;
571 
572 	err = drbd_recv_all(connection, buf, size);
573 	if (err && !signal_pending(current))
574 		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
575 	return err;
576 }
577 
578 /* quoting tcp(7):
579  *   On individual connections, the socket buffer size must be set prior to the
580  *   listen(2) or connect(2) calls in order to have it take effect.
581  * This is our wrapper to do so.
582  */
583 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
584 		unsigned int rcv)
585 {
586 	/* open coded SO_SNDBUF, SO_RCVBUF */
587 	if (snd) {
588 		sock->sk->sk_sndbuf = snd;
589 		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
590 	}
591 	if (rcv) {
592 		sock->sk->sk_rcvbuf = rcv;
593 		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
594 	}
595 }
596 
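/* Set up the outgoing TCP connection: create a socket, bind it to the
 * configured source address (port 0, so the kernel picks a free one), and
 * connect to the peer with connect_int as send/receive timeout.  Returns
 * the socket or NULL; "expected" failures such as timeouts or an
 * unreachable peer do not force the connection to C_DISCONNECTING. */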
597 static struct socket *drbd_try_connect(struct drbd_connection *connection)
598 {
599 	const char *what;
600 	struct socket *sock;
601 	struct sockaddr_in6 src_in6;
602 	struct sockaddr_in6 peer_in6;
603 	struct net_conf *nc;
604 	int err, peer_addr_len, my_addr_len;
605 	int sndbuf_size, rcvbuf_size, connect_int;
606 	int disconnect_on_error = 1;
607 
608 	rcu_read_lock();
609 	nc = rcu_dereference(connection->net_conf);
610 	if (!nc) {
611 		rcu_read_unlock();
612 		return NULL;
613 	}
614 	sndbuf_size = nc->sndbuf_size;
615 	rcvbuf_size = nc->rcvbuf_size;
616 	connect_int = nc->connect_int;
617 	rcu_read_unlock();
618 
619 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
620 	memcpy(&src_in6, &connection->my_addr, my_addr_len);
621 
622 	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
623 		src_in6.sin6_port = 0;
624 	else
625 		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
626 
627 	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
628 	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
629 
630 	what = "sock_create_kern";
631 	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
632 			       SOCK_STREAM, IPPROTO_TCP, &sock);
633 	if (err < 0) {
634 		sock = NULL;
635 		goto out;
636 	}
637 
638 	sock->sk->sk_rcvtimeo =
639 	sock->sk->sk_sndtimeo = connect_int * HZ;
640 	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
641 
642        /* explicitly bind to the configured IP as source IP
643 	*  for the outgoing connections.
644 	*  This is needed for multihomed hosts and to be
645 	*  able to use lo: interfaces for drbd.
646 	* Make sure to use 0 as port number, so linux selects
647 	*  a free one dynamically.
648 	*/
649 	what = "bind before connect";
650 	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
651 	if (err < 0)
652 		goto out;
653 
654 	/* connect may fail, peer not yet available.
655 	 * stay C_WF_CONNECTION, don't go Disconnecting! */
656 	disconnect_on_error = 0;
657 	what = "connect";
658 	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
659 
660 out:
661 	if (err < 0) {
662 		if (sock) {
663 			sock_release(sock);
664 			sock = NULL;
665 		}
666 		switch (-err) {
667 			/* timeout, busy, signal pending */
668 		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
669 		case EINTR: case ERESTARTSYS:
670 			/* peer not (yet) available, network problem */
671 		case ECONNREFUSED: case ENETUNREACH:
672 		case EHOSTDOWN:    case EHOSTUNREACH:
673 			disconnect_on_error = 0;
674 			break;
675 		default:
676 			drbd_err(connection, "%s failed, err = %d\n", what, err);
677 		}
678 		if (disconnect_on_error)
679 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
680 	}
681 
682 	return sock;
683 }
684 
685 struct accept_wait_data {
686 	struct drbd_connection *connection;
687 	struct socket *s_listen;
688 	struct completion door_bell;
689 	void (*original_sk_state_change)(struct sock *sk);
690 
691 };
692 
693 static void drbd_incoming_connection(struct sock *sk)
694 {
695 	struct accept_wait_data *ad = sk->sk_user_data;
696 	void (*state_change)(struct sock *sk);
697 
698 	state_change = ad->original_sk_state_change;
699 	if (sk->sk_state == TCP_ESTABLISHED)
700 		complete(&ad->door_bell);
701 	state_change(sk);
702 }
703 
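/* Create and configure the listening socket for the incoming half of the
 * connection pair, and redirect its sk_state_change callback to
 * drbd_incoming_connection(), so an arriving peer completes ad->door_bell
 * and wakes drbd_wait_for_connect(). */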
704 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
705 {
706 	int err, sndbuf_size, rcvbuf_size, my_addr_len;
707 	struct sockaddr_in6 my_addr;
708 	struct socket *s_listen;
709 	struct net_conf *nc;
710 	const char *what;
711 
712 	rcu_read_lock();
713 	nc = rcu_dereference(connection->net_conf);
714 	if (!nc) {
715 		rcu_read_unlock();
716 		return -EIO;
717 	}
718 	sndbuf_size = nc->sndbuf_size;
719 	rcvbuf_size = nc->rcvbuf_size;
720 	rcu_read_unlock();
721 
722 	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
723 	memcpy(&my_addr, &connection->my_addr, my_addr_len);
724 
725 	what = "sock_create_kern";
726 	err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
727 			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
728 	if (err) {
729 		s_listen = NULL;
730 		goto out;
731 	}
732 
733 	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
734 	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
735 
736 	what = "bind before listen";
737 	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
738 	if (err < 0)
739 		goto out;
740 
741 	ad->s_listen = s_listen;
742 	write_lock_bh(&s_listen->sk->sk_callback_lock);
743 	ad->original_sk_state_change = s_listen->sk->sk_state_change;
744 	s_listen->sk->sk_state_change = drbd_incoming_connection;
745 	s_listen->sk->sk_user_data = ad;
746 	write_unlock_bh(&s_listen->sk->sk_callback_lock);
747 
748 	what = "listen";
749 	err = s_listen->ops->listen(s_listen, 5);
750 	if (err < 0)
751 		goto out;
752 
753 	return 0;
754 out:
755 	if (s_listen)
756 		sock_release(s_listen);
757 	if (err < 0) {
758 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
759 			drbd_err(connection, "%s failed, err = %d\n", what, err);
760 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
761 		}
762 	}
763 
764 	return -EIO;
765 }
766 
767 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
768 {
769 	write_lock_bh(&sk->sk_callback_lock);
770 	sk->sk_state_change = ad->original_sk_state_change;
771 	sk->sk_user_data = NULL;
772 	write_unlock_bh(&sk->sk_callback_lock);
773 }
774 
775 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
776 {
777 	int timeo, connect_int, err = 0;
778 	struct socket *s_estab = NULL;
779 	struct net_conf *nc;
780 
781 	rcu_read_lock();
782 	nc = rcu_dereference(connection->net_conf);
783 	if (!nc) {
784 		rcu_read_unlock();
785 		return NULL;
786 	}
787 	connect_int = nc->connect_int;
788 	rcu_read_unlock();
789 
790 	timeo = connect_int * HZ;
791 	/* 28.5% random jitter */
792 	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
793 
794 	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
795 	if (err <= 0)
796 		return NULL;
797 
798 	err = kernel_accept(ad->s_listen, &s_estab, 0);
799 	if (err < 0) {
800 		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
801 			drbd_err(connection, "accept failed, err = %d\n", err);
802 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
803 		}
804 	}
805 
806 	if (s_estab)
807 		unregister_state_change(s_estab->sk, ad);
808 
809 	return s_estab;
810 }
811 
812 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
813 
814 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
815 			     enum drbd_packet cmd)
816 {
817 	if (!conn_prepare_command(connection, sock))
818 		return -EIO;
819 	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
820 }
821 
822 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
823 {
824 	unsigned int header_size = drbd_header_size(connection);
825 	struct packet_info pi;
826 	struct net_conf *nc;
827 	int err;
828 
829 	rcu_read_lock();
830 	nc = rcu_dereference(connection->net_conf);
831 	if (!nc) {
832 		rcu_read_unlock();
833 		return -EIO;
834 	}
835 	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
836 	rcu_read_unlock();
837 
838 	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
839 	if (err != header_size) {
840 		if (err >= 0)
841 			err = -EIO;
842 		return err;
843 	}
844 	err = decode_header(connection, connection->data.rbuf, &pi);
845 	if (err)
846 		return err;
847 	return pi.cmd;
848 }
849 
850 /**
851  * drbd_socket_okay() - Free the socket if its connection is not okay
852  * @sock:	pointer to the pointer to the socket.
853  */
854 static bool drbd_socket_okay(struct socket **sock)
855 {
856 	int rr;
857 	char tb[4];
858 
859 	if (!*sock)
860 		return false;
861 
862 	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
863 
864 	if (rr > 0 || rr == -EAGAIN) {
865 		return true;
866 	} else {
867 		sock_release(*sock);
868 		*sock = NULL;
869 		return false;
870 	}
871 }
872 
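/* Both sockets of the pair must exist, and must still look usable after a
 * short settle delay (sock_check_timeo, falling back to ping_timeo; both
 * presumably in tenths of a second, given the HZ/10 conversion), for the
 * connection to count as established. */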
873 static bool connection_established(struct drbd_connection *connection,
874 				   struct socket **sock1,
875 				   struct socket **sock2)
876 {
877 	struct net_conf *nc;
878 	int timeout;
879 	bool ok;
880 
881 	if (!*sock1 || !*sock2)
882 		return false;
883 
884 	rcu_read_lock();
885 	nc = rcu_dereference(connection->net_conf);
886 	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
887 	rcu_read_unlock();
888 	schedule_timeout_interruptible(timeout);
889 
890 	ok = drbd_socket_okay(sock1);
891 	ok = drbd_socket_okay(sock2) && ok;
892 
893 	return ok;
894 }
895 
896 /* Gets called if a connection is established, or if a new minor gets created
897    in a connection */
898 int drbd_connected(struct drbd_peer_device *peer_device)
899 {
900 	struct drbd_device *device = peer_device->device;
901 	int err;
902 
903 	atomic_set(&device->packet_seq, 0);
904 	device->peer_seq = 0;
905 
906 	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
907 		&peer_device->connection->cstate_mutex :
908 		&device->own_state_mutex;
909 
910 	err = drbd_send_sync_param(peer_device);
911 	if (!err)
912 		err = drbd_send_sizes(peer_device, 0, 0);
913 	if (!err)
914 		err = drbd_send_uuids(peer_device);
915 	if (!err)
916 		err = drbd_send_current_state(peer_device);
917 	clear_bit(USE_DEGR_WFC_T, &device->flags);
918 	clear_bit(RESIZE_PENDING, &device->flags);
919 	atomic_set(&device->ap_in_flight, 0);
920 	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
921 	return err;
922 }
923 
924 /*
925  * return values:
926  *   1 yes, we have a valid connection
927  *   0 oops, did not work out, please try again
928  *  -1 peer talks different language,
929  *     no point in trying again, please go standalone.
930  *  -2 We do not have a network config...
931  */
932 static int conn_connect(struct drbd_connection *connection)
933 {
934 	struct drbd_socket sock, msock;
935 	struct drbd_peer_device *peer_device;
936 	struct net_conf *nc;
937 	int vnr, timeout, h;
938 	bool discard_my_data, ok;
939 	enum drbd_state_rv rv;
940 	struct accept_wait_data ad = {
941 		.connection = connection,
942 		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
943 	};
944 
945 	clear_bit(DISCONNECT_SENT, &connection->flags);
946 	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
947 		return -2;
948 
949 	mutex_init(&sock.mutex);
950 	sock.sbuf = connection->data.sbuf;
951 	sock.rbuf = connection->data.rbuf;
952 	sock.socket = NULL;
953 	mutex_init(&msock.mutex);
954 	msock.sbuf = connection->meta.sbuf;
955 	msock.rbuf = connection->meta.rbuf;
956 	msock.socket = NULL;
957 
958 	/* Assume that the peer only understands protocol 80 until we know better.  */
959 	connection->agreed_pro_version = 80;
960 
961 	if (prepare_listen_socket(connection, &ad))
962 		return 0;
963 
964 	do {
965 		struct socket *s;
966 
967 		s = drbd_try_connect(connection);
968 		if (s) {
969 			if (!sock.socket) {
970 				sock.socket = s;
971 				send_first_packet(connection, &sock, P_INITIAL_DATA);
972 			} else if (!msock.socket) {
973 				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
974 				msock.socket = s;
975 				send_first_packet(connection, &msock, P_INITIAL_META);
976 			} else {
977 				drbd_err(connection, "Logic error in conn_connect()\n");
978 				goto out_release_sockets;
979 			}
980 		}
981 
982 		if (connection_established(connection, &sock.socket, &msock.socket))
983 			break;
984 
985 retry:
986 		s = drbd_wait_for_connect(connection, &ad);
987 		if (s) {
988 			int fp = receive_first_packet(connection, s);
989 			drbd_socket_okay(&sock.socket);
990 			drbd_socket_okay(&msock.socket);
991 			switch (fp) {
992 			case P_INITIAL_DATA:
993 				if (sock.socket) {
994 					drbd_warn(connection, "initial packet S crossed\n");
995 					sock_release(sock.socket);
996 					sock.socket = s;
997 					goto randomize;
998 				}
999 				sock.socket = s;
1000 				break;
1001 			case P_INITIAL_META:
1002 				set_bit(RESOLVE_CONFLICTS, &connection->flags);
1003 				if (msock.socket) {
1004 					drbd_warn(connection, "initial packet M crossed\n");
1005 					sock_release(msock.socket);
1006 					msock.socket = s;
1007 					goto randomize;
1008 				}
1009 				msock.socket = s;
1010 				break;
1011 			default:
1012 				drbd_warn(connection, "Error receiving initial packet\n");
1013 				sock_release(s);
1014 randomize:
1015 				if (prandom_u32() & 1)
1016 					goto retry;
1017 			}
1018 		}
1019 
1020 		if (connection->cstate <= C_DISCONNECTING)
1021 			goto out_release_sockets;
1022 		if (signal_pending(current)) {
1023 			flush_signals(current);
1024 			smp_rmb();
1025 			if (get_t_state(&connection->receiver) == EXITING)
1026 				goto out_release_sockets;
1027 		}
1028 
1029 		ok = connection_established(connection, &sock.socket, &msock.socket);
1030 	} while (!ok);
1031 
1032 	if (ad.s_listen)
1033 		sock_release(ad.s_listen);
1034 
1035 	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1036 	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037 
1038 	sock.socket->sk->sk_allocation = GFP_NOIO;
1039 	msock.socket->sk->sk_allocation = GFP_NOIO;
1040 
1041 	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1042 	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1043 
1044 	/* NOT YET ...
1045 	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1046 	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1047 	 * first set it to the P_CONNECTION_FEATURES timeout,
1048 	 * which we set to 4x the configured ping_timeout. */
1049 	rcu_read_lock();
1050 	nc = rcu_dereference(connection->net_conf);
1051 
1052 	sock.socket->sk->sk_sndtimeo =
1053 	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1054 
1055 	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1056 	timeout = nc->timeout * HZ / 10;
1057 	discard_my_data = nc->discard_my_data;
1058 	rcu_read_unlock();
1059 
1060 	msock.socket->sk->sk_sndtimeo = timeout;
1061 
1062 	/* we don't want delays.
1063 	 * we use TCP_CORK where appropriate, though */
1064 	drbd_tcp_nodelay(sock.socket);
1065 	drbd_tcp_nodelay(msock.socket);
1066 
1067 	connection->data.socket = sock.socket;
1068 	connection->meta.socket = msock.socket;
1069 	connection->last_received = jiffies;
1070 
1071 	h = drbd_do_features(connection);
1072 	if (h <= 0)
1073 		return h;
1074 
1075 	if (connection->cram_hmac_tfm) {
1076 		/* drbd_request_state(device, NS(conn, WFAuth)); */
1077 		switch (drbd_do_auth(connection)) {
1078 		case -1:
1079 			drbd_err(connection, "Authentication of peer failed\n");
1080 			return -1;
1081 		case 0:
1082 			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1083 			return 0;
1084 		}
1085 	}
1086 
1087 	connection->data.socket->sk->sk_sndtimeo = timeout;
1088 	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1089 
1090 	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1091 		return -1;
1092 
1093 	/* Prevent a race between resync-handshake and
1094 	 * being promoted to Primary.
1095 	 *
1096 	 * Grab and release the state mutex, so we know that any current
1097 	 * drbd_set_role() is finished, and any incoming drbd_set_role
1098 	 * will see the STATE_SENT flag, and wait for it to be cleared.
1099 	 */
1100 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1101 		mutex_lock(peer_device->device->state_mutex);
1102 
1103 	/* avoid a race with conn_request_state( C_DISCONNECTING ) */
1104 	spin_lock_irq(&connection->resource->req_lock);
1105 	set_bit(STATE_SENT, &connection->flags);
1106 	spin_unlock_irq(&connection->resource->req_lock);
1107 
1108 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1109 		mutex_unlock(peer_device->device->state_mutex);
1110 
1111 	rcu_read_lock();
1112 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1113 		struct drbd_device *device = peer_device->device;
1114 		kref_get(&device->kref);
1115 		rcu_read_unlock();
1116 
1117 		if (discard_my_data)
1118 			set_bit(DISCARD_MY_DATA, &device->flags);
1119 		else
1120 			clear_bit(DISCARD_MY_DATA, &device->flags);
1121 
1122 		drbd_connected(peer_device);
1123 		kref_put(&device->kref, drbd_destroy_device);
1124 		rcu_read_lock();
1125 	}
1126 	rcu_read_unlock();
1127 
1128 	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1129 	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1130 		clear_bit(STATE_SENT, &connection->flags);
1131 		return 0;
1132 	}
1133 
1134 	drbd_thread_start(&connection->ack_receiver);
1135 	/* opencoded create_singlethread_workqueue(),
1136 	 * to be able to use format string arguments */
1137 	connection->ack_sender =
1138 		alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1139 	if (!connection->ack_sender) {
1140 		drbd_err(connection, "Failed to create workqueue ack_sender\n");
1141 		return 0;
1142 	}
1143 
1144 	mutex_lock(&connection->resource->conf_update);
1145 	/* The discard_my_data flag is a single-shot modifier to the next
1146 	 * connection attempt, the handshake of which is now well underway.
1147 	 * No need for rcu style copying of the whole struct
1148 	 * just to clear a single value. */
1149 	connection->net_conf->discard_my_data = 0;
1150 	mutex_unlock(&connection->resource->conf_update);
1151 
1152 	return h;
1153 
1154 out_release_sockets:
1155 	if (ad.s_listen)
1156 		sock_release(ad.s_listen);
1157 	if (sock.socket)
1158 		sock_release(sock.socket);
1159 	if (msock.socket)
1160 		sock_release(msock.socket);
1161 	return -1;
1162 }
1163 
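/* Parse a packet header into @pi.  Depending on the agreed protocol version
 * the header is a struct p_header100 (magic DRBD_MAGIC_100, carries a 16 bit
 * volume number), p_header95 (magic DRBD_MAGIC_BIG, 32 bit length) or the
 * old p_header80 (magic DRBD_MAGIC, 16 bit length); pi->data is set to just
 * past the header. */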
1164 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1165 {
1166 	unsigned int header_size = drbd_header_size(connection);
1167 
1168 	if (header_size == sizeof(struct p_header100) &&
1169 	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1170 		struct p_header100 *h = header;
1171 		if (h->pad != 0) {
1172 			drbd_err(connection, "Header padding is not zero\n");
1173 			return -EINVAL;
1174 		}
1175 		pi->vnr = be16_to_cpu(h->volume);
1176 		pi->cmd = be16_to_cpu(h->command);
1177 		pi->size = be32_to_cpu(h->length);
1178 	} else if (header_size == sizeof(struct p_header95) &&
1179 		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1180 		struct p_header95 *h = header;
1181 		pi->cmd = be16_to_cpu(h->command);
1182 		pi->size = be32_to_cpu(h->length);
1183 		pi->vnr = 0;
1184 	} else if (header_size == sizeof(struct p_header80) &&
1185 		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1186 		struct p_header80 *h = header;
1187 		pi->cmd = be16_to_cpu(h->command);
1188 		pi->size = be16_to_cpu(h->length);
1189 		pi->vnr = 0;
1190 	} else {
1191 		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1192 			 be32_to_cpu(*(__be32 *)header),
1193 			 connection->agreed_pro_version);
1194 		return -EINVAL;
1195 	}
1196 	pi->data = header + header_size;
1197 	return 0;
1198 }
1199 
1200 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1201 {
1202 	if (current->plug == &connection->receiver_plug) {
1203 		blk_finish_plug(&connection->receiver_plug);
1204 		blk_start_plug(&connection->receiver_plug);
1205 	} /* else: maybe just schedule() ?? */
1206 }
1207 
1208 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1209 {
1210 	void *buffer = connection->data.rbuf;
1211 	int err;
1212 
1213 	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1214 	if (err)
1215 		return err;
1216 
1217 	err = decode_header(connection, buffer, pi);
1218 	connection->last_received = jiffies;
1219 
1220 	return err;
1221 }
1222 
1223 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1224 {
1225 	void *buffer = connection->data.rbuf;
1226 	unsigned int size = drbd_header_size(connection);
1227 	int err;
1228 
1229 	err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1230 	if (err != size) {
1231 		/* If we have nothing in the receive buffer now, to reduce
1232 		 * application latency, try to drain the backend queues as
1233 		 * quickly as possible, and let remote TCP know what we have
1234 		 * received so far. */
1235 		if (err == -EAGAIN) {
1236 			drbd_tcp_quickack(connection->data.socket);
1237 			drbd_unplug_all_devices(connection);
1238 		}
1239 		if (err > 0) {
1240 			buffer += err;
1241 			size -= err;
1242 		}
1243 		err = drbd_recv_all_warn(connection, buffer, size);
1244 		if (err)
1245 			return err;
1246 	}
1247 
1248 	err = decode_header(connection, connection->data.rbuf, pi);
1249 	connection->last_received = jiffies;
1250 
1251 	return err;
1252 }
1253 /* This is blkdev_issue_flush, but asynchronous.
1254  * We want to submit to all component volumes in parallel,
1255  * then wait for all completions.
1256  */
1257 struct issue_flush_context {
1258 	atomic_t pending;
1259 	int error;
1260 	struct completion done;
1261 };
1262 struct one_flush_context {
1263 	struct drbd_device *device;
1264 	struct issue_flush_context *ctx;
1265 };
1266 
1267 static void one_flush_endio(struct bio *bio)
1268 {
1269 	struct one_flush_context *octx = bio->bi_private;
1270 	struct drbd_device *device = octx->device;
1271 	struct issue_flush_context *ctx = octx->ctx;
1272 
1273 	if (bio->bi_status) {
1274 		ctx->error = blk_status_to_errno(bio->bi_status);
1275 		drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1276 	}
1277 	kfree(octx);
1278 	bio_put(bio);
1279 
1280 	clear_bit(FLUSH_PENDING, &device->flags);
1281 	put_ldev(device);
1282 	kref_put(&device->kref, drbd_destroy_device);
1283 
1284 	if (atomic_dec_and_test(&ctx->pending))
1285 		complete(&ctx->done);
1286 }
1287 
1288 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1289 {
1290 	struct bio *bio = bio_alloc(GFP_NOIO, 0);
1291 	struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1292 	if (!bio || !octx) {
1293 		drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1294 		/* FIXME: what else can I do now?  disconnecting or detaching
1295 		 * really does not help to improve the state of the world, either.
1296 		 */
1297 		kfree(octx);
1298 		if (bio)
1299 			bio_put(bio);
1300 
1301 		ctx->error = -ENOMEM;
1302 		put_ldev(device);
1303 		kref_put(&device->kref, drbd_destroy_device);
1304 		return;
1305 	}
1306 
1307 	octx->device = device;
1308 	octx->ctx = ctx;
1309 	bio_set_dev(bio, device->ldev->backing_bdev);
1310 	bio->bi_private = octx;
1311 	bio->bi_end_io = one_flush_endio;
1312 	bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1313 
1314 	device->flush_jif = jiffies;
1315 	set_bit(FLUSH_PENDING, &device->flags);
1316 	atomic_inc(&ctx->pending);
1317 	submit_bio(bio);
1318 }
1319 
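/* For write ordering WO_BDEV_FLUSH: issue a flush to the backing device of
 * every volume of this connection in parallel and wait for all completions.
 * Any flush error degrades the resource's write ordering to WO_DRAIN_IO. */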
1320 static void drbd_flush(struct drbd_connection *connection)
1321 {
1322 	if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1323 		struct drbd_peer_device *peer_device;
1324 		struct issue_flush_context ctx;
1325 		int vnr;
1326 
1327 		atomic_set(&ctx.pending, 1);
1328 		ctx.error = 0;
1329 		init_completion(&ctx.done);
1330 
1331 		rcu_read_lock();
1332 		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1333 			struct drbd_device *device = peer_device->device;
1334 
1335 			if (!get_ldev(device))
1336 				continue;
1337 			kref_get(&device->kref);
1338 			rcu_read_unlock();
1339 
1340 			submit_one_flush(device, &ctx);
1341 
1342 			rcu_read_lock();
1343 		}
1344 		rcu_read_unlock();
1345 
1346 		/* Do we want to add a timeout,
1347 		 * if disk-timeout is set? */
1348 		if (!atomic_dec_and_test(&ctx.pending))
1349 			wait_for_completion(&ctx.done);
1350 
1351 		if (ctx.error) {
1352 			/* would rather check on EOPNOTSUPP, but that is not reliable.
1353 			 * don't try again for ANY return value != 0
1354 			 * if (rv == -EOPNOTSUPP) */
1355 			/* Any error is already reported by bio_endio callback. */
1356 			drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1357 		}
1358 	}
1359 }
1360 
1361 /**
1362  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1363  * @connection:	DRBD connection.
1364  * @epoch:	Epoch object.
1365  * @ev:		Epoch event.
1366  */
1367 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1368 					       struct drbd_epoch *epoch,
1369 					       enum epoch_event ev)
1370 {
1371 	int epoch_size;
1372 	struct drbd_epoch *next_epoch;
1373 	enum finish_epoch rv = FE_STILL_LIVE;
1374 
1375 	spin_lock(&connection->epoch_lock);
1376 	do {
1377 		next_epoch = NULL;
1378 
1379 		epoch_size = atomic_read(&epoch->epoch_size);
1380 
1381 		switch (ev & ~EV_CLEANUP) {
1382 		case EV_PUT:
1383 			atomic_dec(&epoch->active);
1384 			break;
1385 		case EV_GOT_BARRIER_NR:
1386 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1387 			break;
1388 		case EV_BECAME_LAST:
1389 			/* nothing to do*/
1390 			break;
1391 		}
1392 
1393 		if (epoch_size != 0 &&
1394 		    atomic_read(&epoch->active) == 0 &&
1395 		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1396 			if (!(ev & EV_CLEANUP)) {
1397 				spin_unlock(&connection->epoch_lock);
1398 				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1399 				spin_lock(&connection->epoch_lock);
1400 			}
1401 #if 0
1402 			/* FIXME: dec unacked on connection, once we have
1403 			 * something to count pending connection packets in. */
1404 			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1405 				dec_unacked(epoch->connection);
1406 #endif
1407 
1408 			if (connection->current_epoch != epoch) {
1409 				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1410 				list_del(&epoch->list);
1411 				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1412 				connection->epochs--;
1413 				kfree(epoch);
1414 
1415 				if (rv == FE_STILL_LIVE)
1416 					rv = FE_DESTROYED;
1417 			} else {
1418 				epoch->flags = 0;
1419 				atomic_set(&epoch->epoch_size, 0);
1420 				/* atomic_set(&epoch->active, 0); is already zero */
1421 				if (rv == FE_STILL_LIVE)
1422 					rv = FE_RECYCLED;
1423 			}
1424 		}
1425 
1426 		if (!next_epoch)
1427 			break;
1428 
1429 		epoch = next_epoch;
1430 	} while (1);
1431 
1432 	spin_unlock(&connection->epoch_lock);
1433 
1434 	return rv;
1435 }
1436 
1437 static enum write_ordering_e
1438 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1439 {
1440 	struct disk_conf *dc;
1441 
1442 	dc = rcu_dereference(bdev->disk_conf);
1443 
1444 	if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1445 		wo = WO_DRAIN_IO;
1446 	if (wo == WO_DRAIN_IO && !dc->disk_drain)
1447 		wo = WO_NONE;
1448 
1449 	return wo;
1450 }
1451 
1452 /**
1453  * drbd_bump_write_ordering() - Fall back to another write ordering method
1454  * @resource:	DRBD resource.
1455  * @wo:		Write ordering method to try.
1456  */
1457 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1458 			      enum write_ordering_e wo)
1459 {
1460 	struct drbd_device *device;
1461 	enum write_ordering_e pwo;
1462 	int vnr;
1463 	static char *write_ordering_str[] = {
1464 		[WO_NONE] = "none",
1465 		[WO_DRAIN_IO] = "drain",
1466 		[WO_BDEV_FLUSH] = "flush",
1467 	};
1468 
1469 	pwo = resource->write_ordering;
1470 	if (wo != WO_BDEV_FLUSH)
1471 		wo = min(pwo, wo);
1472 	rcu_read_lock();
1473 	idr_for_each_entry(&resource->devices, device, vnr) {
1474 		if (get_ldev(device)) {
1475 			wo = max_allowed_wo(device->ldev, wo);
1476 			if (device->ldev == bdev)
1477 				bdev = NULL;
1478 			put_ldev(device);
1479 		}
1480 	}
1481 
1482 	if (bdev)
1483 		wo = max_allowed_wo(bdev, wo);
1484 
1485 	rcu_read_unlock();
1486 
1487 	resource->write_ordering = wo;
1488 	if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1489 		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1490 }
1491 
1492 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1493 {
1494 	struct block_device *bdev = device->ldev->backing_bdev;
1495 
1496 	if (blkdev_issue_zeroout(bdev, peer_req->i.sector, peer_req->i.size >> 9,
1497 			GFP_NOIO, 0))
1498 		peer_req->flags |= EE_WAS_ERROR;
1499 
1500 	drbd_endio_write_sec_final(peer_req);
1501 }
1502 
1503 static void drbd_issue_peer_wsame(struct drbd_device *device,
1504 				  struct drbd_peer_request *peer_req)
1505 {
1506 	struct block_device *bdev = device->ldev->backing_bdev;
1507 	sector_t s = peer_req->i.sector;
1508 	sector_t nr = peer_req->i.size >> 9;
1509 	if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1510 		peer_req->flags |= EE_WAS_ERROR;
1511 	drbd_endio_write_sec_final(peer_req);
1512 }
1513 
1514 
1515 /**
1516  * drbd_submit_peer_request()
1517  * @device:	DRBD device.
1518  * @peer_req:	peer request
1519  * @op, @op_flags:	request operation and flags, see bio->bi_opf
1520  *
1521  * May spread the pages to multiple bios,
1522  * depending on bio_add_page restrictions.
1523  *
1524  * Returns 0 if all bios have been submitted,
1525  * -ENOMEM if we could not allocate enough bios,
1526  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1527  *  single page to an empty bio (which should never happen and likely indicates
1528  *  that the lower level IO stack is in some way broken). This has been observed
1529  *  on certain Xen deployments.
1530  */
1531 /* TODO allocate from our own bio_set. */
1532 int drbd_submit_peer_request(struct drbd_device *device,
1533 			     struct drbd_peer_request *peer_req,
1534 			     const unsigned op, const unsigned op_flags,
1535 			     const int fault_type)
1536 {
1537 	struct bio *bios = NULL;
1538 	struct bio *bio;
1539 	struct page *page = peer_req->pages;
1540 	sector_t sector = peer_req->i.sector;
1541 	unsigned data_size = peer_req->i.size;
1542 	unsigned n_bios = 0;
1543 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1544 	int err = -ENOMEM;
1545 
1546 	/* TRIM/DISCARD: for now, always use the helper function
1547 	 * blkdev_issue_zeroout(..., discard=true).
1548 	 * It's synchronous, but it does the right thing wrt. bio splitting.
1549 	 * Correctness first, performance later.  Next step is to code an
1550 	 * asynchronous variant of the same.
1551 	 */
1552 	if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1553 		/* wait for all pending IO completions, before we start
1554 		 * zeroing things out. */
1555 		conn_wait_active_ee_empty(peer_req->peer_device->connection);
1556 		/* add it to the active list now,
1557 		 * so we can find it to present it in debugfs */
1558 		peer_req->submit_jif = jiffies;
1559 		peer_req->flags |= EE_SUBMITTED;
1560 
1561 		/* If this was a resync request from receive_rs_deallocated(),
1562 		 * it is already on the sync_ee list */
1563 		if (list_empty(&peer_req->w.list)) {
1564 			spin_lock_irq(&device->resource->req_lock);
1565 			list_add_tail(&peer_req->w.list, &device->active_ee);
1566 			spin_unlock_irq(&device->resource->req_lock);
1567 		}
1568 
1569 		if (peer_req->flags & EE_IS_TRIM)
1570 			drbd_issue_peer_discard(device, peer_req);
1571 		else /* EE_WRITE_SAME */
1572 			drbd_issue_peer_wsame(device, peer_req);
1573 		return 0;
1574 	}
1575 
1576 	/* In most cases, we will only need one bio.  But in case the lower
1577 	 * level restrictions happen to be different at this offset on this
1578 	 * side than those of the sending peer, we may need to submit the
1579 	 * request in more than one bio.
1580 	 *
1581 	 * Plain bio_alloc is good enough here, this is no DRBD internally
1582 	 * generated bio, but a bio allocated on behalf of the peer.
1583 	 */
1584 next_bio:
1585 	bio = bio_alloc(GFP_NOIO, nr_pages);
1586 	if (!bio) {
1587 		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1588 		goto fail;
1589 	}
1590 	/* > peer_req->i.sector, unless this is the first bio */
1591 	bio->bi_iter.bi_sector = sector;
1592 	bio_set_dev(bio, device->ldev->backing_bdev);
1593 	bio_set_op_attrs(bio, op, op_flags);
1594 	bio->bi_private = peer_req;
1595 	bio->bi_end_io = drbd_peer_request_endio;
1596 
1597 	bio->bi_next = bios;
1598 	bios = bio;
1599 	++n_bios;
1600 
1601 	page_chain_for_each(page) {
1602 		unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1603 		if (!bio_add_page(bio, page, len, 0))
1604 			goto next_bio;
1605 		data_size -= len;
1606 		sector += len >> 9;
1607 		--nr_pages;
1608 	}
1609 	D_ASSERT(device, data_size == 0);
1610 	D_ASSERT(device, page == NULL);
1611 
1612 	atomic_set(&peer_req->pending_bios, n_bios);
1613 	/* for debugfs: update timestamp, mark as submitted */
1614 	peer_req->submit_jif = jiffies;
1615 	peer_req->flags |= EE_SUBMITTED;
1616 	do {
1617 		bio = bios;
1618 		bios = bios->bi_next;
1619 		bio->bi_next = NULL;
1620 
1621 		drbd_generic_make_request(device, fault_type, bio);
1622 	} while (bios);
1623 	return 0;
1624 
1625 fail:
1626 	while (bios) {
1627 		bio = bios;
1628 		bios = bios->bi_next;
1629 		bio_put(bio);
1630 	}
1631 	return err;
1632 }
1633 
1634 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1635 					     struct drbd_peer_request *peer_req)
1636 {
1637 	struct drbd_interval *i = &peer_req->i;
1638 
1639 	drbd_remove_interval(&device->write_requests, i);
1640 	drbd_clear_interval(i);
1641 
1642 	/* Wake up any processes waiting for this peer request to complete.  */
1643 	if (i->waiting)
1644 		wake_up(&device->misc_wait);
1645 }
1646 
1647 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1648 {
1649 	struct drbd_peer_device *peer_device;
1650 	int vnr;
1651 
1652 	rcu_read_lock();
1653 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1654 		struct drbd_device *device = peer_device->device;
1655 
1656 		kref_get(&device->kref);
1657 		rcu_read_unlock();
1658 		drbd_wait_ee_list_empty(device, &device->active_ee);
1659 		kref_put(&device->kref, drbd_destroy_device);
1660 		rcu_read_lock();
1661 	}
1662 	rcu_read_unlock();
1663 }
1664 
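/* Handle P_BARRIER: record the peer's barrier number on the current epoch
 * and, depending on the configured write ordering, either start a fresh
 * epoch right away or drain the active list (and flush) first, so the
 * barrier ack is only sent once the epoch's writes are stable. */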
1665 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1666 {
1667 	int rv;
1668 	struct p_barrier *p = pi->data;
1669 	struct drbd_epoch *epoch;
1670 
1671 	/* FIXME these are unacked on connection,
1672 	 * not a specific (peer)device.
1673 	 */
1674 	connection->current_epoch->barrier_nr = p->barrier;
1675 	connection->current_epoch->connection = connection;
1676 	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1677 
1678 	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1679 	 * the activity log, which means it would not be resynced in case the
1680 	 * R_PRIMARY crashes now.
1681 	 * Therefore we must send the barrier_ack after the barrier request was
1682 	 * completed. */
1683 	switch (connection->resource->write_ordering) {
1684 	case WO_NONE:
1685 		if (rv == FE_RECYCLED)
1686 			return 0;
1687 
1688 		/* receiver context, in the writeout path of the other node.
1689 		 * avoid potential distributed deadlock */
1690 		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1691 		if (epoch)
1692 			break;
1693 		else
1694 			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1695 			/* Fall through */
1696 
1697 	case WO_BDEV_FLUSH:
1698 	case WO_DRAIN_IO:
1699 		conn_wait_active_ee_empty(connection);
1700 		drbd_flush(connection);
1701 
1702 		if (atomic_read(&connection->current_epoch->epoch_size)) {
1703 			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1704 			if (epoch)
1705 				break;
1706 		}
1707 
1708 		return 0;
1709 	default:
1710 		drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1711 			 connection->resource->write_ordering);
1712 		return -EIO;
1713 	}
1714 
1715 	epoch->flags = 0;
1716 	atomic_set(&epoch->epoch_size, 0);
1717 	atomic_set(&epoch->active, 0);
1718 
1719 	spin_lock(&connection->epoch_lock);
1720 	if (atomic_read(&connection->current_epoch->epoch_size)) {
1721 		list_add(&epoch->list, &connection->current_epoch->list);
1722 		connection->current_epoch = epoch;
1723 		connection->epochs++;
1724 	} else {
1725 		/* The current_epoch got recycled while we allocated this one... */
1726 		kfree(epoch);
1727 	}
1728 	spin_unlock(&connection->epoch_lock);
1729 
1730 	return 0;
1731 }
1732 
1733 /* quick wrapper in case payload size != request_size (write same) */
1734 static void drbd_csum_ee_size(struct crypto_ahash *h,
1735 			      struct drbd_peer_request *r, void *d,
1736 			      unsigned int payload_size)
1737 {
1738 	unsigned int tmp = r->i.size;
1739 	r->i.size = payload_size;
1740 	drbd_csum_ee(h, r, d);
1741 	r->i.size = tmp;
1742 }
1743 
1744 /* used from receive_RSDataReply (recv_resync_read)
1745  * and from receive_Data.
1746  * data_size: actual payload ("data in")
1747  * 	for normal writes that is bi_size.
1748  * 	for discards, that is zero.
1749  * 	for write same, it is logical_block_size.
1750  * both trim and write same have the bi_size ("data len to be affected")
1751  * as extra argument in the packet header.
1752  */
1753 static struct drbd_peer_request *
1754 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1755 	      struct packet_info *pi) __must_hold(local)
1756 {
1757 	struct drbd_device *device = peer_device->device;
1758 	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1759 	struct drbd_peer_request *peer_req;
1760 	struct page *page;
1761 	int digest_size, err;
1762 	unsigned int data_size = pi->size, ds;
1763 	void *dig_in = peer_device->connection->int_dig_in;
1764 	void *dig_vv = peer_device->connection->int_dig_vv;
1765 	unsigned long *data;
1766 	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1767 	struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1768 
1769 	digest_size = 0;
1770 	if (!trim && peer_device->connection->peer_integrity_tfm) {
1771 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1772 		/*
1773 		 * FIXME: Receive the incoming digest into the receive buffer
1774 		 *	  here, together with its struct p_data?
1775 		 */
1776 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1777 		if (err)
1778 			return NULL;
1779 		data_size -= digest_size;
1780 	}
1781 
1782 	/* assume request_size == data_size, but special case trim and wsame. */
1783 	ds = data_size;
1784 	if (trim) {
1785 		if (!expect(data_size == 0))
1786 			return NULL;
1787 		ds = be32_to_cpu(trim->size);
1788 	} else if (wsame) {
1789 		if (data_size != queue_logical_block_size(device->rq_queue)) {
1790 			drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1791 				data_size, queue_logical_block_size(device->rq_queue));
1792 			return NULL;
1793 		}
1794 		if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1795 			drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1796 				data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1797 			return NULL;
1798 		}
1799 		ds = be32_to_cpu(wsame->size);
1800 	}
1801 
1802 	if (!expect(IS_ALIGNED(ds, 512)))
1803 		return NULL;
1804 	if (trim || wsame) {
1805 		if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1806 			return NULL;
1807 	} else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1808 		return NULL;
1809 
1810 	/* even though we trust our peer,
1811 	 * we sometimes have to double check. */
1812 	if (sector + (ds>>9) > capacity) {
1813 		drbd_err(device, "request from peer beyond end of local disk: "
1814 			"capacity: %llus < sector: %llus + size: %u\n",
1815 			(unsigned long long)capacity,
1816 			(unsigned long long)sector, ds);
1817 		return NULL;
1818 	}
1819 
1820 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1821 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
1822 	 * which in turn might block on the other node at this very place.  */
1823 	peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1824 	if (!peer_req)
1825 		return NULL;
1826 
1827 	peer_req->flags |= EE_WRITE;
1828 	if (trim) {
1829 		peer_req->flags |= EE_IS_TRIM;
1830 		return peer_req;
1831 	}
1832 	if (wsame)
1833 		peer_req->flags |= EE_WRITE_SAME;
1834 
1835 	/* receive payload size bytes into page chain */
1836 	ds = data_size;
1837 	page = peer_req->pages;
1838 	page_chain_for_each(page) {
1839 		unsigned len = min_t(int, ds, PAGE_SIZE);
1840 		data = kmap(page);
1841 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1842 		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1843 			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1844 			data[0] = data[0] ^ (unsigned long)-1;
1845 		}
1846 		kunmap(page);
1847 		if (err) {
1848 			drbd_free_peer_req(device, peer_req);
1849 			return NULL;
1850 		}
1851 		ds -= len;
1852 	}
1853 
1854 	if (digest_size) {
1855 		drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1856 		if (memcmp(dig_in, dig_vv, digest_size)) {
1857 			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1858 				(unsigned long long)sector, data_size);
1859 			drbd_free_peer_req(device, peer_req);
1860 			return NULL;
1861 		}
1862 	}
1863 	device->recv_cnt += data_size >> 9;
1864 	return peer_req;
1865 }
1866 
1867 /* drbd_drain_block() just takes a data block
1868  * out of the socket input buffer, and discards it.
1869  */
1870 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1871 {
1872 	struct page *page;
1873 	int err = 0;
1874 	void *data;
1875 
1876 	if (!data_size)
1877 		return 0;
1878 
1879 	page = drbd_alloc_pages(peer_device, 1, 1);
1880 
1881 	data = kmap(page);
1882 	while (data_size) {
1883 		unsigned int len = min_t(int, data_size, PAGE_SIZE);
1884 
1885 		err = drbd_recv_all_warn(peer_device->connection, data, len);
1886 		if (err)
1887 			break;
1888 		data_size -= len;
1889 	}
1890 	kunmap(page);
1891 	drbd_free_pages(peer_device->device, page, 0);
1892 	return err;
1893 }
1894 
1895 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1896 			   sector_t sector, int data_size)
1897 {
1898 	struct bio_vec bvec;
1899 	struct bvec_iter iter;
1900 	struct bio *bio;
1901 	int digest_size, err, expect;
1902 	void *dig_in = peer_device->connection->int_dig_in;
1903 	void *dig_vv = peer_device->connection->int_dig_vv;
1904 
1905 	digest_size = 0;
1906 	if (peer_device->connection->peer_integrity_tfm) {
1907 		digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1908 		err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1909 		if (err)
1910 			return err;
1911 		data_size -= digest_size;
1912 	}
1913 
1914 	/* optimistically update recv_cnt.  if receiving fails below,
1915 	 * we disconnect anyways, and counters will be reset. */
1916 	peer_device->device->recv_cnt += data_size>>9;
1917 
1918 	bio = req->master_bio;
1919 	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1920 
1921 	bio_for_each_segment(bvec, bio, iter) {
1922 		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1923 		expect = min_t(int, data_size, bvec.bv_len);
1924 		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1925 		kunmap(bvec.bv_page);
1926 		if (err)
1927 			return err;
1928 		data_size -= expect;
1929 	}
1930 
1931 	if (digest_size) {
1932 		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1933 		if (memcmp(dig_in, dig_vv, digest_size)) {
1934 			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1935 			return -EINVAL;
1936 		}
1937 	}
1938 
1939 	D_ASSERT(peer_device->device, data_size == 0);
1940 	return 0;
1941 }
1942 
1943 /*
1944  * e_end_resync_block() is called in ack_sender context via
1945  * drbd_finish_peer_reqs().
1946  */
1947 static int e_end_resync_block(struct drbd_work *w, int unused)
1948 {
1949 	struct drbd_peer_request *peer_req =
1950 		container_of(w, struct drbd_peer_request, w);
1951 	struct drbd_peer_device *peer_device = peer_req->peer_device;
1952 	struct drbd_device *device = peer_device->device;
1953 	sector_t sector = peer_req->i.sector;
1954 	int err;
1955 
1956 	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1957 
1958 	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1959 		drbd_set_in_sync(device, sector, peer_req->i.size);
1960 		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1961 	} else {
1962 		/* Record failure to sync */
1963 		drbd_rs_failed_io(device, sector, peer_req->i.size);
1964 
1965 		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1966 	}
1967 	dec_unacked(device);
1968 
1969 	return err;
1970 }
1971 
1972 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1973 			    struct packet_info *pi) __releases(local)
1974 {
1975 	struct drbd_device *device = peer_device->device;
1976 	struct drbd_peer_request *peer_req;
1977 
1978 	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1979 	if (!peer_req)
1980 		goto fail;
1981 
1982 	dec_rs_pending(device);
1983 
1984 	inc_unacked(device);
1985 	/* corresponding dec_unacked() in e_end_resync_block()
1986 	 * respective _drbd_clear_done_ee */
1987 
1988 	peer_req->w.cb = e_end_resync_block;
1989 	peer_req->submit_jif = jiffies;
1990 
1991 	spin_lock_irq(&device->resource->req_lock);
1992 	list_add_tail(&peer_req->w.list, &device->sync_ee);
1993 	spin_unlock_irq(&device->resource->req_lock);
1994 
1995 	atomic_add(pi->size >> 9, &device->rs_sect_ev);
1996 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
1997 				     DRBD_FAULT_RS_WR) == 0)
1998 		return 0;
1999 
2000 	/* don't care for the reason here */
2001 	drbd_err(device, "submit failed, triggering re-connect\n");
2002 	spin_lock_irq(&device->resource->req_lock);
2003 	list_del(&peer_req->w.list);
2004 	spin_unlock_irq(&device->resource->req_lock);
2005 
2006 	drbd_free_peer_req(device, peer_req);
2007 fail:
2008 	put_ldev(device);
2009 	return -EIO;
2010 }
2011 
2012 static struct drbd_request *
2013 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2014 	     sector_t sector, bool missing_ok, const char *func)
2015 {
2016 	struct drbd_request *req;
2017 
2018 	/* Request object according to our peer */
2019 	req = (struct drbd_request *)(unsigned long)id;
2020 	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2021 		return req;
2022 	if (!missing_ok) {
2023 		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2024 			(unsigned long)id, (unsigned long long)sector);
2025 	}
2026 	return NULL;
2027 }
2028 
2029 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2030 {
2031 	struct drbd_peer_device *peer_device;
2032 	struct drbd_device *device;
2033 	struct drbd_request *req;
2034 	sector_t sector;
2035 	int err;
2036 	struct p_data *p = pi->data;
2037 
2038 	peer_device = conn_peer_device(connection, pi->vnr);
2039 	if (!peer_device)
2040 		return -EIO;
2041 	device = peer_device->device;
2042 
2043 	sector = be64_to_cpu(p->sector);
2044 
2045 	spin_lock_irq(&device->resource->req_lock);
2046 	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2047 	spin_unlock_irq(&device->resource->req_lock);
2048 	if (unlikely(!req))
2049 		return -EIO;
2050 
2051 	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2052 	 * special casing it there for the various failure cases.
2053 	 * still no race with drbd_fail_pending_reads */
2054 	err = recv_dless_read(peer_device, req, sector, pi->size);
2055 	if (!err)
2056 		req_mod(req, DATA_RECEIVED);
2057 	/* else: nothing. handled from drbd_disconnect...
2058 	 * I don't think we may complete this just yet
2059 	 * in case we are "on-disconnect: freeze" */
2060 
2061 	return err;
2062 }
2063 
2064 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2065 {
2066 	struct drbd_peer_device *peer_device;
2067 	struct drbd_device *device;
2068 	sector_t sector;
2069 	int err;
2070 	struct p_data *p = pi->data;
2071 
2072 	peer_device = conn_peer_device(connection, pi->vnr);
2073 	if (!peer_device)
2074 		return -EIO;
2075 	device = peer_device->device;
2076 
2077 	sector = be64_to_cpu(p->sector);
2078 	D_ASSERT(device, p->block_id == ID_SYNCER);
2079 
2080 	if (get_ldev(device)) {
2081 		/* data is submitted to disk within recv_resync_read.
2082 		 * corresponding put_ldev done below on error,
2083 		 * or in drbd_peer_request_endio. */
2084 		err = recv_resync_read(peer_device, sector, pi);
2085 	} else {
2086 		if (__ratelimit(&drbd_ratelimit_state))
2087 			drbd_err(device, "Can not write resync data to local disk.\n");
2088 
2089 		err = drbd_drain_block(peer_device, pi->size);
2090 
2091 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2092 	}
2093 
2094 	atomic_add(pi->size >> 9, &device->rs_sect_in);
2095 
2096 	return err;
2097 }
2098 
2099 static void restart_conflicting_writes(struct drbd_device *device,
2100 				       sector_t sector, int size)
2101 {
2102 	struct drbd_interval *i;
2103 	struct drbd_request *req;
2104 
2105 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2106 		if (!i->local)
2107 			continue;
2108 		req = container_of(i, struct drbd_request, i);
2109 		if (req->rq_state & RQ_LOCAL_PENDING ||
2110 		    !(req->rq_state & RQ_POSTPONED))
2111 			continue;
2112 		/* as it is RQ_POSTPONED, this will cause it to
2113 		 * be queued on the retry workqueue. */
2114 		__req_mod(req, CONFLICT_RESOLVED, NULL);
2115 	}
2116 }
2117 
2118 /*
2119  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2120  */
2121 static int e_end_block(struct drbd_work *w, int cancel)
2122 {
2123 	struct drbd_peer_request *peer_req =
2124 		container_of(w, struct drbd_peer_request, w);
2125 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2126 	struct drbd_device *device = peer_device->device;
2127 	sector_t sector = peer_req->i.sector;
2128 	int err = 0, pcmd;
2129 
2130 	if (peer_req->flags & EE_SEND_WRITE_ACK) {
2131 		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2132 			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2133 				device->state.conn <= C_PAUSED_SYNC_T &&
2134 				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2135 				P_RS_WRITE_ACK : P_WRITE_ACK;
2136 			err = drbd_send_ack(peer_device, pcmd, peer_req);
2137 			if (pcmd == P_RS_WRITE_ACK)
2138 				drbd_set_in_sync(device, sector, peer_req->i.size);
2139 		} else {
2140 			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2141 			/* we expect it to be marked out of sync anyways...
2142 			 * maybe assert this?  */
2143 		}
2144 		dec_unacked(device);
2145 	}
2146 
2147 	/* we delete from the conflict detection hash _after_ we sent out the
2148 	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2149 	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2150 		spin_lock_irq(&device->resource->req_lock);
2151 		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2152 		drbd_remove_epoch_entry_interval(device, peer_req);
2153 		if (peer_req->flags & EE_RESTART_REQUESTS)
2154 			restart_conflicting_writes(device, sector, peer_req->i.size);
2155 		spin_unlock_irq(&device->resource->req_lock);
2156 	} else
2157 		D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2158 
2159 	drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2160 
2161 	return err;
2162 }
2163 
2164 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2165 {
2166 	struct drbd_peer_request *peer_req =
2167 		container_of(w, struct drbd_peer_request, w);
2168 	struct drbd_peer_device *peer_device = peer_req->peer_device;
2169 	int err;
2170 
2171 	err = drbd_send_ack(peer_device, ack, peer_req);
2172 	dec_unacked(peer_device->device);
2173 
2174 	return err;
2175 }
2176 
2177 static int e_send_superseded(struct drbd_work *w, int unused)
2178 {
2179 	return e_send_ack(w, P_SUPERSEDED);
2180 }
2181 
2182 static int e_send_retry_write(struct drbd_work *w, int unused)
2183 {
2184 	struct drbd_peer_request *peer_req =
2185 		container_of(w, struct drbd_peer_request, w);
2186 	struct drbd_connection *connection = peer_req->peer_device->connection;
2187 
2188 	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2189 			     P_RETRY_WRITE : P_SUPERSEDED);
2190 }
2191 
2192 static bool seq_greater(u32 a, u32 b)
2193 {
2194 	/*
2195 	 * We assume 32-bit wrap-around here.
2196 	 * For 24-bit wrap-around, we would have to shift:
2197 	 *  a <<= 8; b <<= 8;
2198 	 */
2199 	return (s32)a - (s32)b > 0;
2200 }
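/*
 * Worked example for the wrap-around comparison above (values purely
 * illustrative): with a = 2 and b = 0xfffffffe, (s32)a - (s32)b is
 * 2 - (-2) = 4 > 0, so a is considered "newer" even though it is
 * numerically smaller -- the sequence space simply wrapped.  Swapping
 * the arguments yields -4, so b is not considered newer than a.
 */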
2201 
2202 static u32 seq_max(u32 a, u32 b)
2203 {
2204 	return seq_greater(a, b) ? a : b;
2205 }
2206 
2207 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2208 {
2209 	struct drbd_device *device = peer_device->device;
2210 	unsigned int newest_peer_seq;
2211 
2212 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2213 		spin_lock(&device->peer_seq_lock);
2214 		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2215 		device->peer_seq = newest_peer_seq;
2216 		spin_unlock(&device->peer_seq_lock);
2217 		/* wake up only if we actually changed device->peer_seq */
2218 		if (peer_seq == newest_peer_seq)
2219 			wake_up(&device->seq_wait);
2220 	}
2221 }
2222 
2223 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2224 {
2225 	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2226 }
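/*
 * Example with illustrative values: a request at sector 0 with l1 = 4096
 * bytes covers sectors [0, 8); a request at sector 8 with l2 = 512 bytes
 * covers [8, 9).  They are adjacent but do not overlap, so this returns 0.
 * Start the second one at sector 7 instead and it returns 1.
 */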
2227 
2228 /* maybe change sync_ee into interval trees as well? */
2229 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2230 {
2231 	struct drbd_peer_request *rs_req;
2232 	bool rv = false;
2233 
2234 	spin_lock_irq(&device->resource->req_lock);
2235 	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2236 		if (overlaps(peer_req->i.sector, peer_req->i.size,
2237 			     rs_req->i.sector, rs_req->i.size)) {
2238 			rv = true;
2239 			break;
2240 		}
2241 	}
2242 	spin_unlock_irq(&device->resource->req_lock);
2243 
2244 	return rv;
2245 }
2246 
2247 /* Called from receive_Data.
2248  * Synchronize packets on sock with packets on msock.
2249  *
2250  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2251  * packet traveling on msock, they are still processed in the order they have
2252  * been sent.
2253  *
2254  * Note: we don't care for Ack packets overtaking P_DATA packets.
2255  *
2256  * In case packet_seq is larger than device->peer_seq number, there are
2257  * outstanding packets on the msock. We wait for them to arrive.
2258  * In case we are the logically next packet, we update device->peer_seq
2259  * ourselves. Correctly handles 32bit wrap around.
2260  *
2261  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2262  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2263  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2264  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2265  *
2266  * returns 0 if we may process the packet,
2267  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
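/*
 * Example (hypothetical numbers): device->peer_seq is 3 and a P_DATA packet
 * with peer_seq == 5 arrives on the data socket.  seq_greater(5 - 1, 3) is
 * true, so we sleep on seq_wait until the packet carrying sequence number 4
 * (which travelled on the meta socket) has been processed and advanced
 * device->peer_seq; only then may this write proceed.
 */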
2268 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2269 {
2270 	struct drbd_device *device = peer_device->device;
2271 	DEFINE_WAIT(wait);
2272 	long timeout;
2273 	int ret = 0, tp;
2274 
2275 	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2276 		return 0;
2277 
2278 	spin_lock(&device->peer_seq_lock);
2279 	for (;;) {
2280 		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2281 			device->peer_seq = seq_max(device->peer_seq, peer_seq);
2282 			break;
2283 		}
2284 
2285 		if (signal_pending(current)) {
2286 			ret = -ERESTARTSYS;
2287 			break;
2288 		}
2289 
2290 		rcu_read_lock();
2291 		tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2292 		rcu_read_unlock();
2293 
2294 		if (!tp)
2295 			break;
2296 
2297 		/* Only need to wait if two_primaries is enabled */
2298 		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2299 		spin_unlock(&device->peer_seq_lock);
2300 		rcu_read_lock();
2301 		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2302 		rcu_read_unlock();
2303 		timeout = schedule_timeout(timeout);
2304 		spin_lock(&device->peer_seq_lock);
2305 		if (!timeout) {
2306 			ret = -ETIMEDOUT;
2307 			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2308 			break;
2309 		}
2310 	}
2311 	spin_unlock(&device->peer_seq_lock);
2312 	finish_wait(&device->seq_wait, &wait);
2313 	return ret;
2314 }
2315 
2316 /* see also bio_flags_to_wire()
2317  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2318  * flags and back. We may replicate to other kernel versions. */
2319 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2320 {
2321 	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2322 		(dpf & DP_FUA ? REQ_FUA : 0) |
2323 		(dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2324 }
2325 
2326 static unsigned long wire_flags_to_bio_op(u32 dpf)
2327 {
2328 	if (dpf & DP_DISCARD)
2329 		return REQ_OP_WRITE_ZEROES;
2330 	else
2331 		return REQ_OP_WRITE;
2332 }
2333 
2334 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2335 				    unsigned int size)
2336 {
2337 	struct drbd_interval *i;
2338 
2339     repeat:
2340 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2341 		struct drbd_request *req;
2342 		struct bio_and_error m;
2343 
2344 		if (!i->local)
2345 			continue;
2346 		req = container_of(i, struct drbd_request, i);
2347 		if (!(req->rq_state & RQ_POSTPONED))
2348 			continue;
2349 		req->rq_state &= ~RQ_POSTPONED;
2350 		__req_mod(req, NEG_ACKED, &m);
2351 		spin_unlock_irq(&device->resource->req_lock);
2352 		if (m.bio)
2353 			complete_master_bio(device, &m);
2354 		spin_lock_irq(&device->resource->req_lock);
2355 		goto repeat;
2356 	}
2357 }
2358 
2359 static int handle_write_conflicts(struct drbd_device *device,
2360 				  struct drbd_peer_request *peer_req)
2361 {
2362 	struct drbd_connection *connection = peer_req->peer_device->connection;
2363 	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2364 	sector_t sector = peer_req->i.sector;
2365 	const unsigned int size = peer_req->i.size;
2366 	struct drbd_interval *i;
2367 	bool equal;
2368 	int err;
2369 
2370 	/*
2371 	 * Inserting the peer request into the write_requests tree will prevent
2372 	 * new conflicting local requests from being added.
2373 	 */
2374 	drbd_insert_interval(&device->write_requests, &peer_req->i);
2375 
2376     repeat:
2377 	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2378 		if (i == &peer_req->i)
2379 			continue;
2380 		if (i->completed)
2381 			continue;
2382 
2383 		if (!i->local) {
2384 			/*
2385 			 * Our peer has sent a conflicting remote request; this
2386 			 * should not happen in a two-node setup.  Wait for the
2387 			 * earlier peer request to complete.
2388 			 */
2389 			err = drbd_wait_misc(device, i);
2390 			if (err)
2391 				goto out;
2392 			goto repeat;
2393 		}
2394 
2395 		equal = i->sector == sector && i->size == size;
2396 		if (resolve_conflicts) {
2397 			/*
2398 			 * If the peer request is fully contained within the
2399 			 * overlapping request, it can be considered overwritten
2400 			 * and thus superseded; otherwise, it will be retried
2401 			 * once all overlapping requests have completed.
2402 			 */
2403 			bool superseded = i->sector <= sector && i->sector +
2404 				       (i->size >> 9) >= sector + (size >> 9);
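			/*
			 * e.g. (illustrative sectors): a local write covering
			 * sectors [0, 16) fully contains a peer write covering
			 * [8, 12), so the peer write counts as superseded; a
			 * peer write covering [8, 24) only partially overlaps
			 * and will be retried instead.
			 */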
2405 
2406 			if (!equal)
2407 				drbd_alert(device, "Concurrent writes detected: "
2408 					       "local=%llus +%u, remote=%llus +%u, "
2409 					       "assuming %s came first\n",
2410 					  (unsigned long long)i->sector, i->size,
2411 					  (unsigned long long)sector, size,
2412 					  superseded ? "local" : "remote");
2413 
2414 			peer_req->w.cb = superseded ? e_send_superseded :
2415 						   e_send_retry_write;
2416 			list_add_tail(&peer_req->w.list, &device->done_ee);
2417 			queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2418 
2419 			err = -ENOENT;
2420 			goto out;
2421 		} else {
2422 			struct drbd_request *req =
2423 				container_of(i, struct drbd_request, i);
2424 
2425 			if (!equal)
2426 				drbd_alert(device, "Concurrent writes detected: "
2427 					       "local=%llus +%u, remote=%llus +%u\n",
2428 					  (unsigned long long)i->sector, i->size,
2429 					  (unsigned long long)sector, size);
2430 
2431 			if (req->rq_state & RQ_LOCAL_PENDING ||
2432 			    !(req->rq_state & RQ_POSTPONED)) {
2433 				/*
2434 				 * Wait for the node with the discard flag to
2435 				 * decide if this request has been superseded
2436 				 * or needs to be retried.
2437 				 * Requests that have been superseded will
2438 				 * disappear from the write_requests tree.
2439 				 *
2440 				 * In addition, wait for the conflicting
2441 				 * request to finish locally before submitting
2442 				 * the conflicting peer request.
2443 				 */
2444 				err = drbd_wait_misc(device, &req->i);
2445 				if (err) {
2446 					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2447 					fail_postponed_requests(device, sector, size);
2448 					goto out;
2449 				}
2450 				goto repeat;
2451 			}
2452 			/*
2453 			 * Remember to restart the conflicting requests after
2454 			 * the new peer request has completed.
2455 			 */
2456 			peer_req->flags |= EE_RESTART_REQUESTS;
2457 		}
2458 	}
2459 	err = 0;
2460 
2461     out:
2462 	if (err)
2463 		drbd_remove_epoch_entry_interval(device, peer_req);
2464 	return err;
2465 }
2466 
2467 /* mirrored write */
2468 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2469 {
2470 	struct drbd_peer_device *peer_device;
2471 	struct drbd_device *device;
2472 	struct net_conf *nc;
2473 	sector_t sector;
2474 	struct drbd_peer_request *peer_req;
2475 	struct p_data *p = pi->data;
2476 	u32 peer_seq = be32_to_cpu(p->seq_num);
2477 	int op, op_flags;
2478 	u32 dp_flags;
2479 	int err, tp;
2480 
2481 	peer_device = conn_peer_device(connection, pi->vnr);
2482 	if (!peer_device)
2483 		return -EIO;
2484 	device = peer_device->device;
2485 
2486 	if (!get_ldev(device)) {
2487 		int err2;
2488 
2489 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2490 		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2491 		atomic_inc(&connection->current_epoch->epoch_size);
2492 		err2 = drbd_drain_block(peer_device, pi->size);
2493 		if (!err)
2494 			err = err2;
2495 		return err;
2496 	}
2497 
2498 	/*
2499 	 * Corresponding put_ldev done either below (on various errors), or in
2500 	 * drbd_peer_request_endio, if we successfully submit the data at the
2501 	 * end of this function.
2502 	 */
2503 
2504 	sector = be64_to_cpu(p->sector);
2505 	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2506 	if (!peer_req) {
2507 		put_ldev(device);
2508 		return -EIO;
2509 	}
2510 
2511 	peer_req->w.cb = e_end_block;
2512 	peer_req->submit_jif = jiffies;
2513 	peer_req->flags |= EE_APPLICATION;
2514 
2515 	dp_flags = be32_to_cpu(p->dp_flags);
2516 	op = wire_flags_to_bio_op(dp_flags);
2517 	op_flags = wire_flags_to_bio_flags(dp_flags);
2518 	if (pi->cmd == P_TRIM) {
2519 		D_ASSERT(peer_device, peer_req->i.size > 0);
2520 		D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2521 		D_ASSERT(peer_device, peer_req->pages == NULL);
2522 	} else if (peer_req->pages == NULL) {
2523 		D_ASSERT(device, peer_req->i.size == 0);
2524 		D_ASSERT(device, dp_flags & DP_FLUSH);
2525 	}
2526 
2527 	if (dp_flags & DP_MAY_SET_IN_SYNC)
2528 		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2529 
2530 	spin_lock(&connection->epoch_lock);
2531 	peer_req->epoch = connection->current_epoch;
2532 	atomic_inc(&peer_req->epoch->epoch_size);
2533 	atomic_inc(&peer_req->epoch->active);
2534 	spin_unlock(&connection->epoch_lock);
2535 
2536 	rcu_read_lock();
2537 	nc = rcu_dereference(peer_device->connection->net_conf);
2538 	tp = nc->two_primaries;
2539 	if (peer_device->connection->agreed_pro_version < 100) {
2540 		switch (nc->wire_protocol) {
2541 		case DRBD_PROT_C:
2542 			dp_flags |= DP_SEND_WRITE_ACK;
2543 			break;
2544 		case DRBD_PROT_B:
2545 			dp_flags |= DP_SEND_RECEIVE_ACK;
2546 			break;
2547 		}
2548 	}
2549 	rcu_read_unlock();
2550 
2551 	if (dp_flags & DP_SEND_WRITE_ACK) {
2552 		peer_req->flags |= EE_SEND_WRITE_ACK;
2553 		inc_unacked(device);
2554 		/* corresponding dec_unacked() in e_end_block()
2555 		 * respective _drbd_clear_done_ee */
2556 	}
2557 
2558 	if (dp_flags & DP_SEND_RECEIVE_ACK) {
2559 		/* I really don't like it that the receiver thread
2560 		 * sends on the msock, but anyways */
2561 		drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2562 	}
2563 
2564 	if (tp) {
2565 		/* two primaries implies protocol C */
2566 		D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2567 		peer_req->flags |= EE_IN_INTERVAL_TREE;
2568 		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2569 		if (err)
2570 			goto out_interrupted;
2571 		spin_lock_irq(&device->resource->req_lock);
2572 		err = handle_write_conflicts(device, peer_req);
2573 		if (err) {
2574 			spin_unlock_irq(&device->resource->req_lock);
2575 			if (err == -ENOENT) {
2576 				put_ldev(device);
2577 				return 0;
2578 			}
2579 			goto out_interrupted;
2580 		}
2581 	} else {
2582 		update_peer_seq(peer_device, peer_seq);
2583 		spin_lock_irq(&device->resource->req_lock);
2584 	}
2585 	/* TRIM and WRITE_SAME are processed synchronously,
2586 	 * we wait for all pending requests, respectively wait for
2587 	 * active_ee to become empty in drbd_submit_peer_request();
2588 	 * better not add ourselves here. */
2589 	if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2590 		list_add_tail(&peer_req->w.list, &device->active_ee);
2591 	spin_unlock_irq(&device->resource->req_lock);
2592 
2593 	if (device->state.conn == C_SYNC_TARGET)
2594 		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2595 
2596 	if (device->state.pdsk < D_INCONSISTENT) {
2597 		/* In case we have the only disk of the cluster: mark the range out of sync and track it in the activity log. */
2598 		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2599 		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2600 		drbd_al_begin_io(device, &peer_req->i);
2601 		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2602 	}
2603 
2604 	err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2605 				       DRBD_FAULT_DT_WR);
2606 	if (!err)
2607 		return 0;
2608 
2609 	/* don't care for the reason here */
2610 	drbd_err(device, "submit failed, triggering re-connect\n");
2611 	spin_lock_irq(&device->resource->req_lock);
2612 	list_del(&peer_req->w.list);
2613 	drbd_remove_epoch_entry_interval(device, peer_req);
2614 	spin_unlock_irq(&device->resource->req_lock);
2615 	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2616 		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2617 		drbd_al_complete_io(device, &peer_req->i);
2618 	}
2619 
2620 out_interrupted:
2621 	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2622 	put_ldev(device);
2623 	drbd_free_peer_req(device, peer_req);
2624 	return err;
2625 }
2626 
2627 /* We may throttle resync, if the lower device seems to be busy,
2628  * and current sync rate is above c_min_rate.
2629  *
2630  * To decide whether or not the lower device is busy, we use a scheme similar
2631  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2632  * (more than 64 sectors) of activity we cannot account for with our own resync
2633  * activity, it obviously is "busy".
2634  *
2635  * The current sync rate used here uses only the most recent two step marks,
2636  * to have a short time average so we can react faster.
2637  */
2638 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2639 		bool throttle_if_app_is_waiting)
2640 {
2641 	struct lc_element *tmp;
2642 	bool throttle = drbd_rs_c_min_rate_throttle(device);
2643 
2644 	if (!throttle || throttle_if_app_is_waiting)
2645 		return throttle;
2646 
2647 	spin_lock_irq(&device->al_lock);
2648 	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2649 	if (tmp) {
2650 		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2651 		if (test_bit(BME_PRIORITY, &bm_ext->flags))
2652 			throttle = false;
2653 		/* Do not slow down if app IO is already waiting for this extent,
2654 		 * and our progress is necessary for application IO to complete. */
2655 	}
2656 	spin_unlock_irq(&device->al_lock);
2657 
2658 	return throttle;
2659 }
2660 
2661 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2662 {
2663 	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2664 	unsigned long db, dt, dbdt;
2665 	unsigned int c_min_rate;
2666 	int curr_events;
2667 
2668 	rcu_read_lock();
2669 	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2670 	rcu_read_unlock();
2671 
2672 	/* feature disabled? */
2673 	if (c_min_rate == 0)
2674 		return false;
2675 
2676 	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2677 		      (int)part_stat_read(&disk->part0, sectors[1]) -
2678 			atomic_read(&device->rs_sect_ev);
2679 
2680 	if (atomic_read(&device->ap_actlog_cnt)
2681 	    || curr_events - device->rs_last_events > 64) {
2682 		unsigned long rs_left;
2683 		int i;
2684 
2685 		device->rs_last_events = curr_events;
2686 
2687 		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2688 		 * approx. */
2689 		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2690 
2691 		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2692 			rs_left = device->ov_left;
2693 		else
2694 			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2695 
2696 		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2697 		if (!dt)
2698 			dt++;
2699 		db = device->rs_mark_left[i] - rs_left;
2700 		dbdt = Bit2KB(db/dt);
2701 
2702 		if (dbdt > c_min_rate)
2703 			return true;
2704 	}
2705 	return false;
2706 }
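/*
 * Rough worked example for the check above (all numbers are assumptions for
 * illustration): one bitmap bit covers 4 KiB, so Bit2KB() multiplies by four.
 * If the sync mark used above is 8 seconds old (dt = 8) and 4096 bitmap bits
 * were cleared since then (db = 4096), then dbdt = Bit2KB(4096 / 8) =
 * 2048 KiB/s.  With c_min_rate configured at, say, 250 KiB/s this returns
 * true and the resync request gets throttled; with c_min_rate == 0 the
 * whole check is disabled.
 */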
2707 
2708 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2709 {
2710 	struct drbd_peer_device *peer_device;
2711 	struct drbd_device *device;
2712 	sector_t sector;
2713 	sector_t capacity;
2714 	struct drbd_peer_request *peer_req;
2715 	struct digest_info *di = NULL;
2716 	int size, verb;
2717 	unsigned int fault_type;
2718 	struct p_block_req *p =	pi->data;
2719 
2720 	peer_device = conn_peer_device(connection, pi->vnr);
2721 	if (!peer_device)
2722 		return -EIO;
2723 	device = peer_device->device;
2724 	capacity = drbd_get_capacity(device->this_bdev);
2725 
2726 	sector = be64_to_cpu(p->sector);
2727 	size   = be32_to_cpu(p->blksize);
2728 
2729 	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2730 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2731 				(unsigned long long)sector, size);
2732 		return -EINVAL;
2733 	}
2734 	if (sector + (size>>9) > capacity) {
2735 		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2736 				(unsigned long long)sector, size);
2737 		return -EINVAL;
2738 	}
2739 
2740 	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2741 		verb = 1;
2742 		switch (pi->cmd) {
2743 		case P_DATA_REQUEST:
2744 			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2745 			break;
2746 		case P_RS_THIN_REQ:
2747 		case P_RS_DATA_REQUEST:
2748 		case P_CSUM_RS_REQUEST:
2749 		case P_OV_REQUEST:
2750 			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2751 			break;
2752 		case P_OV_REPLY:
2753 			verb = 0;
2754 			dec_rs_pending(device);
2755 			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2756 			break;
2757 		default:
2758 			BUG();
2759 		}
2760 		if (verb && __ratelimit(&drbd_ratelimit_state))
2761 			drbd_err(device, "Can not satisfy peer's read request, "
2762 			    "no local data.\n");
2763 
2764 		/* drain possible payload */
2765 		return drbd_drain_block(peer_device, pi->size);
2766 	}
2767 
2768 	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2769 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
2770 	 * which in turn might block on the other node at this very place.  */
2771 	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2772 			size, GFP_NOIO);
2773 	if (!peer_req) {
2774 		put_ldev(device);
2775 		return -ENOMEM;
2776 	}
2777 
2778 	switch (pi->cmd) {
2779 	case P_DATA_REQUEST:
2780 		peer_req->w.cb = w_e_end_data_req;
2781 		fault_type = DRBD_FAULT_DT_RD;
2782 		/* application IO, don't drbd_rs_begin_io */
2783 		peer_req->flags |= EE_APPLICATION;
2784 		goto submit;
2785 
2786 	case P_RS_THIN_REQ:
2787 		/* If at some point in the future we have a smart way to
2788 		   find out if this data block is completely deallocated,
2789 		   then we would do something smarter here than reading
2790 		   the block... */
2791 		peer_req->flags |= EE_RS_THIN_REQ;
2792 	case P_RS_DATA_REQUEST:
2793 		peer_req->w.cb = w_e_end_rsdata_req;
2794 		fault_type = DRBD_FAULT_RS_RD;
2795 		/* used in the sector offset progress display */
2796 		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2797 		break;
2798 
2799 	case P_OV_REPLY:
2800 	case P_CSUM_RS_REQUEST:
2801 		fault_type = DRBD_FAULT_RS_RD;
2802 		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2803 		if (!di)
2804 			goto out_free_e;
2805 
2806 		di->digest_size = pi->size;
2807 		di->digest = (((char *)di)+sizeof(struct digest_info));
2808 
2809 		peer_req->digest = di;
2810 		peer_req->flags |= EE_HAS_DIGEST;
2811 
2812 		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2813 			goto out_free_e;
2814 
2815 		if (pi->cmd == P_CSUM_RS_REQUEST) {
2816 			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2817 			peer_req->w.cb = w_e_end_csum_rs_req;
2818 			/* used in the sector offset progress display */
2819 			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2820 			/* remember to report stats in drbd_resync_finished */
2821 			device->use_csums = true;
2822 		} else if (pi->cmd == P_OV_REPLY) {
2823 			/* track progress, we may need to throttle */
2824 			atomic_add(size >> 9, &device->rs_sect_in);
2825 			peer_req->w.cb = w_e_end_ov_reply;
2826 			dec_rs_pending(device);
2827 			/* drbd_rs_begin_io done when we sent this request,
2828 			 * but accounting still needs to be done. */
2829 			goto submit_for_resync;
2830 		}
2831 		break;
2832 
2833 	case P_OV_REQUEST:
2834 		if (device->ov_start_sector == ~(sector_t)0 &&
2835 		    peer_device->connection->agreed_pro_version >= 90) {
2836 			unsigned long now = jiffies;
2837 			int i;
2838 			device->ov_start_sector = sector;
2839 			device->ov_position = sector;
2840 			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2841 			device->rs_total = device->ov_left;
2842 			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2843 				device->rs_mark_left[i] = device->ov_left;
2844 				device->rs_mark_time[i] = now;
2845 			}
2846 			drbd_info(device, "Online Verify start sector: %llu\n",
2847 					(unsigned long long)sector);
2848 		}
2849 		peer_req->w.cb = w_e_end_ov_req;
2850 		fault_type = DRBD_FAULT_RS_RD;
2851 		break;
2852 
2853 	default:
2854 		BUG();
2855 	}
2856 
2857 	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
2858 	 * wrt the receiver, but it is not as straightforward as it may seem.
2859 	 * Various places in the resync start and stop logic assume resync
2860 	 * requests are processed in order, requeuing this on the worker thread
2861 	 * introduces a bunch of new code for synchronization between threads.
2862 	 *
2863 	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2864 	 * "forever", throttling after drbd_rs_begin_io will lock that extent
2865 	 * for application writes for the same time.  For now, just throttle
2866 	 * here, where the rest of the code expects the receiver to sleep for
2867 	 * a while, anyways.
2868 	 */
2869 
2870 	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
2871 	 * this defers syncer requests for some time, before letting at least
2872 	 * one request through.  The resync controller on the receiving side
2873 	 * will adapt to the incoming rate accordingly.
2874 	 *
2875 	 * We cannot throttle here if remote is Primary/SyncTarget:
2876 	 * we would also throttle its application reads.
2877 	 * In that case, throttling is done on the SyncTarget only.
2878 	 */
2879 
2880 	/* Even though this may be a resync request, we do add to "read_ee";
2881 	 * "sync_ee" is only used for resync WRITEs.
2882 	 * Add to list early, so debugfs can find this request
2883 	 * even if we have to sleep below. */
2884 	spin_lock_irq(&device->resource->req_lock);
2885 	list_add_tail(&peer_req->w.list, &device->read_ee);
2886 	spin_unlock_irq(&device->resource->req_lock);
2887 
2888 	update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2889 	if (device->state.peer != R_PRIMARY
2890 	&& drbd_rs_should_slow_down(device, sector, false))
2891 		schedule_timeout_uninterruptible(HZ/10);
2892 	update_receiver_timing_details(connection, drbd_rs_begin_io);
2893 	if (drbd_rs_begin_io(device, sector))
2894 		goto out_free_e;
2895 
2896 submit_for_resync:
2897 	atomic_add(size >> 9, &device->rs_sect_ev);
2898 
2899 submit:
2900 	update_receiver_timing_details(connection, drbd_submit_peer_request);
2901 	inc_unacked(device);
2902 	if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2903 				     fault_type) == 0)
2904 		return 0;
2905 
2906 	/* don't care for the reason here */
2907 	drbd_err(device, "submit failed, triggering re-connect\n");
2908 
2909 out_free_e:
2910 	spin_lock_irq(&device->resource->req_lock);
2911 	list_del(&peer_req->w.list);
2912 	spin_unlock_irq(&device->resource->req_lock);
2913 	/* no drbd_rs_complete_io(), we are dropping the connection anyways */
2914 
2915 	put_ldev(device);
2916 	drbd_free_peer_req(device, peer_req);
2917 	return -EIO;
2918 }
2919 
2920 /**
2921  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2922  */
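/*
 * Return convention (as far as the callers use it, matching the
 * drbd_uuid_compare() table further below): 1 means resolve in favour of
 * the local data (the peer discards and resyncs from us), -1 means discard
 * the local data in favour of the peer, and -100 means no automatic
 * decision could be reached.
 */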
2923 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2924 {
2925 	struct drbd_device *device = peer_device->device;
2926 	int self, peer, rv = -100;
2927 	unsigned long ch_self, ch_peer;
2928 	enum drbd_after_sb_p after_sb_0p;
2929 
2930 	self = device->ldev->md.uuid[UI_BITMAP] & 1;
2931 	peer = device->p_uuid[UI_BITMAP] & 1;
2932 
2933 	ch_peer = device->p_uuid[UI_SIZE];
2934 	ch_self = device->comm_bm_set;
2935 
2936 	rcu_read_lock();
2937 	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2938 	rcu_read_unlock();
2939 	switch (after_sb_0p) {
2940 	case ASB_CONSENSUS:
2941 	case ASB_DISCARD_SECONDARY:
2942 	case ASB_CALL_HELPER:
2943 	case ASB_VIOLENTLY:
2944 		drbd_err(device, "Configuration error.\n");
2945 		break;
2946 	case ASB_DISCONNECT:
2947 		break;
2948 	case ASB_DISCARD_YOUNGER_PRI:
2949 		if (self == 0 && peer == 1) {
2950 			rv = -1;
2951 			break;
2952 		}
2953 		if (self == 1 && peer == 0) {
2954 			rv =  1;
2955 			break;
2956 		}
2957 		/* Else fall through to one of the other strategies... */
2958 	case ASB_DISCARD_OLDER_PRI:
2959 		if (self == 0 && peer == 1) {
2960 			rv = 1;
2961 			break;
2962 		}
2963 		if (self == 1 && peer == 0) {
2964 			rv = -1;
2965 			break;
2966 		}
2967 		/* Else fall through to one of the other strategies... */
2968 		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2969 		     "Using discard-least-changes instead\n");
2970 	case ASB_DISCARD_ZERO_CHG:
2971 		if (ch_peer == 0 && ch_self == 0) {
2972 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2973 				? -1 : 1;
2974 			break;
2975 		} else {
2976 			if (ch_peer == 0) { rv =  1; break; }
2977 			if (ch_self == 0) { rv = -1; break; }
2978 		}
2979 		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2980 			break;
2981 	case ASB_DISCARD_LEAST_CHG:
2982 		if	(ch_self < ch_peer)
2983 			rv = -1;
2984 		else if (ch_self > ch_peer)
2985 			rv =  1;
2986 		else /* ( ch_self == ch_peer ) */
2987 		     /* Well, then use something else. */
2988 			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2989 				? -1 : 1;
2990 		break;
2991 	case ASB_DISCARD_LOCAL:
2992 		rv = -1;
2993 		break;
2994 	case ASB_DISCARD_REMOTE:
2995 		rv =  1;
2996 	}
2997 
2998 	return rv;
2999 }
3000 
3001 /**
3002  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3003  */
3004 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3005 {
3006 	struct drbd_device *device = peer_device->device;
3007 	int hg, rv = -100;
3008 	enum drbd_after_sb_p after_sb_1p;
3009 
3010 	rcu_read_lock();
3011 	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3012 	rcu_read_unlock();
3013 	switch (after_sb_1p) {
3014 	case ASB_DISCARD_YOUNGER_PRI:
3015 	case ASB_DISCARD_OLDER_PRI:
3016 	case ASB_DISCARD_LEAST_CHG:
3017 	case ASB_DISCARD_LOCAL:
3018 	case ASB_DISCARD_REMOTE:
3019 	case ASB_DISCARD_ZERO_CHG:
3020 		drbd_err(device, "Configuration error.\n");
3021 		break;
3022 	case ASB_DISCONNECT:
3023 		break;
3024 	case ASB_CONSENSUS:
3025 		hg = drbd_asb_recover_0p(peer_device);
3026 		if (hg == -1 && device->state.role == R_SECONDARY)
3027 			rv = hg;
3028 		if (hg == 1  && device->state.role == R_PRIMARY)
3029 			rv = hg;
3030 		break;
3031 	case ASB_VIOLENTLY:
3032 		rv = drbd_asb_recover_0p(peer_device);
3033 		break;
3034 	case ASB_DISCARD_SECONDARY:
3035 		return device->state.role == R_PRIMARY ? 1 : -1;
3036 	case ASB_CALL_HELPER:
3037 		hg = drbd_asb_recover_0p(peer_device);
3038 		if (hg == -1 && device->state.role == R_PRIMARY) {
3039 			enum drbd_state_rv rv2;
3040 
3041 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3042 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3043 			  * we do not need to wait for the after state change work either. */
3044 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3045 			if (rv2 != SS_SUCCESS) {
3046 				drbd_khelper(device, "pri-lost-after-sb");
3047 			} else {
3048 				drbd_warn(device, "Successfully gave up primary role.\n");
3049 				rv = hg;
3050 			}
3051 		} else
3052 			rv = hg;
3053 	}
3054 
3055 	return rv;
3056 }
3057 
3058 /**
3059  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3060  */
3061 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3062 {
3063 	struct drbd_device *device = peer_device->device;
3064 	int hg, rv = -100;
3065 	enum drbd_after_sb_p after_sb_2p;
3066 
3067 	rcu_read_lock();
3068 	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3069 	rcu_read_unlock();
3070 	switch (after_sb_2p) {
3071 	case ASB_DISCARD_YOUNGER_PRI:
3072 	case ASB_DISCARD_OLDER_PRI:
3073 	case ASB_DISCARD_LEAST_CHG:
3074 	case ASB_DISCARD_LOCAL:
3075 	case ASB_DISCARD_REMOTE:
3076 	case ASB_CONSENSUS:
3077 	case ASB_DISCARD_SECONDARY:
3078 	case ASB_DISCARD_ZERO_CHG:
3079 		drbd_err(device, "Configuration error.\n");
3080 		break;
3081 	case ASB_VIOLENTLY:
3082 		rv = drbd_asb_recover_0p(peer_device);
3083 		break;
3084 	case ASB_DISCONNECT:
3085 		break;
3086 	case ASB_CALL_HELPER:
3087 		hg = drbd_asb_recover_0p(peer_device);
3088 		if (hg == -1) {
3089 			enum drbd_state_rv rv2;
3090 
3091 			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3092 			  * we might be here in C_WF_REPORT_PARAMS which is transient.
3093 			  * we do not need to wait for the after state change work either. */
3094 			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3095 			if (rv2 != SS_SUCCESS) {
3096 				drbd_khelper(device, "pri-lost-after-sb");
3097 			} else {
3098 				drbd_warn(device, "Successfully gave up primary role.\n");
3099 				rv = hg;
3100 			}
3101 		} else
3102 			rv = hg;
3103 	}
3104 
3105 	return rv;
3106 }
3107 
3108 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3109 			   u64 bits, u64 flags)
3110 {
3111 	if (!uuid) {
3112 		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3113 		return;
3114 	}
3115 	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3116 	     text,
3117 	     (unsigned long long)uuid[UI_CURRENT],
3118 	     (unsigned long long)uuid[UI_BITMAP],
3119 	     (unsigned long long)uuid[UI_HISTORY_START],
3120 	     (unsigned long long)uuid[UI_HISTORY_END],
3121 	     (unsigned long long)bits,
3122 	     (unsigned long long)flags);
3123 }
3124 
3125 /*
3126   100	after split brain try auto recover
3127     2	C_SYNC_SOURCE set BitMap
3128     1	C_SYNC_SOURCE use BitMap
3129     0	no Sync
3130    -1	C_SYNC_TARGET use BitMap
3131    -2	C_SYNC_TARGET set BitMap
3132  -100	after split brain, disconnect
3133 -1000	unrelated data
3134 -1091   requires proto 91
3135 -1096   requires proto 96
3136  */
3137 
3138 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3139 {
3140 	struct drbd_peer_device *const peer_device = first_peer_device(device);
3141 	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3142 	u64 self, peer;
3143 	int i, j;
3144 
3145 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3146 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3147 
3148 	*rule_nr = 10;
3149 	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3150 		return 0;
3151 
3152 	*rule_nr = 20;
3153 	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3154 	     peer != UUID_JUST_CREATED)
3155 		return -2;
3156 
3157 	*rule_nr = 30;
3158 	if (self != UUID_JUST_CREATED &&
3159 	    (peer == UUID_JUST_CREATED || peer == (u64)0))
3160 		return 2;
3161 
3162 	if (self == peer) {
3163 		int rct, dc; /* roles at crash time */
3164 
3165 		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3166 
3167 			if (connection->agreed_pro_version < 91)
3168 				return -1091;
3169 
3170 			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3171 			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3172 				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3173 				drbd_uuid_move_history(device);
3174 				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3175 				device->ldev->md.uuid[UI_BITMAP] = 0;
3176 
3177 				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3178 					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3179 				*rule_nr = 34;
3180 			} else {
3181 				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3182 				*rule_nr = 36;
3183 			}
3184 
3185 			return 1;
3186 		}
3187 
3188 		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3189 
3190 			if (connection->agreed_pro_version < 91)
3191 				return -1091;
3192 
3193 			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3194 			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3195 				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3196 
3197 				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3198 				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3199 				device->p_uuid[UI_BITMAP] = 0UL;
3200 
3201 				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3202 				*rule_nr = 35;
3203 			} else {
3204 				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3205 				*rule_nr = 37;
3206 			}
3207 
3208 			return -1;
3209 		}
3210 
3211 		/* Common power [off|failure] */
3212 		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3213 			(device->p_uuid[UI_FLAGS] & 2);
3214 		/* lowest bit is set when we were primary,
3215 		 * next bit (weight 2) is set when peer was primary */
3216 		*rule_nr = 40;
3217 
3218 		/* Neither has the "crashed primary" flag set,
3219 		 * only a replication link hiccup. */
3220 		if (rct == 0)
3221 			return 0;
3222 
3223 		/* Current UUID equal and no bitmap uuid; does not necessarily
3224 		 * mean this was a "simultaneous hard crash", maybe IO was
3225 		 * frozen, so no UUID-bump happened.
3226 		 * This is a protocol change, overload DRBD_FF_WSAME as flag
3227 		 * for "new-enough" peer DRBD version. */
3228 		if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3229 			*rule_nr = 41;
3230 			if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3231 				drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3232 				return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3233 			}
3234 			if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3235 				/* At least one has the "crashed primary" bit set,
3236 				 * both are primary now, but neither has rotated its UUIDs?
3237 				 * "Can not happen." */
3238 				drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3239 				return -100;
3240 			}
3241 			if (device->state.role == R_PRIMARY)
3242 				return 1;
3243 			return -1;
3244 		}
3245 
3246 		/* Both are secondary.
3247 		 * Really looks like recovery from simultaneous hard crash.
3248 		 * Check which had been primary before, and arbitrate. */
3249 		switch (rct) {
3250 		case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3251 		case 1: /*  self_pri && !peer_pri */ return 1;
3252 		case 2: /* !self_pri &&  peer_pri */ return -1;
3253 		case 3: /*  self_pri &&  peer_pri */
3254 			dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3255 			return dc ? -1 : 1;
3256 		}
3257 	}
3258 
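	/* Rules 50/51: our current UUID matches the peer's bitmap UUID (or its
	 * most recent history UUID): the peer has newer data, we become sync
	 * target.  Rule 51 additionally repairs the peer's UUIDs if the last
	 * P_SYNC_UUID packet was lost. */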
3259 	*rule_nr = 50;
3260 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3261 	if (self == peer)
3262 		return -1;
3263 
3264 	*rule_nr = 51;
3265 	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3266 	if (self == peer) {
3267 		if (connection->agreed_pro_version < 96 ?
3268 		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3269 		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3270 		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3271 			/* The last P_SYNC_UUID did not get through. Undo the peer's UUID
3272 			   modifications from the last start of resync as sync source. */
3273 
3274 			if (connection->agreed_pro_version < 91)
3275 				return -1091;
3276 
3277 			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3278 			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3279 
3280 			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3281 			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3282 
3283 			return -1;
3284 		}
3285 	}
3286 
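	/* Rule 60: our current UUID only shows up in the peer's history: we
	 * are far behind; a full sync with us as target is required. */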
3287 	*rule_nr = 60;
3288 	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3289 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3290 		peer = device->p_uuid[i] & ~((u64)1);
3291 		if (self == peer)
3292 			return -2;
3293 	}
3294 
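	/* Rules 70/71: the mirror image of 50/51 -- the peer's current UUID
	 * matches our bitmap (or most recent history) UUID: we have newer
	 * data and become sync source. */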
3295 	*rule_nr = 70;
3296 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3297 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3298 	if (self == peer)
3299 		return 1;
3300 
3301 	*rule_nr = 71;
3302 	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3303 	if (self == peer) {
3304 		if (connection->agreed_pro_version < 96 ?
3305 		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3306 		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3307 		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3308 			/* The last P_SYNC_UUID did not get through. Undo our own UUID
3309 			   modifications from the last start of resync as sync source. */
3310 
3311 			if (connection->agreed_pro_version < 91)
3312 				return -1091;
3313 
3314 			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3315 			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3316 
3317 			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3318 			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3319 				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3320 
3321 			return 1;
3322 		}
3323 	}
3324 
3325 
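	/* Rule 80: the peer's current UUID only shows up in our history: the
	 * peer is far behind; a full sync with us as source is required. */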
3326 	*rule_nr = 80;
3327 	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3328 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3329 		self = device->ldev->md.uuid[i] & ~((u64)1);
3330 		if (self == peer)
3331 			return 2;
3332 	}
3333 
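	/* Rules 90/100: the current UUIDs diverged.  Matching bitmap UUIDs
	 * (90) or matching history UUIDs (100) indicate split brain (+/-100);
	 * if nothing matches at all, the data sets are unrelated (-1000). */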
3334 	*rule_nr = 90;
3335 	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3336 	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3337 	if (self == peer && self != ((u64)0))
3338 		return 100;
3339 
3340 	*rule_nr = 100;
3341 	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3342 		self = device->ldev->md.uuid[i] & ~((u64)1);
3343 		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3344 			peer = device->p_uuid[j] & ~((u64)1);
3345 			if (self == peer)
3346 				return -100;
3347 		}
3348 	}
3349 
3350 	return -1000;
3351 }
3352 
3353 /* drbd_sync_handshake() returns the new conn state on success, or
3354    CONN_MASK (-1) on failure.
3355  */
3356 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3357 					   enum drbd_role peer_role,
3358 					   enum drbd_disk_state peer_disk) __must_hold(local)
3359 {
3360 	struct drbd_device *device = peer_device->device;
3361 	enum drbd_conns rv = C_MASK;
3362 	enum drbd_disk_state mydisk;
3363 	struct net_conf *nc;
3364 	int hg, rule_nr, rr_conflict, tentative, always_asbp;
3365 
3366 	mydisk = device->state.disk;
3367 	if (mydisk == D_NEGOTIATING)
3368 		mydisk = device->new_state_tmp.disk;
3369 
3370 	drbd_info(device, "drbd_sync_handshake:\n");
3371 
3372 	spin_lock_irq(&device->ldev->md.uuid_lock);
3373 	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3374 	drbd_uuid_dump(device, "peer", device->p_uuid,
3375 		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3376 
3377 	hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3378 	spin_unlock_irq(&device->ldev->md.uuid_lock);
3379 
3380 	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3381 
3382 	if (hg == -1000) {
3383 		drbd_alert(device, "Unrelated data, aborting!\n");
3384 		return C_MASK;
3385 	}
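	/* drbd_uuid_compare() encodes two kinds of "please upgrade" results:
	 * below -0x10000 the low byte carries the required protocol version
	 * and the next byte the required feature flags (rule 41); values such
	 * as -1091/-1096 simply mean "requires protocol 91/96". */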
3386 	if (hg < -0x10000) {
3387 		int proto, fflags;
3388 		hg = -hg;
3389 		proto = hg & 0xff;
3390 		fflags = (hg >> 8) & 0xff;
3391 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3392 					proto, fflags);
3393 		return C_MASK;
3394 	}
3395 	if (hg < -1000) {
3396 		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3397 		return C_MASK;
3398 	}
3399 
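	/* If exactly one side is Inconsistent while the other has usable data,
	 * the disk states dictate the sync direction, overriding the UUID
	 * result; keep it a full sync if the UUIDs indicated split brain or a
	 * full sync anyway. */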
3400 	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3401 	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3402 		int f = (hg == -100) || abs(hg) == 2;
3403 		hg = mydisk > D_INCONSISTENT ? 1 : -1;
3404 		if (f)
3405 			hg = hg*2;
3406 		drbd_info(device, "Becoming sync %s due to disk states.\n",
3407 		     hg > 0 ? "source" : "target");
3408 	}
3409 
3410 	if (abs(hg) == 100)
3411 		drbd_khelper(device, "initial-split-brain");
3412 
3413 	rcu_read_lock();
3414 	nc = rcu_dereference(peer_device->connection->net_conf);
3415 	always_asbp = nc->always_asbp;
3416 	rr_conflict = nc->rr_conflict;
3417 	tentative = nc->tentative;
3418 	rcu_read_unlock();
3419 
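	/* Try automatic split-brain recovery: pick the after-sb-0pri/-1pri/-2pri
	 * policy matching the number of current primaries.  With always_asbp the
	 * policies are applied even for hg == -100, where the UUIDs gave no hint
	 * which side has the better data; such a forced decision results in a
	 * full sync below. */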
3420 	if (hg == 100 || (hg == -100 && always_asbp)) {
3421 		int pcount = (device->state.role == R_PRIMARY)
3422 			   + (peer_role == R_PRIMARY);
3423 		int forced = (hg == -100);
3424 
3425 		switch (pcount) {
3426 		case 0:
3427 			hg = drbd_asb_recover_0p(peer_device);
3428 			break;
3429 		case 1:
3430 			hg = drbd_asb_recover_1p(peer_device);
3431 			break;
3432 		case 2:
3433 			hg = drbd_asb_recover_2p(peer_device);
3434 			break;
3435 		}
3436 		if (abs(hg) < 100) {
3437 			drbd_warn(device, "Split-Brain detected, %d primaries, "
3438 			     "automatically solved. Sync from %s node\n",
3439 			     pcount, (hg < 0) ? "peer" : "this");
3440 			if (forced) {
3441 				drbd_warn(device, "Doing a full sync, since"
3442 				     " UUIDs were ambiguous.\n");
3443 				hg = hg*2;
3444 			}
3445 		}
3446 	}
3447 
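	/* Still unresolved: honour an explicit discard-my-data setting.  Bit 0
	 * of the peer's UI_FLAGS mirrors its DISCARD_MY_DATA flag, so we only
	 * resolve if exactly one side wants its data discarded. */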
3448 	if (hg == -100) {
3449 		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3450 			hg = -1;
3451 		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3452 			hg = 1;
3453 
3454 		if (abs(hg) < 100)
3455 			drbd_warn(device, "Split-Brain detected, manually solved. "
3456 			     "Sync from %s node\n",
3457 			     (hg < 0) ? "peer" : "this");
3458 	}
3459 
3460 	if (hg == -100) {
3461 		/* FIXME this log message is not correct if we end up here
3462 		 * after an attempted attach on a diskless node.
3463 		 * We just refuse to attach -- well, we drop the "connection"
3464 		 * to that disk, in a way... */
3465 		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3466 		drbd_khelper(device, "split-brain");
3467 		return C_MASK;
3468 	}
3469 
3470 	if (hg > 0 && mydisk <= D_INCONSISTENT) {
3471 		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3472 		return C_MASK;
3473 	}
3474 
3475 	if (hg < 0 && /* by intention we do not use mydisk here. */
3476 	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3477 		switch (rr_conflict) {
3478 		case ASB_CALL_HELPER:
3479 			drbd_khelper(device, "pri-lost");
3480 			/* fall through */
3481 		case ASB_DISCONNECT:
3482 			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3483 			return C_MASK;
3484 		case ASB_VIOLENTLY:
3485 			drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3486 			     "assumption\n");
3487 		}
3488 	}
3489 
3490 	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3491 		if (hg == 0)
3492 			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3493 		else
3494 			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3495 				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3496 				 abs(hg) >= 2 ? "full" : "bit-map based");
3497 		return C_MASK;
3498 	}
3499 
3500 	if (abs(hg) >= 2) {
3501 		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3502 		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3503 					BM_LOCKED_SET_ALLOWED))
3504 			return C_MASK;
3505 	}
3506 
3507 	if (hg > 0) { /* become sync source. */
3508 		rv = C_WF_BITMAP_S;
3509 	} else if (hg < 0) { /* become sync target */
3510 		rv = C_WF_BITMAP_T;
3511 	} else {
3512 		rv = C_CONNECTED;
3513 		if (drbd_bm_total_weight(device)) {
3514 			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3515 			     drbd_bm_total_weight(device));
3516 		}
3517 	}
3518 
3519 	return rv;
3520 }
3521 
3522 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3523 {
3524 	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3525 	if (peer == ASB_DISCARD_REMOTE)
3526 		return ASB_DISCARD_LOCAL;
3527 
3528 	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3529 	if (peer == ASB_DISCARD_LOCAL)
3530 		return ASB_DISCARD_REMOTE;
3531 
3532 	/* everything else is valid if they are equal on both sides. */
3533 	return peer;
3534 }
3535 
3536 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3537 {
3538 	struct p_protocol *p = pi->data;
3539 	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3540 	int p_proto, p_discard_my_data, p_two_primaries, cf;
3541 	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3542 	char integrity_alg[SHARED_SECRET_MAX] = "";
3543 	struct crypto_ahash *peer_integrity_tfm = NULL;
3544 	void *int_dig_in = NULL, *int_dig_vv = NULL;
3545 
3546 	p_proto		= be32_to_cpu(p->protocol);
3547 	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
3548 	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
3549 	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
3550 	p_two_primaries = be32_to_cpu(p->two_primaries);
3551 	cf		= be32_to_cpu(p->conn_flags);
3552 	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3553 
3554 	if (connection->agreed_pro_version >= 87) {
3555 		int err;
3556 
3557 		if (pi->size > sizeof(integrity_alg))
3558 			return -EIO;
3559 		err = drbd_recv_all(connection, integrity_alg, pi->size);
3560 		if (err)
3561 			return err;
3562 		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3563 	}
3564 
3565 	if (pi->cmd != P_PROTOCOL_UPDATE) {
3566 		clear_bit(CONN_DRY_RUN, &connection->flags);
3567 
3568 		if (cf & CF_DRY_RUN)
3569 			set_bit(CONN_DRY_RUN, &connection->flags);
3570 
3571 		rcu_read_lock();
3572 		nc = rcu_dereference(connection->net_conf);
3573 
3574 		if (p_proto != nc->wire_protocol) {
3575 			drbd_err(connection, "incompatible %s settings\n", "protocol");
3576 			goto disconnect_rcu_unlock;
3577 		}
3578 
3579 		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3580 			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3581 			goto disconnect_rcu_unlock;
3582 		}
3583 
3584 		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3585 			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3586 			goto disconnect_rcu_unlock;
3587 		}
3588 
3589 		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3590 			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3591 			goto disconnect_rcu_unlock;
3592 		}
3593 
3594 		if (p_discard_my_data && nc->discard_my_data) {
3595 			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3596 			goto disconnect_rcu_unlock;
3597 		}
3598 
3599 		if (p_two_primaries != nc->two_primaries) {
3600 			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3601 			goto disconnect_rcu_unlock;
3602 		}
3603 
3604 		if (strcmp(integrity_alg, nc->integrity_alg)) {
3605 			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3606 			goto disconnect_rcu_unlock;
3607 		}
3608 
3609 		rcu_read_unlock();
3610 	}
3611 
3612 	if (integrity_alg[0]) {
3613 		int hash_size;
3614 
3615 		/*
3616 		 * We can only change the peer data integrity algorithm
3617 		 * here.  Changing our own data integrity algorithm
3618 		 * requires that we send a P_PROTOCOL_UPDATE packet at
3619 		 * the same time; otherwise, the peer has no way to
3620 		 * tell between which packets the algorithm should
3621 		 * change.
3622 		 */
3623 
3624 		peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3625 		if (IS_ERR(peer_integrity_tfm)) {
3626 			peer_integrity_tfm = NULL;
3627 			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3628 				 integrity_alg);
3629 			goto disconnect;
3630 		}
3631 
3632 		hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3633 		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3634 		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3635 		if (!(int_dig_in && int_dig_vv)) {
3636 			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3637 			goto disconnect;
3638 		}
3639 	}
3640 
3641 	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3642 	if (!new_net_conf) {
3643 		drbd_err(connection, "Allocation of new net_conf failed\n");
3644 		goto disconnect;
3645 	}
3646 
3647 	mutex_lock(&connection->data.mutex);
3648 	mutex_lock(&connection->resource->conf_update);
3649 	old_net_conf = connection->net_conf;
3650 	*new_net_conf = *old_net_conf;
3651 
3652 	new_net_conf->wire_protocol = p_proto;
3653 	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3654 	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3655 	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3656 	new_net_conf->two_primaries = p_two_primaries;
3657 
3658 	rcu_assign_pointer(connection->net_conf, new_net_conf);
3659 	mutex_unlock(&connection->resource->conf_update);
3660 	mutex_unlock(&connection->data.mutex);
3661 
3662 	crypto_free_ahash(connection->peer_integrity_tfm);
3663 	kfree(connection->int_dig_in);
3664 	kfree(connection->int_dig_vv);
3665 	connection->peer_integrity_tfm = peer_integrity_tfm;
3666 	connection->int_dig_in = int_dig_in;
3667 	connection->int_dig_vv = int_dig_vv;
3668 
3669 	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3670 		drbd_info(connection, "peer data-integrity-alg: %s\n",
3671 			  integrity_alg[0] ? integrity_alg : "(none)");
3672 
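	/* The old net_conf may still be in use by RCU readers; wait for them
	 * to finish before freeing it. */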
3673 	synchronize_rcu();
3674 	kfree(old_net_conf);
3675 	return 0;
3676 
3677 disconnect_rcu_unlock:
3678 	rcu_read_unlock();
3679 disconnect:
3680 	crypto_free_ahash(peer_integrity_tfm);
3681 	kfree(int_dig_in);
3682 	kfree(int_dig_vv);
3683 	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3684 	return -EIO;
3685 }
3686 
3687 /* helper function
3688  * input: alg name, feature name
3689  * return: NULL (alg name was "")
3690  *         ERR_PTR(error) if something goes wrong
3691  *         or the crypto hash ptr, if it worked out ok. */
3692 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3693 		const char *alg, const char *name)
3694 {
3695 	struct crypto_ahash *tfm;
3696 
3697 	if (!alg[0])
3698 		return NULL;
3699 
3700 	tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3701 	if (IS_ERR(tfm)) {
3702 		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3703 			alg, name, PTR_ERR(tfm));
3704 		return tfm;
3705 	}
3706 	return tfm;
3707 }
3708 
3709 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3710 {
3711 	void *buffer = connection->data.rbuf;
3712 	int size = pi->size;
3713 
3714 	while (size) {
3715 		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3716 		s = drbd_recv(connection, buffer, s);
3717 		if (s <= 0) {
3718 			if (s < 0)
3719 				return s;
3720 			break;
3721 		}
3722 		size -= s;
3723 	}
3724 	if (size)
3725 		return -EIO;
3726 	return 0;
3727 }
3728 
3729 /*
3730  * config_unknown_volume  -  device configuration command for unknown volume
3731  *
3732  * When a device is added to an existing connection, the node on which the
3733  * device is added first will send configuration commands to its peer but the
3734  * peer will not know about the device yet.  It will warn and ignore these
3735  * commands.  Once the device is added on the second node, the second node will
3736  * send the same device configuration commands, but in the other direction.
3737  *
3738  * (We can also end up here if drbd is misconfigured.)
3739  */
3740 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3741 {
3742 	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3743 		  cmdname(pi->cmd), pi->vnr);
3744 	return ignore_remaining_packet(connection, pi);
3745 }
3746 
3747 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3748 {
3749 	struct drbd_peer_device *peer_device;
3750 	struct drbd_device *device;
3751 	struct p_rs_param_95 *p;
3752 	unsigned int header_size, data_size, exp_max_sz;
3753 	struct crypto_ahash *verify_tfm = NULL;
3754 	struct crypto_ahash *csums_tfm = NULL;
3755 	struct net_conf *old_net_conf, *new_net_conf = NULL;
3756 	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3757 	const int apv = connection->agreed_pro_version;
3758 	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3759 	int fifo_size = 0;
3760 	int err;
3761 
3762 	peer_device = conn_peer_device(connection, pi->vnr);
3763 	if (!peer_device)
3764 		return config_unknown_volume(connection, pi);
3765 	device = peer_device->device;
3766 
3767 	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3768 		    : apv == 88 ? sizeof(struct p_rs_param)
3769 					+ SHARED_SECRET_MAX
3770 		    : apv <= 94 ? sizeof(struct p_rs_param_89)
3771 		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3772 
3773 	if (pi->size > exp_max_sz) {
3774 		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3775 		    pi->size, exp_max_sz);
3776 		return -EIO;
3777 	}
3778 
3779 	if (apv <= 88) {
3780 		header_size = sizeof(struct p_rs_param);
3781 		data_size = pi->size - header_size;
3782 	} else if (apv <= 94) {
3783 		header_size = sizeof(struct p_rs_param_89);
3784 		data_size = pi->size - header_size;
3785 		D_ASSERT(device, data_size == 0);
3786 	} else {
3787 		header_size = sizeof(struct p_rs_param_95);
3788 		data_size = pi->size - header_size;
3789 		D_ASSERT(device, data_size == 0);
3790 	}
3791 
3792 	/* initialize verify_alg and csums_alg */
3793 	p = pi->data;
3794 	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3795 
3796 	err = drbd_recv_all(peer_device->connection, p, header_size);
3797 	if (err)
3798 		return err;
3799 
3800 	mutex_lock(&connection->resource->conf_update);
3801 	old_net_conf = peer_device->connection->net_conf;
3802 	if (get_ldev(device)) {
3803 		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3804 		if (!new_disk_conf) {
3805 			put_ldev(device);
3806 			mutex_unlock(&connection->resource->conf_update);
3807 			drbd_err(device, "Allocation of new disk_conf failed\n");
3808 			return -ENOMEM;
3809 		}
3810 
3811 		old_disk_conf = device->ldev->disk_conf;
3812 		*new_disk_conf = *old_disk_conf;
3813 
3814 		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3815 	}
3816 
3817 	if (apv >= 88) {
3818 		if (apv == 88) {
3819 			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3820 				drbd_err(device, "verify-alg of wrong size, "
3821 					"peer wants %u, accepting only up to %u byte\n",
3822 					data_size, SHARED_SECRET_MAX);
3823 				err = -EIO;
3824 				goto reconnect;
3825 			}
3826 
3827 			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3828 			if (err)
3829 				goto reconnect;
3830 			/* we expect NUL terminated string */
3831 			/* but just in case someone tries to be evil */
3832 			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3833 			p->verify_alg[data_size-1] = 0;
3834 
3835 		} else /* apv >= 89 */ {
3836 			/* we still expect NUL terminated strings */
3837 			/* but just in case someone tries to be evil */
3838 			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3839 			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3840 			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3841 			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3842 		}
3843 
3844 		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3845 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3846 				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3847 				    old_net_conf->verify_alg, p->verify_alg);
3848 				goto disconnect;
3849 			}
3850 			verify_tfm = drbd_crypto_alloc_digest_safe(device,
3851 					p->verify_alg, "verify-alg");
3852 			if (IS_ERR(verify_tfm)) {
3853 				verify_tfm = NULL;
3854 				goto disconnect;
3855 			}
3856 		}
3857 
3858 		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3859 			if (device->state.conn == C_WF_REPORT_PARAMS) {
3860 				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3861 				    old_net_conf->csums_alg, p->csums_alg);
3862 				goto disconnect;
3863 			}
3864 			csums_tfm = drbd_crypto_alloc_digest_safe(device,
3865 					p->csums_alg, "csums-alg");
3866 			if (IS_ERR(csums_tfm)) {
3867 				csums_tfm = NULL;
3868 				goto disconnect;
3869 			}
3870 		}
3871 
3872 		if (apv > 94 && new_disk_conf) {
3873 			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3874 			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3875 			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3876 			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3877 
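			/* The dynamic resync controller needs one fifo slot per
			 * planning step; replace the plan fifo if c_plan_ahead
			 * changed its required size. */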
3878 			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3879 			if (fifo_size != device->rs_plan_s->size) {
3880 				new_plan = fifo_alloc(fifo_size);
3881 				if (!new_plan) {
3882 					drbd_err(device, "kmalloc of fifo_buffer failed\n");
3883 					put_ldev(device);
3884 					goto disconnect;
3885 				}
3886 			}
3887 		}
3888 
3889 		if (verify_tfm || csums_tfm) {
3890 			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3891 			if (!new_net_conf) {
3892 				drbd_err(device, "Allocation of new net_conf failed\n");
3893 				goto disconnect;
3894 			}
3895 
3896 			*new_net_conf = *old_net_conf;
3897 
3898 			if (verify_tfm) {
3899 				strcpy(new_net_conf->verify_alg, p->verify_alg);
3900 				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3901 				crypto_free_ahash(peer_device->connection->verify_tfm);
3902 				peer_device->connection->verify_tfm = verify_tfm;
3903 				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3904 			}
3905 			if (csums_tfm) {
3906 				strcpy(new_net_conf->csums_alg, p->csums_alg);
3907 				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3908 				crypto_free_ahash(peer_device->connection->csums_tfm);
3909 				peer_device->connection->csums_tfm = csums_tfm;
3910 				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3911 			}
3912 			rcu_assign_pointer(connection->net_conf, new_net_conf);
3913 		}
3914 	}
3915 
3916 	if (new_disk_conf) {
3917 		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3918 		put_ldev(device);
3919 	}
3920 
3921 	if (new_plan) {
3922 		old_plan = device->rs_plan_s;
3923 		rcu_assign_pointer(device->rs_plan_s, new_plan);
3924 	}
3925 
3926 	mutex_unlock(&connection->resource->conf_update);
3927 	synchronize_rcu();
3928 	if (new_net_conf)
3929 		kfree(old_net_conf);
3930 	kfree(old_disk_conf);
3931 	kfree(old_plan);
3932 
3933 	return 0;
3934 
3935 reconnect:
3936 	if (new_disk_conf) {
3937 		put_ldev(device);
3938 		kfree(new_disk_conf);
3939 	}
3940 	mutex_unlock(&connection->resource->conf_update);
3941 	return -EIO;
3942 
3943 disconnect:
3944 	kfree(new_plan);
3945 	if (new_disk_conf) {
3946 		put_ldev(device);
3947 		kfree(new_disk_conf);
3948 	}
3949 	mutex_unlock(&connection->resource->conf_update);
3950 	/* just for completeness: actually not needed,
3951 	 * as this is not reached if csums_tfm was ok. */
3952 	crypto_free_ahash(csums_tfm);
3953 	/* but free the verify_tfm again, if csums_tfm did not work out */
3954 	crypto_free_ahash(verify_tfm);
3955 	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3956 	return -EIO;
3957 }
3958 
3959 /* warn if the arguments differ by more than 12.5% */
3960 static void warn_if_differ_considerably(struct drbd_device *device,
3961 	const char *s, sector_t a, sector_t b)
3962 {
3963 	sector_t d;
3964 	if (a == 0 || b == 0)
3965 		return;
3966 	d = (a > b) ? (a - b) : (b - a);
3967 	if (d > (a>>3) || d > (b>>3))
3968 		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3969 		     (unsigned long long)a, (unsigned long long)b);
3970 }
3971 
3972 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3973 {
3974 	struct drbd_peer_device *peer_device;
3975 	struct drbd_device *device;
3976 	struct p_sizes *p = pi->data;
3977 	struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
3978 	enum determine_dev_size dd = DS_UNCHANGED;
3979 	sector_t p_size, p_usize, p_csize, my_usize;
3980 	sector_t new_size, cur_size;
3981 	int ldsc = 0; /* local disk size changed */
3982 	enum dds_flags ddsf;
3983 
3984 	peer_device = conn_peer_device(connection, pi->vnr);
3985 	if (!peer_device)
3986 		return config_unknown_volume(connection, pi);
3987 	device = peer_device->device;
3988 	cur_size = drbd_get_capacity(device->this_bdev);
3989 
3990 	p_size = be64_to_cpu(p->d_size);
3991 	p_usize = be64_to_cpu(p->u_size);
3992 	p_csize = be64_to_cpu(p->c_size);
3993 
3994 	/* just store the peer's disk size for now.
3995 	 * we still need to figure out whether we accept that. */
3996 	device->p_size = p_size;
3997 
3998 	if (get_ldev(device)) {
3999 		rcu_read_lock();
4000 		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4001 		rcu_read_unlock();
4002 
4003 		warn_if_differ_considerably(device, "lower level device sizes",
4004 			   p_size, drbd_get_max_capacity(device->ldev));
4005 		warn_if_differ_considerably(device, "user requested size",
4006 					    p_usize, my_usize);
4007 
4008 		/* if this is the first connect, or an otherwise expected
4009 		 * param exchange, choose the minimum */
4010 		if (device->state.conn == C_WF_REPORT_PARAMS)
4011 			p_usize = min_not_zero(my_usize, p_usize);
4012 
4013 		/* Never shrink a device with usable data during connect.
4014 		   But allow online shrinking if we are connected. */
4015 		new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4016 		if (new_size < cur_size &&
4017 		    device->state.disk >= D_OUTDATED &&
4018 		    device->state.conn < C_CONNECTED) {
4019 			drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4020 					(unsigned long long)new_size, (unsigned long long)cur_size);
4021 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4022 			put_ldev(device);
4023 			return -EIO;
4024 		}
4025 
4026 		if (my_usize != p_usize) {
4027 			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4028 
4029 			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4030 			if (!new_disk_conf) {
4031 				drbd_err(device, "Allocation of new disk_conf failed\n");
4032 				put_ldev(device);
4033 				return -ENOMEM;
4034 			}
4035 
4036 			mutex_lock(&connection->resource->conf_update);
4037 			old_disk_conf = device->ldev->disk_conf;
4038 			*new_disk_conf = *old_disk_conf;
4039 			new_disk_conf->disk_size = p_usize;
4040 
4041 			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4042 			mutex_unlock(&connection->resource->conf_update);
4043 			synchronize_rcu();
4044 			kfree(old_disk_conf);
4045 
4046 			drbd_info(device, "Peer sets u_size to %lu sectors\n",
4047 				 (unsigned long)my_usize);
4048 		}
4049 
4050 		put_ldev(device);
4051 	}
4052 
4053 	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4054 	/* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4055 	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4056 	   drbd_reconsider_queue_parameters(), we can be sure that after
4057 	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4058 
4059 	ddsf = be16_to_cpu(p->dds_flags);
4060 	if (get_ldev(device)) {
4061 		drbd_reconsider_queue_parameters(device, device->ldev, o);
4062 		dd = drbd_determine_dev_size(device, ddsf, NULL);
4063 		put_ldev(device);
4064 		if (dd == DS_ERROR)
4065 			return -EIO;
4066 		drbd_md_sync(device);
4067 	} else {
4068 		/*
4069 		 * I am diskless, need to accept the peer's *current* size.
4070 		 * I must NOT accept the peers backing disk size,
4071 		 * it may have been larger than mine all along...
4072 		 *
4073 		 * At this point, the peer knows more about my disk, or at
4074 		 * least about what we last agreed upon, than myself.
4075 		 * So if his c_size is less than his d_size, the most likely
4076 		 * reason is that *my* d_size was smaller last time we checked.
4077 		 *
4078 		 * However, if he sends a zero current size,
4079 		 * take his (user-capped or) backing disk size anyways.
4080 		 *
4081 		 * Unless of course he does not have a disk himself.
4082 		 * In which case we ignore this completely.
4083 		 */
4084 		sector_t new_size = p_csize ?: p_usize ?: p_size;
4085 		drbd_reconsider_queue_parameters(device, NULL, o);
4086 		if (new_size == 0) {
4087 			/* Ignore; the peer does not know anything. */
4088 		} else if (new_size == cur_size) {
4089 			/* nothing to do */
4090 		} else if (cur_size != 0 && p_size == 0) {
4091 			drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4092 					(unsigned long long)new_size, (unsigned long long)cur_size);
4093 		} else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4094 			drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4095 					(unsigned long long)new_size, (unsigned long long)cur_size);
4096 			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4097 			return -EIO;
4098 		} else {
4099 			/* I believe the peer, if
4100 			 *  - I don't have a current size myself
4101 			 *  - we agree on the size anyways
4102 			 *  - I do have a current size, am Secondary,
4103 			 *    and he has the only disk
4104 			 *  - I do have a current size, am Primary,
4105 			 *    and he has the only disk,
4106 			 *    which is larger than my current size
4107 			 */
4108 			drbd_set_my_capacity(device, new_size);
4109 		}
4110 	}
4111 
4112 	if (get_ldev(device)) {
4113 		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4114 			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4115 			ldsc = 1;
4116 		}
4117 
4118 		put_ldev(device);
4119 	}
4120 
4121 	if (device->state.conn > C_WF_REPORT_PARAMS) {
4122 		if (be64_to_cpu(p->c_size) !=
4123 		    drbd_get_capacity(device->this_bdev) || ldsc) {
4124 			/* we have different sizes, probably peer
4125 			 * needs to know my new size... */
4126 			drbd_send_sizes(peer_device, 0, ddsf);
4127 		}
4128 		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4129 		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4130 			if (device->state.pdsk >= D_INCONSISTENT &&
4131 			    device->state.disk >= D_INCONSISTENT) {
4132 				if (ddsf & DDSF_NO_RESYNC)
4133 					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4134 				else
4135 					resync_after_online_grow(device);
4136 			} else
4137 				set_bit(RESYNC_AFTER_NEG, &device->flags);
4138 		}
4139 	}
4140 
4141 	return 0;
4142 }
4143 
4144 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4145 {
4146 	struct drbd_peer_device *peer_device;
4147 	struct drbd_device *device;
4148 	struct p_uuids *p = pi->data;
4149 	u64 *p_uuid;
4150 	int i, updated_uuids = 0;
4151 
4152 	peer_device = conn_peer_device(connection, pi->vnr);
4153 	if (!peer_device)
4154 		return config_unknown_volume(connection, pi);
4155 	device = peer_device->device;
4156 
4157 	p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4158 	if (!p_uuid) {
4159 		drbd_err(device, "kmalloc of p_uuid failed\n");
4160 		return false;
4161 	}
4162 
4163 	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4164 		p_uuid[i] = be64_to_cpu(p->uuid[i]);
4165 
4166 	kfree(device->p_uuid);
4167 	device->p_uuid = p_uuid;
4168 
4169 	if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4170 	    device->state.disk < D_INCONSISTENT &&
4171 	    device->state.role == R_PRIMARY &&
4172 	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4173 		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4174 		    (unsigned long long)device->ed_uuid);
4175 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4176 		return -EIO;
4177 	}
4178 
4179 	if (get_ldev(device)) {
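		/* If we are already Connected, our current UUID is still
		 * UUID_JUST_CREATED and the peer set bit 3 of its UUID flags,
		 * the initial full sync can be skipped: clear the bitmap and
		 * adopt the peer's current UUID instead. */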
4180 		int skip_initial_sync =
4181 			device->state.conn == C_CONNECTED &&
4182 			peer_device->connection->agreed_pro_version >= 90 &&
4183 			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4184 			(p_uuid[UI_FLAGS] & 8);
4185 		if (skip_initial_sync) {
4186 			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4187 			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4188 					"clear_n_write from receive_uuids",
4189 					BM_LOCKED_TEST_ALLOWED);
4190 			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4191 			_drbd_uuid_set(device, UI_BITMAP, 0);
4192 			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4193 					CS_VERBOSE, NULL);
4194 			drbd_md_sync(device);
4195 			updated_uuids = 1;
4196 		}
4197 		put_ldev(device);
4198 	} else if (device->state.disk < D_INCONSISTENT &&
4199 		   device->state.role == R_PRIMARY) {
4200 		/* I am a diskless primary, the peer just created a new current UUID
4201 		   for me. */
4202 		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4203 	}
4204 
4205 	/* Before we test for the disk state, we should wait until a possibly
4206 	   ongoing cluster-wide state change has finished. That is important if
4207 	   we are primary and are detaching from our disk. We need to see the
4208 	   new disk state... */
4209 	mutex_lock(device->state_mutex);
4210 	mutex_unlock(device->state_mutex);
4211 	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4212 		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4213 
4214 	if (updated_uuids)
4215 		drbd_print_uuids(device, "receiver updated UUIDs to");
4216 
4217 	return 0;
4218 }
4219 
4220 /**
4221  * convert_state() - Converts the peer's view of the cluster state to our point of view
4222  * @ps:		The state as seen by the peer.
4223  */
4224 static union drbd_state convert_state(union drbd_state ps)
4225 {
4226 	union drbd_state ms;
4227 
4228 	static enum drbd_conns c_tab[] = {
4229 		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4230 		[C_CONNECTED] = C_CONNECTED,
4231 
4232 		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4233 		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4234 		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4235 		[C_VERIFY_S]       = C_VERIFY_T,
4236 		[C_MASK]   = C_MASK,
4237 	};
4238 
4239 	ms.i = ps.i;
4240 
4241 	ms.conn = c_tab[ps.conn];
4242 	ms.peer = ps.role;
4243 	ms.role = ps.peer;
4244 	ms.pdsk = ps.disk;
4245 	ms.disk = ps.pdsk;
4246 	ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4247 
4248 	return ms;
4249 }
4250 
4251 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4252 {
4253 	struct drbd_peer_device *peer_device;
4254 	struct drbd_device *device;
4255 	struct p_req_state *p = pi->data;
4256 	union drbd_state mask, val;
4257 	enum drbd_state_rv rv;
4258 
4259 	peer_device = conn_peer_device(connection, pi->vnr);
4260 	if (!peer_device)
4261 		return -EIO;
4262 	device = peer_device->device;
4263 
4264 	mask.i = be32_to_cpu(p->mask);
4265 	val.i = be32_to_cpu(p->val);
4266 
4267 	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4268 	    mutex_is_locked(device->state_mutex)) {
4269 		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4270 		return 0;
4271 	}
4272 
4273 	mask = convert_state(mask);
4274 	val = convert_state(val);
4275 
4276 	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4277 	drbd_send_sr_reply(peer_device, rv);
4278 
4279 	drbd_md_sync(device);
4280 
4281 	return 0;
4282 }
4283 
4284 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4285 {
4286 	struct p_req_state *p = pi->data;
4287 	union drbd_state mask, val;
4288 	enum drbd_state_rv rv;
4289 
4290 	mask.i = be32_to_cpu(p->mask);
4291 	val.i = be32_to_cpu(p->val);
4292 
4293 	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4294 	    mutex_is_locked(&connection->cstate_mutex)) {
4295 		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4296 		return 0;
4297 	}
4298 
4299 	mask = convert_state(mask);
4300 	val = convert_state(val);
4301 
4302 	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4303 	conn_send_sr_reply(connection, rv);
4304 
4305 	return 0;
4306 }
4307 
4308 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4309 {
4310 	struct drbd_peer_device *peer_device;
4311 	struct drbd_device *device;
4312 	struct p_state *p = pi->data;
4313 	union drbd_state os, ns, peer_state;
4314 	enum drbd_disk_state real_peer_disk;
4315 	enum chg_state_flags cs_flags;
4316 	int rv;
4317 
4318 	peer_device = conn_peer_device(connection, pi->vnr);
4319 	if (!peer_device)
4320 		return config_unknown_volume(connection, pi);
4321 	device = peer_device->device;
4322 
4323 	peer_state.i = be32_to_cpu(p->state);
4324 
4325 	real_peer_disk = peer_state.disk;
4326 	if (peer_state.disk == D_NEGOTIATING) {
4327 		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4328 		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4329 	}
4330 
4331 	spin_lock_irq(&device->resource->req_lock);
4332  retry:
4333 	os = ns = drbd_read_state(device);
4334 	spin_unlock_irq(&device->resource->req_lock);
4335 
4336 	/* If some other part of the code (ack_receiver thread, timeout)
4337 	 * already decided to close the connection again,
4338 	 * we must not "re-establish" it here. */
4339 	if (os.conn <= C_TEAR_DOWN)
4340 		return -ECONNRESET;
4341 
4342 	/* If this is the "end of sync" confirmation, usually the peer disk
4343 	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4344 	 * set) resync started in PausedSyncT, or if the timing of pause-/
4345 	 * unpause-sync events has been "just right", the peer disk may
4346 	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4347 	 */
4348 	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4349 	    real_peer_disk == D_UP_TO_DATE &&
4350 	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4351 		/* If we are (becoming) SyncSource, but peer is still in sync
4352 		 * preparation, ignore its uptodate-ness to avoid flapping, it
4353 		 * will change to inconsistent once the peer reaches active
4354 		 * syncing states.
4355 		 * It may have changed syncer-paused flags, however, so we
4356 		 * cannot ignore this completely. */
4357 		if (peer_state.conn > C_CONNECTED &&
4358 		    peer_state.conn < C_SYNC_SOURCE)
4359 			real_peer_disk = D_INCONSISTENT;
4360 
4361 		/* if peer_state changes to connected at the same time,
4362 		 * it explicitly notifies us that it finished resync.
4363 		 * Maybe we should finish it up, too? */
4364 		else if (os.conn >= C_SYNC_SOURCE &&
4365 			 peer_state.conn == C_CONNECTED) {
4366 			if (drbd_bm_total_weight(device) <= device->rs_failed)
4367 				drbd_resync_finished(device);
4368 			return 0;
4369 		}
4370 	}
4371 
4372 	/* explicit verify finished notification, stop sector reached. */
4373 	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4374 	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4375 		ov_out_of_sync_print(device);
4376 		drbd_resync_finished(device);
4377 		return 0;
4378 	}
4379 
4380 	/* peer says his disk is inconsistent, while we think it is uptodate,
4381 	 * and this happens while the peer still thinks we have a sync going on,
4382 	 * but we think we are already done with the sync.
4383 	 * We ignore this to avoid flapping pdsk.
4384 	 * This should not happen, if the peer is a recent version of drbd. */
4385 	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4386 	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4387 		real_peer_disk = D_UP_TO_DATE;
4388 
4389 	if (ns.conn == C_WF_REPORT_PARAMS)
4390 		ns.conn = C_CONNECTED;
4391 
4392 	if (peer_state.conn == C_AHEAD)
4393 		ns.conn = C_BEHIND;
4394 
4395 	/* TODO:
4396 	 * if (primary and diskless and peer uuid != effective uuid)
4397 	 *     abort attach on peer;
4398 	 *
4399 	 * If this node does not have good data, was already connected, but
4400 	 * the peer did a late attach only now, trying to "negotiate" with me,
4401 	 * AND I am currently Primary, possibly frozen, with some specific
4402 	 * "effective" uuid, this should never be reached, really, because
4403 	 * we first send the uuids, then the current state.
4404 	 *
4405 	 * In this scenario, we already dropped the connection hard
4406 	 * when we received the unsuitable uuids (receive_uuids()).
4407 	 *
4408 	 * Should we want to change this, that is: not drop the connection in
4409 	 * receive_uuids() already, then we would need to add a branch here
4410 	 * that aborts the attach of "unsuitable uuids" on the peer in case
4411 	 * this node is currently Diskless Primary.
4412 	 */
4413 
4414 	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4415 	    get_ldev_if_state(device, D_NEGOTIATING)) {
4416 		int cr; /* consider resync */
4417 
4418 		/* if we established a new connection */
4419 		cr  = (os.conn < C_CONNECTED);
4420 		/* if we had an established connection
4421 		 * and one of the nodes newly attaches a disk */
4422 		cr |= (os.conn == C_CONNECTED &&
4423 		       (peer_state.disk == D_NEGOTIATING ||
4424 			os.disk == D_NEGOTIATING));
4425 		/* if we have both been inconsistent, and the peer has been
4426 		 * forced to be UpToDate with --overwrite-data */
4427 		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4428 		/* if we had been plain connected, and the admin requested to
4429 		 * start a sync by "invalidate" or "invalidate-remote" */
4430 		cr |= (os.conn == C_CONNECTED &&
4431 				(peer_state.conn >= C_STARTING_SYNC_S &&
4432 				 peer_state.conn <= C_WF_BITMAP_T));
4433 
4434 		if (cr)
4435 			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4436 
4437 		put_ldev(device);
4438 		if (ns.conn == C_MASK) {
4439 			ns.conn = C_CONNECTED;
4440 			if (device->state.disk == D_NEGOTIATING) {
4441 				drbd_force_state(device, NS(disk, D_FAILED));
4442 			} else if (peer_state.disk == D_NEGOTIATING) {
4443 				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4444 				peer_state.disk = D_DISKLESS;
4445 				real_peer_disk = D_DISKLESS;
4446 			} else {
4447 				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4448 					return -EIO;
4449 				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4450 				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4451 				return -EIO;
4452 			}
4453 		}
4454 	}
4455 
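	/* req_lock was dropped while evaluating the peer state above; if our
	 * own state changed in the meantime, start over with a fresh copy. */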
4456 	spin_lock_irq(&device->resource->req_lock);
4457 	if (os.i != drbd_read_state(device).i)
4458 		goto retry;
4459 	clear_bit(CONSIDER_RESYNC, &device->flags);
4460 	ns.peer = peer_state.role;
4461 	ns.pdsk = real_peer_disk;
4462 	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4463 	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4464 		ns.disk = device->new_state_tmp.disk;
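	/* Only a freshly established connection may go through the regular
	 * state machine checks; every other transition here is forced. */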
4465 	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4466 	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4467 	    test_bit(NEW_CUR_UUID, &device->flags)) {
4468 		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4469 		   for temporary network outages! */
4470 		spin_unlock_irq(&device->resource->req_lock);
4471 		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4472 		tl_clear(peer_device->connection);
4473 		drbd_uuid_new_current(device);
4474 		clear_bit(NEW_CUR_UUID, &device->flags);
4475 		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4476 		return -EIO;
4477 	}
4478 	rv = _drbd_set_state(device, ns, cs_flags, NULL);
4479 	ns = drbd_read_state(device);
4480 	spin_unlock_irq(&device->resource->req_lock);
4481 
4482 	if (rv < SS_SUCCESS) {
4483 		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4484 		return -EIO;
4485 	}
4486 
4487 	if (os.conn > C_WF_REPORT_PARAMS) {
4488 		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4489 		    peer_state.disk != D_NEGOTIATING ) {
4490 			/* we want resync, peer has not yet decided to sync... */
4491 			/* Nowadays only used when forcing a node into primary role and
4492 			   setting its disk to UpToDate with that */
4493 			drbd_send_uuids(peer_device);
4494 			drbd_send_current_state(peer_device);
4495 		}
4496 	}
4497 
4498 	clear_bit(DISCARD_MY_DATA, &device->flags);
4499 
4500 	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4501 
4502 	return 0;
4503 }
4504 
4505 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4506 {
4507 	struct drbd_peer_device *peer_device;
4508 	struct drbd_device *device;
4509 	struct p_rs_uuid *p = pi->data;
4510 
4511 	peer_device = conn_peer_device(connection, pi->vnr);
4512 	if (!peer_device)
4513 		return -EIO;
4514 	device = peer_device->device;
4515 
4516 	wait_event(device->misc_wait,
4517 		   device->state.conn == C_WF_SYNC_UUID ||
4518 		   device->state.conn == C_BEHIND ||
4519 		   device->state.conn < C_CONNECTED ||
4520 		   device->state.disk < D_NEGOTIATING);
4521 
4522 	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4523 
4524 	/* Here the _drbd_uuid_ functions are right, current should
4525 	   _not_ be rotated into the history */
4526 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
4527 		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4528 		_drbd_uuid_set(device, UI_BITMAP, 0UL);
4529 
4530 		drbd_print_uuids(device, "updated sync uuid");
4531 		drbd_start_resync(device, C_SYNC_TARGET);
4532 
4533 		put_ldev(device);
4534 	} else
4535 		drbd_err(device, "Ignoring SyncUUID packet!\n");
4536 
4537 	return 0;
4538 }
4539 
4540 /**
4541  * receive_bitmap_plain
4542  *
4543  * Return 0 when done, 1 when another iteration is needed, and a negative error
4544  * code upon failure.
4545  */
4546 static int
4547 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4548 		     unsigned long *p, struct bm_xfer_ctx *c)
4549 {
4550 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4551 				 drbd_header_size(peer_device->connection);
4552 	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4553 				       c->bm_words - c->word_offset);
4554 	unsigned int want = num_words * sizeof(*p);
4555 	int err;
4556 
4557 	if (want != size) {
4558 		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4559 		return -EIO;
4560 	}
4561 	if (want == 0)
4562 		return 0;
4563 	err = drbd_recv_all(peer_device->connection, p, want);
4564 	if (err)
4565 		return err;
4566 
4567 	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4568 
4569 	c->word_offset += num_words;
4570 	c->bit_offset = c->word_offset * BITS_PER_LONG;
4571 	if (c->bit_offset > c->bm_bits)
4572 		c->bit_offset = c->bm_bits;
4573 
4574 	return 1;
4575 }
4576 
4577 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4578 {
4579 	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4580 }
4581 
4582 static int dcbp_get_start(struct p_compressed_bm *p)
4583 {
4584 	return (p->encoding & 0x80) != 0;
4585 }
4586 
4587 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4588 {
4589 	return (p->encoding >> 4) & 0x7;
4590 }
4591 
4592 /**
4593  * recv_bm_rle_bits
4594  *
4595  * Return 0 when done, 1 when another iteration is needed, and a negative error
4596  * code upon failure.
4597  */
4598 static int
4599 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4600 		struct p_compressed_bm *p,
4601 		 struct bm_xfer_ctx *c,
4602 		 unsigned int len)
4603 {
4604 	struct bitstream bs;
4605 	u64 look_ahead;
4606 	u64 rl;
4607 	u64 tmp;
4608 	unsigned long s = c->bit_offset;
4609 	unsigned long e;
4610 	int toggle = dcbp_get_start(p);
4611 	int have;
4612 	int bits;
4613 
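	/* The payload is a bit stream of VLI-encoded run lengths.  Runs
	 * alternately describe clear and set bits, starting with the value
	 * given by dcbp_get_start(); only the set runs are merged into our
	 * bitmap. */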
4614 	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4615 
4616 	bits = bitstream_get_bits(&bs, &look_ahead, 64);
4617 	if (bits < 0)
4618 		return -EIO;
4619 
4620 	for (have = bits; have > 0; s += rl, toggle = !toggle) {
4621 		bits = vli_decode_bits(&rl, look_ahead);
4622 		if (bits <= 0)
4623 			return -EIO;
4624 
4625 		if (toggle) {
4626 			e = s + rl -1;
4627 			if (e >= c->bm_bits) {
4628 				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4629 				return -EIO;
4630 			}
4631 			_drbd_bm_set_bits(peer_device->device, s, e);
4632 		}
4633 
4634 		if (have < bits) {
4635 			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4636 				have, bits, look_ahead,
4637 				(unsigned int)(bs.cur.b - p->code),
4638 				(unsigned int)bs.buf_len);
4639 			return -EIO;
4640 		}
4641 		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4642 		if (likely(bits < 64))
4643 			look_ahead >>= bits;
4644 		else
4645 			look_ahead = 0;
4646 		have -= bits;
4647 
4648 		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4649 		if (bits < 0)
4650 			return -EIO;
4651 		look_ahead |= tmp << have;
4652 		have += bits;
4653 	}
4654 
4655 	c->bit_offset = s;
4656 	bm_xfer_ctx_bit_to_word_offset(c);
4657 
4658 	return (s != c->bm_bits);
4659 }
4660 
4661 /**
4662  * decode_bitmap_c
4663  *
4664  * Return 0 when done, 1 when another iteration is needed, and a negative error
4665  * code upon failure.
4666  */
4667 static int
4668 decode_bitmap_c(struct drbd_peer_device *peer_device,
4669 		struct p_compressed_bm *p,
4670 		struct bm_xfer_ctx *c,
4671 		unsigned int len)
4672 {
4673 	if (dcbp_get_code(p) == RLE_VLI_Bits)
4674 		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4675 
4676 	/* other variants had been implemented for evaluation,
4677 	 * but have been dropped as this one turned out to be "best"
4678 	 * during all our tests. */
4679 
4680 	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4681 	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4682 	return -EIO;
4683 }
4684 
4685 void INFO_bm_xfer_stats(struct drbd_device *device,
4686 		const char *direction, struct bm_xfer_ctx *c)
4687 {
4688 	/* what would it take to transfer it "plaintext" */
4689 	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4690 	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4691 	unsigned int plain =
4692 		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4693 		c->bm_words * sizeof(unsigned long);
4694 	unsigned int total = c->bytes[0] + c->bytes[1];
4695 	unsigned int r;
4696 
4697 	/* total can not be zero. but just in case: */
4698 	if (total == 0)
4699 		return;
4700 
4701 	/* don't report if not compressed */
4702 	if (total >= plain)
4703 		return;
4704 
4705 	/* total < plain. check for overflow, still */
4706 	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4707 		                    : (1000 * total / plain);
4708 
4709 	if (r > 1000)
4710 		r = 1000;
4711 
4712 	r = 1000 - r;
4713 	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4714 	     "total %u; compression: %u.%u%%\n",
4715 			direction,
4716 			c->bytes[1], c->packets[1],
4717 			c->bytes[0], c->packets[0],
4718 			total, r/10, r % 10);
4719 }
4720 
4721 /* Since we are processing the bitfield from lower addresses to higher,
4722    it does not matter whether we process it in 32 bit or 64 bit
4723    chunks, as long as it is little endian. (Understand it as byte stream,
4724    beginning with the lowest byte...) If we used big endian
4725    we would need to process it from the highest address to the lowest,
4726    in order to be agnostic to the 32 vs 64 bits issue.
4727 
4728    Returns 0 on success, a negative error code otherwise. */
4729 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4730 {
4731 	struct drbd_peer_device *peer_device;
4732 	struct drbd_device *device;
4733 	struct bm_xfer_ctx c;
4734 	int err;
4735 
4736 	peer_device = conn_peer_device(connection, pi->vnr);
4737 	if (!peer_device)
4738 		return -EIO;
4739 	device = peer_device->device;
4740 
4741 	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4742 	/* you are supposed to send additional out-of-sync information
4743 	 * if you actually set bits during this phase */
4744 
4745 	c = (struct bm_xfer_ctx) {
4746 		.bm_bits = drbd_bm_bits(device),
4747 		.bm_words = drbd_bm_words(device),
4748 	};
4749 
4750 	for(;;) {
4751 		if (pi->cmd == P_BITMAP)
4752 			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4753 		else if (pi->cmd == P_COMPRESSED_BITMAP) {
4754 			/* MAYBE: sanity check that we speak proto >= 90,
4755 			 * and the feature is enabled! */
4756 			struct p_compressed_bm *p = pi->data;
4757 
4758 			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4759 				drbd_err(device, "ReportCBitmap packet too large\n");
4760 				err = -EIO;
4761 				goto out;
4762 			}
4763 			if (pi->size <= sizeof(*p)) {
4764 				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4765 				err = -EIO;
4766 				goto out;
4767 			}
4768 			err = drbd_recv_all(peer_device->connection, p, pi->size);
4769 			if (err)
4770 			       goto out;
4771 			err = decode_bitmap_c(peer_device, p, &c, pi->size);
4772 		} else {
4773 			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4774 			err = -EIO;
4775 			goto out;
4776 		}
4777 
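		/* stats index 1 counts plain P_BITMAP packets, index 0 compressed ones;
		 * INFO_bm_xfer_stats() prints them as "plain" and "RLE" respectively */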
4778 		c.packets[pi->cmd == P_BITMAP]++;
4779 		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4780 
4781 		if (err <= 0) {
4782 			if (err < 0)
4783 				goto out;
4784 			break;
4785 		}
4786 		err = drbd_recv_header(peer_device->connection, pi);
4787 		if (err)
4788 			goto out;
4789 	}
4790 
4791 	INFO_bm_xfer_stats(device, "receive", &c);
4792 
4793 	if (device->state.conn == C_WF_BITMAP_T) {
4794 		enum drbd_state_rv rv;
4795 
4796 		err = drbd_send_bitmap(device);
4797 		if (err)
4798 			goto out;
4799 		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4800 		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4801 		D_ASSERT(device, rv == SS_SUCCESS);
4802 	} else if (device->state.conn != C_WF_BITMAP_S) {
4803 		/* admin may have requested C_DISCONNECTING,
4804 		 * other threads may have noticed network errors */
4805 		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4806 		    drbd_conn_str(device->state.conn));
4807 	}
4808 	err = 0;
4809 
4810  out:
4811 	drbd_bm_unlock(device);
4812 	if (!err && device->state.conn == C_WF_BITMAP_S)
4813 		drbd_start_resync(device, C_SYNC_SOURCE);
4814 	return err;
4815 }
4816 
4817 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4818 {
4819 	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4820 		 pi->cmd, pi->size);
4821 
4822 	return ignore_remaining_packet(connection, pi);
4823 }
4824 
4825 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4826 {
4827 	/* Make sure we've acked all the TCP data associated
4828 	 * with the data requests being unplugged */
4829 	drbd_tcp_quickack(connection->data.socket);
4830 
4831 	return 0;
4832 }
4833 
4834 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4835 {
4836 	struct drbd_peer_device *peer_device;
4837 	struct drbd_device *device;
4838 	struct p_block_desc *p = pi->data;
4839 
4840 	peer_device = conn_peer_device(connection, pi->vnr);
4841 	if (!peer_device)
4842 		return -EIO;
4843 	device = peer_device->device;
4844 
4845 	switch (device->state.conn) {
4846 	case C_WF_SYNC_UUID:
4847 	case C_WF_BITMAP_T:
4848 	case C_BEHIND:
4849 		break;
4850 	default:
4851 		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4852 				drbd_conn_str(device->state.conn));
4853 	}
4854 
4855 	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4856 
4857 	return 0;
4858 }
4859 
4860 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4861 {
4862 	struct drbd_peer_device *peer_device;
4863 	struct p_block_desc *p = pi->data;
4864 	struct drbd_device *device;
4865 	sector_t sector;
4866 	int size, err = 0;
4867 
4868 	peer_device = conn_peer_device(connection, pi->vnr);
4869 	if (!peer_device)
4870 		return -EIO;
4871 	device = peer_device->device;
4872 
4873 	sector = be64_to_cpu(p->sector);
4874 	size = be32_to_cpu(p->blksize);
4875 
4876 	dec_rs_pending(device);
4877 
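	/* The peer reports this range as deallocated on its side; mirror that
	 * locally by submitting a write-zeroes peer request (flagged EE_IS_TRIM). */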
4878 	if (get_ldev(device)) {
4879 		struct drbd_peer_request *peer_req;
4880 		const int op = REQ_OP_WRITE_ZEROES;
4881 
4882 		peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4883 					       size, 0, GFP_NOIO);
4884 		if (!peer_req) {
4885 			put_ldev(device);
4886 			return -ENOMEM;
4887 		}
4888 
4889 		peer_req->w.cb = e_end_resync_block;
4890 		peer_req->submit_jif = jiffies;
4891 		peer_req->flags |= EE_IS_TRIM;
4892 
4893 		spin_lock_irq(&device->resource->req_lock);
4894 		list_add_tail(&peer_req->w.list, &device->sync_ee);
4895 		spin_unlock_irq(&device->resource->req_lock);
4896 
4897 		atomic_add(pi->size >> 9, &device->rs_sect_ev);
4898 		err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4899 
4900 		if (err) {
4901 			spin_lock_irq(&device->resource->req_lock);
4902 			list_del(&peer_req->w.list);
4903 			spin_unlock_irq(&device->resource->req_lock);
4904 
4905 			drbd_free_peer_req(device, peer_req);
4906 			put_ldev(device);
4907 			err = 0;
4908 			goto fail;
4909 		}
4910 
4911 		inc_unacked(device);
4912 
4913 		/* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4914 		   as well as drbd_rs_complete_io() */
4915 	} else {
4916 	fail:
4917 		drbd_rs_complete_io(device, sector);
4918 		drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4919 	}
4920 
4921 	atomic_add(size >> 9, &device->rs_sect_in);
4922 
4923 	return err;
4924 }
4925 
4926 struct data_cmd {
4927 	int expect_payload;
4928 	unsigned int pkt_size;
4929 	int (*fn)(struct drbd_connection *, struct packet_info *);
4930 };
4931 
4932 static struct data_cmd drbd_cmd_handler[] = {
4933 	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
4934 	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
4935 	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4936 	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4937 	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
4938 	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4939 	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4940 	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4941 	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4942 	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
4943 	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4944 	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4945 	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
4946 	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
4947 	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
4948 	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4949 	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4950 	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4951 	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4952 	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4953 	[P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954 	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4955 	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4956 	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4957 	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4958 	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
4959 	[P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4960 	[P_WSAME]	    = { 1, sizeof(struct p_wsame), receive_Data },
4961 };
4962 
4963 static void drbdd(struct drbd_connection *connection)
4964 {
4965 	struct packet_info pi;
4966 	size_t shs; /* sub header size */
4967 	int err;
4968 
4969 	while (get_t_state(&connection->receiver) == RUNNING) {
4970 		struct data_cmd const *cmd;
4971 
4972 		drbd_thread_current_set_cpu(&connection->receiver);
4973 		update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
4974 		if (drbd_recv_header_maybe_unplug(connection, &pi))
4975 			goto err_out;
4976 
4977 		cmd = &drbd_cmd_handler[pi.cmd];
4978 		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4979 			drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4980 				 cmdname(pi.cmd), pi.cmd);
4981 			goto err_out;
4982 		}
4983 
4984 		shs = cmd->pkt_size;
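		/* with the WSAME feature negotiated, P_SIZES carries an additional struct o_qlim */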
4985 		if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4986 			shs += sizeof(struct o_qlim);
4987 		if (pi.size > shs && !cmd->expect_payload) {
4988 			drbd_err(connection, "No payload expected %s l:%d\n",
4989 				 cmdname(pi.cmd), pi.size);
4990 			goto err_out;
4991 		}
4992 		if (pi.size < shs) {
4993 			drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
4994 				 cmdname(pi.cmd), (int)shs, pi.size);
4995 			goto err_out;
4996 		}
4997 
4998 		if (shs) {
4999 			update_receiver_timing_details(connection, drbd_recv_all_warn);
5000 			err = drbd_recv_all_warn(connection, pi.data, shs);
5001 			if (err)
5002 				goto err_out;
5003 			pi.size -= shs;
5004 		}
5005 
5006 		update_receiver_timing_details(connection, cmd->fn);
5007 		err = cmd->fn(connection, &pi);
5008 		if (err) {
5009 			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5010 				 cmdname(pi.cmd), err, pi.size);
5011 			goto err_out;
5012 		}
5013 	}
5014 	return;
5015 
5016     err_out:
5017 	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5018 }
5019 
5020 static void conn_disconnect(struct drbd_connection *connection)
5021 {
5022 	struct drbd_peer_device *peer_device;
5023 	enum drbd_conns oc;
5024 	int vnr;
5025 
5026 	if (connection->cstate == C_STANDALONE)
5027 		return;
5028 
5029 	/* We are about to start the cleanup after connection loss.
5030 	 * Make sure drbd_make_request knows about that.
5031 	 * Usually we should be in some network failure state already,
5032 	 * but just in case we are not, we fix it up here.
5033 	 */
5034 	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5035 
5036 	/* ack_receiver does not clean up anything. it must not interfere, either */
5037 	drbd_thread_stop(&connection->ack_receiver);
5038 	if (connection->ack_sender) {
5039 		destroy_workqueue(connection->ack_sender);
5040 		connection->ack_sender = NULL;
5041 	}
5042 	drbd_free_sock(connection);
5043 
5044 	rcu_read_lock();
5045 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5046 		struct drbd_device *device = peer_device->device;
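		/* hold a reference and drop the RCU read lock: drbd_disconnected() may sleep */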
5047 		kref_get(&device->kref);
5048 		rcu_read_unlock();
5049 		drbd_disconnected(peer_device);
5050 		kref_put(&device->kref, drbd_destroy_device);
5051 		rcu_read_lock();
5052 	}
5053 	rcu_read_unlock();
5054 
5055 	if (!list_empty(&connection->current_epoch->list))
5056 		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5057 	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5058 	atomic_set(&connection->current_epoch->epoch_size, 0);
5059 	connection->send.seen_any_write_yet = false;
5060 
5061 	drbd_info(connection, "Connection closed\n");
5062 
5063 	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5064 		conn_try_outdate_peer_async(connection);
5065 
5066 	spin_lock_irq(&connection->resource->req_lock);
5067 	oc = connection->cstate;
5068 	if (oc >= C_UNCONNECTED)
5069 		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5070 
5071 	spin_unlock_irq(&connection->resource->req_lock);
5072 
5073 	if (oc == C_DISCONNECTING)
5074 		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5075 }
5076 
5077 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5078 {
5079 	struct drbd_device *device = peer_device->device;
5080 	unsigned int i;
5081 
5082 	/* wait for current activity to cease. */
5083 	spin_lock_irq(&device->resource->req_lock);
5084 	_drbd_wait_ee_list_empty(device, &device->active_ee);
5085 	_drbd_wait_ee_list_empty(device, &device->sync_ee);
5086 	_drbd_wait_ee_list_empty(device, &device->read_ee);
5087 	spin_unlock_irq(&device->resource->req_lock);
5088 
5089 	/* We do not have data structures that would allow us to
5090 	 * get the rs_pending_cnt down to 0 again.
5091 	 *  * On C_SYNC_TARGET we do not have any data structures describing
5092 	 *    the pending RSDataRequest's we have sent.
5093 	 *  * On C_SYNC_SOURCE there is no data structure that tracks
5094 	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5095 	 *  And no, it is not the sum of the reference counts in the
5096 	 *  resync_LRU. The resync_LRU tracks the whole operation including
5097 	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5098 	 *  on the fly. */
5099 	drbd_rs_cancel_all(device);
5100 	device->rs_total = 0;
5101 	device->rs_failed = 0;
5102 	atomic_set(&device->rs_pending_cnt, 0);
5103 	wake_up(&device->misc_wait);
5104 
5105 	del_timer_sync(&device->resync_timer);
5106 	resync_timer_fn((unsigned long)device);
5107 
5108 	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5109 	 * w_make_resync_request etc. which may still be on the worker queue
5110 	 * to be "canceled" */
5111 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5112 
5113 	drbd_finish_peer_reqs(device);
5114 
5115 	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5116 	   might have queued new work. The one before drbd_finish_peer_reqs() is
5117 	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
5118 	drbd_flush_workqueue(&peer_device->connection->sender_work);
5119 
5120 	/* need to do it again, drbd_finish_peer_reqs() may have populated it
5121 	 * again via drbd_try_clear_on_disk_bm(). */
5122 	drbd_rs_cancel_all(device);
5123 
5124 	kfree(device->p_uuid);
5125 	device->p_uuid = NULL;
5126 
5127 	if (!drbd_suspended(device))
5128 		tl_clear(peer_device->connection);
5129 
5130 	drbd_md_sync(device);
5131 
5132 	if (get_ldev(device)) {
5133 		drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5134 				"write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5135 		put_ldev(device);
5136 	}
5137 
5138 	/* tcp_close and release of sendpage pages can be deferred.  I don't
5139 	 * want to use SO_LINGER, because apparently it can be deferred for
5140 	 * more than 20 seconds (longest time I checked).
5141 	 *
5142 	 * Actually we don't care for exactly when the network stack does its
5143 	 * put_page(), but release our reference on these pages right here.
5144 	 */
5145 	i = drbd_free_peer_reqs(device, &device->net_ee);
5146 	if (i)
5147 		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5148 	i = atomic_read(&device->pp_in_use_by_net);
5149 	if (i)
5150 		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5151 	i = atomic_read(&device->pp_in_use);
5152 	if (i)
5153 		drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5154 
5155 	D_ASSERT(device, list_empty(&device->read_ee));
5156 	D_ASSERT(device, list_empty(&device->active_ee));
5157 	D_ASSERT(device, list_empty(&device->sync_ee));
5158 	D_ASSERT(device, list_empty(&device->done_ee));
5159 
5160 	return 0;
5161 }
5162 
5163 /*
5164  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5165  * we can agree on is stored in agreed_pro_version.
5166  *
5167  * feature flags and the reserved array should be enough room for future
5168  * enhancements of the handshake protocol, and possible plugins...
5169  *
5170  * for now, they are expected to be zero, but ignored.
5171  */
5172 static int drbd_send_features(struct drbd_connection *connection)
5173 {
5174 	struct drbd_socket *sock;
5175 	struct p_connection_features *p;
5176 
5177 	sock = &connection->data;
5178 	p = conn_prepare_command(connection, sock);
5179 	if (!p)
5180 		return -EIO;
5181 	memset(p, 0, sizeof(*p));
5182 	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5183 	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5184 	p->feature_flags = cpu_to_be32(PRO_FEATURES);
5185 	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5186 }
5187 
5188 /*
5189  * return values:
5190  *   1 yes, we have a valid connection
5191  *   0 oops, did not work out, please try again
5192  *  -1 peer talks different language,
5193  *     no point in trying again, please go standalone.
5194  */
5195 static int drbd_do_features(struct drbd_connection *connection)
5196 {
5197 	/* ASSERT current == connection->receiver ... */
5198 	struct p_connection_features *p;
5199 	const int expect = sizeof(struct p_connection_features);
5200 	struct packet_info pi;
5201 	int err;
5202 
5203 	err = drbd_send_features(connection);
5204 	if (err)
5205 		return 0;
5206 
5207 	err = drbd_recv_header(connection, &pi);
5208 	if (err)
5209 		return 0;
5210 
5211 	if (pi.cmd != P_CONNECTION_FEATURES) {
5212 		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5213 			 cmdname(pi.cmd), pi.cmd);
5214 		return -1;
5215 	}
5216 
5217 	if (pi.size != expect) {
5218 		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5219 		     expect, pi.size);
5220 		return -1;
5221 	}
5222 
5223 	p = pi.data;
5224 	err = drbd_recv_all_warn(connection, p, expect);
5225 	if (err)
5226 		return 0;
5227 
5228 	p->protocol_min = be32_to_cpu(p->protocol_min);
5229 	p->protocol_max = be32_to_cpu(p->protocol_max);
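	/* a protocol_max of 0 presumably means an old peer that announced only a
	 * single version; treat it as protocol_max == protocol_min */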
5230 	if (p->protocol_max == 0)
5231 		p->protocol_max = p->protocol_min;
5232 
5233 	if (PRO_VERSION_MAX < p->protocol_min ||
5234 	    PRO_VERSION_MIN > p->protocol_max)
5235 		goto incompat;
5236 
5237 	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5238 	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5239 
5240 	drbd_info(connection, "Handshake successful: "
5241 	     "Agreed network protocol version %d\n", connection->agreed_pro_version);
5242 
5243 	drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5244 		  connection->agreed_features,
5245 		  connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5246 		  connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5247 		  connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5248 		  connection->agreed_features ? "" : " none");
5249 
5250 	return 1;
5251 
5252  incompat:
5253 	drbd_err(connection, "incompatible DRBD dialects: "
5254 	    "I support %d-%d, peer supports %d-%d\n",
5255 	    PRO_VERSION_MIN, PRO_VERSION_MAX,
5256 	    p->protocol_min, p->protocol_max);
5257 	return -1;
5258 }
5259 
5260 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5261 static int drbd_do_auth(struct drbd_connection *connection)
5262 {
5263 	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5264 	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5265 	return -1;
5266 }
5267 #else
5268 #define CHALLENGE_LEN 64
5269 
5270 /* Return value:
5271 	1 - auth succeeded,
5272 	0 - failed, try again (network error),
5273 	-1 - auth failed, don't try again.
5274 */
5275 
5276 static int drbd_do_auth(struct drbd_connection *connection)
5277 {
5278 	struct drbd_socket *sock;
5279 	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5280 	char *response = NULL;
5281 	char *right_response = NULL;
5282 	char *peers_ch = NULL;
5283 	unsigned int key_len;
5284 	char secret[SHARED_SECRET_MAX]; /* 64 byte */
5285 	unsigned int resp_size;
5286 	struct shash_desc *desc;
5287 	struct packet_info pi;
5288 	struct net_conf *nc;
5289 	int err, rv;
5290 
5291 	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5292 
5293 	rcu_read_lock();
5294 	nc = rcu_dereference(connection->net_conf);
5295 	key_len = strlen(nc->shared_secret);
5296 	memcpy(secret, nc->shared_secret, key_len);
5297 	rcu_read_unlock();
5298 
5299 	desc = kmalloc(sizeof(struct shash_desc) +
5300 		       crypto_shash_descsize(connection->cram_hmac_tfm),
5301 		       GFP_KERNEL);
5302 	if (!desc) {
5303 		rv = -1;
5304 		goto fail;
5305 	}
5306 	desc->tfm = connection->cram_hmac_tfm;
5307 	desc->flags = 0;
5308 
5309 	rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5310 	if (rv) {
5311 		drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5312 		rv = -1;
5313 		goto fail;
5314 	}
5315 
5316 	get_random_bytes(my_challenge, CHALLENGE_LEN);
5317 
5318 	sock = &connection->data;
5319 	if (!conn_prepare_command(connection, sock)) {
5320 		rv = 0;
5321 		goto fail;
5322 	}
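	/* conn_send_command() returns 0 on success; rv != 0 below means the challenge was sent */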
5323 	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5324 				my_challenge, CHALLENGE_LEN);
5325 	if (!rv)
5326 		goto fail;
5327 
5328 	err = drbd_recv_header(connection, &pi);
5329 	if (err) {
5330 		rv = 0;
5331 		goto fail;
5332 	}
5333 
5334 	if (pi.cmd != P_AUTH_CHALLENGE) {
5335 		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5336 			 cmdname(pi.cmd), pi.cmd);
5337 		rv = 0;
5338 		goto fail;
5339 	}
5340 
5341 	if (pi.size > CHALLENGE_LEN * 2) {
5342 		drbd_err(connection, "AuthChallenge payload too big.\n");
5343 		rv = -1;
5344 		goto fail;
5345 	}
5346 
5347 	if (pi.size < CHALLENGE_LEN) {
5348 		drbd_err(connection, "AuthChallenge payload too small.\n");
5349 		rv = -1;
5350 		goto fail;
5351 	}
5352 
5353 	peers_ch = kmalloc(pi.size, GFP_NOIO);
5354 	if (peers_ch == NULL) {
5355 		drbd_err(connection, "kmalloc of peers_ch failed\n");
5356 		rv = -1;
5357 		goto fail;
5358 	}
5359 
5360 	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5361 	if (err) {
5362 		rv = 0;
5363 		goto fail;
5364 	}
5365 
5366 	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5367 		drbd_err(connection, "Peer presented the same challenge!\n");
5368 		rv = -1;
5369 		goto fail;
5370 	}
5371 
5372 	resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5373 	response = kmalloc(resp_size, GFP_NOIO);
5374 	if (response == NULL) {
5375 		drbd_err(connection, "kmalloc of response failed\n");
5376 		rv = -1;
5377 		goto fail;
5378 	}
5379 
5380 	rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5381 	if (rv) {
5382 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5383 		rv = -1;
5384 		goto fail;
5385 	}
5386 
5387 	if (!conn_prepare_command(connection, sock)) {
5388 		rv = 0;
5389 		goto fail;
5390 	}
5391 	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5392 				response, resp_size);
5393 	if (!rv)
5394 		goto fail;
5395 
5396 	err = drbd_recv_header(connection, &pi);
5397 	if (err) {
5398 		rv = 0;
5399 		goto fail;
5400 	}
5401 
5402 	if (pi.cmd != P_AUTH_RESPONSE) {
5403 		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5404 			 cmdname(pi.cmd), pi.cmd);
5405 		rv = 0;
5406 		goto fail;
5407 	}
5408 
5409 	if (pi.size != resp_size) {
5410 		drbd_err(connection, "AuthResponse payload has wrong size\n");
5411 		rv = 0;
5412 		goto fail;
5413 	}
5414 
5415 	err = drbd_recv_all_warn(connection, response, resp_size);
5416 	if (err) {
5417 		rv = 0;
5418 		goto fail;
5419 	}
5420 
5421 	right_response = kmalloc(resp_size, GFP_NOIO);
5422 	if (right_response == NULL) {
5423 		drbd_err(connection, "kmalloc of right_response failed\n");
5424 		rv = -1;
5425 		goto fail;
5426 	}
5427 
5428 	rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5429 				 right_response);
5430 	if (rv) {
5431 		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5432 		rv = -1;
5433 		goto fail;
5434 	}
5435 
5436 	rv = !memcmp(response, right_response, resp_size);
5437 
5438 	if (rv)
5439 		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5440 		     resp_size);
5441 	else
5442 		rv = -1;
5443 
5444  fail:
5445 	kfree(peers_ch);
5446 	kfree(response);
5447 	kfree(right_response);
5448 	if (desc) {
5449 		shash_desc_zero(desc);
5450 		kfree(desc);
5451 	}
5452 
5453 	return rv;
5454 }
5455 #endif
5456 
5457 int drbd_receiver(struct drbd_thread *thi)
5458 {
5459 	struct drbd_connection *connection = thi->connection;
5460 	int h;
5461 
5462 	drbd_info(connection, "receiver (re)started\n");
5463 
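	/* conn_connect() result: > 0 we are connected, 0 transient failure (retry),
	 * -1 unrecoverable, drop the network configuration */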
5464 	do {
5465 		h = conn_connect(connection);
5466 		if (h == 0) {
5467 			conn_disconnect(connection);
5468 			schedule_timeout_interruptible(HZ);
5469 		}
5470 		if (h == -1) {
5471 			drbd_warn(connection, "Discarding network configuration.\n");
5472 			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5473 		}
5474 	} while (h == 0);
5475 
5476 	if (h > 0) {
5477 		blk_start_plug(&connection->receiver_plug);
5478 		drbdd(connection);
5479 		blk_finish_plug(&connection->receiver_plug);
5480 	}
5481 
5482 	conn_disconnect(connection);
5483 
5484 	drbd_info(connection, "receiver terminated\n");
5485 	return 0;
5486 }
5487 
5488 /* ********* acknowledge sender ******** */
5489 
5490 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5491 {
5492 	struct p_req_state_reply *p = pi->data;
5493 	int retcode = be32_to_cpu(p->retcode);
5494 
5495 	if (retcode >= SS_SUCCESS) {
5496 		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5497 	} else {
5498 		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5499 		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5500 			 drbd_set_st_err_str(retcode), retcode);
5501 	}
5502 	wake_up(&connection->ping_wait);
5503 
5504 	return 0;
5505 }
5506 
5507 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5508 {
5509 	struct drbd_peer_device *peer_device;
5510 	struct drbd_device *device;
5511 	struct p_req_state_reply *p = pi->data;
5512 	int retcode = be32_to_cpu(p->retcode);
5513 
5514 	peer_device = conn_peer_device(connection, pi->vnr);
5515 	if (!peer_device)
5516 		return -EIO;
5517 	device = peer_device->device;
5518 
5519 	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5520 		D_ASSERT(device, connection->agreed_pro_version < 100);
5521 		return got_conn_RqSReply(connection, pi);
5522 	}
5523 
5524 	if (retcode >= SS_SUCCESS) {
5525 		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5526 	} else {
5527 		set_bit(CL_ST_CHG_FAIL, &device->flags);
5528 		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5529 			drbd_set_st_err_str(retcode), retcode);
5530 	}
5531 	wake_up(&device->state_wait);
5532 
5533 	return 0;
5534 }
5535 
5536 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5537 {
5538 	return drbd_send_ping_ack(connection);
5539 
5540 }
5541 
5542 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5543 {
5544 	/* restore idle timeout */
5545 	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5546 	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5547 		wake_up(&connection->ping_wait);
5548 
5549 	return 0;
5550 }
5551 
5552 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5553 {
5554 	struct drbd_peer_device *peer_device;
5555 	struct drbd_device *device;
5556 	struct p_block_ack *p = pi->data;
5557 	sector_t sector = be64_to_cpu(p->sector);
5558 	int blksize = be32_to_cpu(p->blksize);
5559 
5560 	peer_device = conn_peer_device(connection, pi->vnr);
5561 	if (!peer_device)
5562 		return -EIO;
5563 	device = peer_device->device;
5564 
5565 	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5566 
5567 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5568 
5569 	if (get_ldev(device)) {
5570 		drbd_rs_complete_io(device, sector);
5571 		drbd_set_in_sync(device, sector, blksize);
5572 		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5573 		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5574 		put_ldev(device);
5575 	}
5576 	dec_rs_pending(device);
5577 	atomic_add(blksize >> 9, &device->rs_sect_in);
5578 
5579 	return 0;
5580 }
5581 
5582 static int
5583 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5584 			      struct rb_root *root, const char *func,
5585 			      enum drbd_req_event what, bool missing_ok)
5586 {
5587 	struct drbd_request *req;
5588 	struct bio_and_error m;
5589 
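	/* look up the request this ACK refers to and feed 'what' into the request state machine */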
5590 	spin_lock_irq(&device->resource->req_lock);
5591 	req = find_request(device, root, id, sector, missing_ok, func);
5592 	if (unlikely(!req)) {
5593 		spin_unlock_irq(&device->resource->req_lock);
5594 		return -EIO;
5595 	}
5596 	__req_mod(req, what, &m);
5597 	spin_unlock_irq(&device->resource->req_lock);
5598 
5599 	if (m.bio)
5600 		complete_master_bio(device, &m);
5601 	return 0;
5602 }
5603 
5604 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5605 {
5606 	struct drbd_peer_device *peer_device;
5607 	struct drbd_device *device;
5608 	struct p_block_ack *p = pi->data;
5609 	sector_t sector = be64_to_cpu(p->sector);
5610 	int blksize = be32_to_cpu(p->blksize);
5611 	enum drbd_req_event what;
5612 
5613 	peer_device = conn_peer_device(connection, pi->vnr);
5614 	if (!peer_device)
5615 		return -EIO;
5616 	device = peer_device->device;
5617 
5618 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5619 
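	/* resync traffic is acked with ID_SYNCER instead of a request reference */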
5620 	if (p->block_id == ID_SYNCER) {
5621 		drbd_set_in_sync(device, sector, blksize);
5622 		dec_rs_pending(device);
5623 		return 0;
5624 	}
5625 	switch (pi->cmd) {
5626 	case P_RS_WRITE_ACK:
5627 		what = WRITE_ACKED_BY_PEER_AND_SIS;
5628 		break;
5629 	case P_WRITE_ACK:
5630 		what = WRITE_ACKED_BY_PEER;
5631 		break;
5632 	case P_RECV_ACK:
5633 		what = RECV_ACKED_BY_PEER;
5634 		break;
5635 	case P_SUPERSEDED:
5636 		what = CONFLICT_RESOLVED;
5637 		break;
5638 	case P_RETRY_WRITE:
5639 		what = POSTPONE_WRITE;
5640 		break;
5641 	default:
5642 		BUG();
5643 	}
5644 
5645 	return validate_req_change_req_state(device, p->block_id, sector,
5646 					     &device->write_requests, __func__,
5647 					     what, false);
5648 }
5649 
5650 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5651 {
5652 	struct drbd_peer_device *peer_device;
5653 	struct drbd_device *device;
5654 	struct p_block_ack *p = pi->data;
5655 	sector_t sector = be64_to_cpu(p->sector);
5656 	int size = be32_to_cpu(p->blksize);
5657 	int err;
5658 
5659 	peer_device = conn_peer_device(connection, pi->vnr);
5660 	if (!peer_device)
5661 		return -EIO;
5662 	device = peer_device->device;
5663 
5664 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5665 
5666 	if (p->block_id == ID_SYNCER) {
5667 		dec_rs_pending(device);
5668 		drbd_rs_failed_io(device, sector, size);
5669 		return 0;
5670 	}
5671 
5672 	err = validate_req_change_req_state(device, p->block_id, sector,
5673 					    &device->write_requests, __func__,
5674 					    NEG_ACKED, true);
5675 	if (err) {
5676 		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5677 		   The master bio might already be completed, therefore the
5678 		   request is no longer in the collision hash. */
5679 		/* In Protocol B we might already have got a P_RECV_ACK
5680 		   but then get a P_NEG_ACK afterwards. */
5681 		drbd_set_out_of_sync(device, sector, size);
5682 	}
5683 	return 0;
5684 }
5685 
5686 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5687 {
5688 	struct drbd_peer_device *peer_device;
5689 	struct drbd_device *device;
5690 	struct p_block_ack *p = pi->data;
5691 	sector_t sector = be64_to_cpu(p->sector);
5692 
5693 	peer_device = conn_peer_device(connection, pi->vnr);
5694 	if (!peer_device)
5695 		return -EIO;
5696 	device = peer_device->device;
5697 
5698 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5699 
5700 	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5701 	    (unsigned long long)sector, be32_to_cpu(p->blksize));
5702 
5703 	return validate_req_change_req_state(device, p->block_id, sector,
5704 					     &device->read_requests, __func__,
5705 					     NEG_ACKED, false);
5706 }
5707 
5708 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5709 {
5710 	struct drbd_peer_device *peer_device;
5711 	struct drbd_device *device;
5712 	sector_t sector;
5713 	int size;
5714 	struct p_block_ack *p = pi->data;
5715 
5716 	peer_device = conn_peer_device(connection, pi->vnr);
5717 	if (!peer_device)
5718 		return -EIO;
5719 	device = peer_device->device;
5720 
5721 	sector = be64_to_cpu(p->sector);
5722 	size = be32_to_cpu(p->blksize);
5723 
5724 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5725 
5726 	dec_rs_pending(device);
5727 
5728 	if (get_ldev_if_state(device, D_FAILED)) {
5729 		drbd_rs_complete_io(device, sector);
5730 		switch (pi->cmd) {
5731 		case P_NEG_RS_DREPLY:
5732 			drbd_rs_failed_io(device, sector, size);
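			/* fall through: P_RS_CANCEL needs only the drbd_rs_complete_io() above */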
5733 		case P_RS_CANCEL:
5734 			break;
5735 		default:
5736 			BUG();
5737 		}
5738 		put_ldev(device);
5739 	}
5740 
5741 	return 0;
5742 }
5743 
5744 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5745 {
5746 	struct p_barrier_ack *p = pi->data;
5747 	struct drbd_peer_device *peer_device;
5748 	int vnr;
5749 
5750 	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5751 
5752 	rcu_read_lock();
5753 	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5754 		struct drbd_device *device = peer_device->device;
5755 
5756 		if (device->state.conn == C_AHEAD &&
5757 		    atomic_read(&device->ap_in_flight) == 0 &&
5758 		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5759 			device->start_resync_timer.expires = jiffies + HZ;
5760 			add_timer(&device->start_resync_timer);
5761 		}
5762 	}
5763 	rcu_read_unlock();
5764 
5765 	return 0;
5766 }
5767 
5768 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5769 {
5770 	struct drbd_peer_device *peer_device;
5771 	struct drbd_device *device;
5772 	struct p_block_ack *p = pi->data;
5773 	struct drbd_device_work *dw;
5774 	sector_t sector;
5775 	int size;
5776 
5777 	peer_device = conn_peer_device(connection, pi->vnr);
5778 	if (!peer_device)
5779 		return -EIO;
5780 	device = peer_device->device;
5781 
5782 	sector = be64_to_cpu(p->sector);
5783 	size = be32_to_cpu(p->blksize);
5784 
5785 	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5786 
5787 	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5788 		drbd_ov_out_of_sync_found(device, sector, size);
5789 	else
5790 		ov_out_of_sync_print(device);
5791 
5792 	if (!get_ldev(device))
5793 		return 0;
5794 
5795 	drbd_rs_complete_io(device, sector);
5796 	dec_rs_pending(device);
5797 
5798 	--device->ov_left;
5799 
5800 	/* let's advance progress step marks only for every other megabyte */
5801 	if ((device->ov_left & 0x200) == 0x200)
5802 		drbd_advance_rs_marks(device, device->ov_left);
5803 
5804 	if (device->ov_left == 0) {
5805 		dw = kmalloc(sizeof(*dw), GFP_NOIO);
5806 		if (dw) {
5807 			dw->w.cb = w_ov_finished;
5808 			dw->device = device;
5809 			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5810 		} else {
5811 			drbd_err(device, "kmalloc(dw) failed.");
5812 			ov_out_of_sync_print(device);
5813 			drbd_resync_finished(device);
5814 		}
5815 	}
5816 	put_ldev(device);
5817 	return 0;
5818 }
5819 
5820 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5821 {
5822 	return 0;
5823 }
5824 
5825 struct meta_sock_cmd {
5826 	size_t pkt_size;
5827 	int (*fn)(struct drbd_connection *connection, struct packet_info *);
5828 };
5829 
5830 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5831 {
5832 	long t;
5833 	struct net_conf *nc;
5834 
5835 	rcu_read_lock();
5836 	nc = rcu_dereference(connection->net_conf);
5837 	t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5838 	rcu_read_unlock();
5839 
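	/* ping_int is configured in seconds, ping_timeo in tenths of a second */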
5840 	t *= HZ;
5841 	if (ping_timeout)
5842 		t /= 10;
5843 
5844 	connection->meta.socket->sk->sk_rcvtimeo = t;
5845 }
5846 
5847 static void set_ping_timeout(struct drbd_connection *connection)
5848 {
5849 	set_rcvtimeo(connection, 1);
5850 }
5851 
5852 static void set_idle_timeout(struct drbd_connection *connection)
5853 {
5854 	set_rcvtimeo(connection, 0);
5855 }
5856 
5857 static struct meta_sock_cmd ack_receiver_tbl[] = {
5858 	[P_PING]	    = { 0, got_Ping },
5859 	[P_PING_ACK]	    = { 0, got_PingAck },
5860 	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5861 	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
5862 	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5863 	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5864 	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
5865 	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
5866 	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5867 	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
5868 	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
5869 	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5870 	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5871 	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5872 	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5873 	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5874 	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
5875 };
5876 
5877 int drbd_ack_receiver(struct drbd_thread *thi)
5878 {
5879 	struct drbd_connection *connection = thi->connection;
5880 	struct meta_sock_cmd *cmd = NULL;
5881 	struct packet_info pi;
5882 	unsigned long pre_recv_jif;
5883 	int rv;
5884 	void *buf    = connection->meta.rbuf;
5885 	int received = 0;
5886 	unsigned int header_size = drbd_header_size(connection);
5887 	int expect   = header_size;
5888 	bool ping_timeout_active = false;
5889 	struct sched_param param = { .sched_priority = 2 };
5890 
5891 	rv = sched_setscheduler(current, SCHED_RR, &param);
5892 	if (rv < 0)
5893 		drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5894 
5895 	while (get_t_state(thi) == RUNNING) {
5896 		drbd_thread_current_set_cpu(thi);
5897 
5898 		conn_reclaim_net_peer_reqs(connection);
5899 
5900 		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5901 			if (drbd_send_ping(connection)) {
5902 				drbd_err(connection, "drbd_send_ping has failed\n");
5903 				goto reconnect;
5904 			}
5905 			set_ping_timeout(connection);
5906 			ping_timeout_active = true;
5907 		}
5908 
5909 		pre_recv_jif = jiffies;
5910 		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5911 
5912 		/* Note:
5913 		 * -EINTR	 (on meta) we got a signal
5914 		 * -EAGAIN	 (on meta) rcvtimeo expired
5915 		 * -ECONNRESET	 other side closed the connection
5916 		 * -ERESTARTSYS  (on data) we got a signal
5917 		 * rv <  0	 other than above: unexpected error!
5918 		 * rv == expected: full header or command
5919 		 * rv <  expected: "woken" by signal during receive
5920 		 * rv == 0	 : "connection shut down by peer"
5921 		 */
5922 		if (likely(rv > 0)) {
5923 			received += rv;
5924 			buf	 += rv;
5925 		} else if (rv == 0) {
5926 			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5927 				long t;
5928 				rcu_read_lock();
5929 				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5930 				rcu_read_unlock();
5931 
5932 				t = wait_event_timeout(connection->ping_wait,
5933 						       connection->cstate < C_WF_REPORT_PARAMS,
5934 						       t);
5935 				if (t)
5936 					break;
5937 			}
5938 			drbd_err(connection, "meta connection shut down by peer.\n");
5939 			goto reconnect;
5940 		} else if (rv == -EAGAIN) {
5941 			/* If the data socket received something meanwhile,
5942 			 * that is good enough: peer is still alive. */
5943 			if (time_after(connection->last_received, pre_recv_jif))
5944 				continue;
5945 			if (ping_timeout_active) {
5946 				drbd_err(connection, "PingAck did not arrive in time.\n");
5947 				goto reconnect;
5948 			}
5949 			set_bit(SEND_PING, &connection->flags);
5950 			continue;
5951 		} else if (rv == -EINTR) {
5952 			/* maybe drbd_thread_stop(): the while condition will notice.
5953 			 * maybe woken for send_ping: we'll send a ping above,
5954 			 * and change the rcvtimeo */
5955 			flush_signals(current);
5956 			continue;
5957 		} else {
5958 			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5959 			goto reconnect;
5960 		}
5961 
5962 		if (received == expect && cmd == NULL) {
5963 			if (decode_header(connection, connection->meta.rbuf, &pi))
5964 				goto reconnect;
5965 			cmd = &ack_receiver_tbl[pi.cmd];
5966 			if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5967 				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5968 					 cmdname(pi.cmd), pi.cmd);
5969 				goto disconnect;
5970 			}
5971 			expect = header_size + cmd->pkt_size;
5972 			if (pi.size != expect - header_size) {
5973 				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5974 					pi.cmd, pi.size);
5975 				goto reconnect;
5976 			}
5977 		}
5978 		if (received == expect) {
5979 			bool err;
5980 
5981 			err = cmd->fn(connection, &pi);
5982 			if (err) {
5983 				drbd_err(connection, "%pf failed\n", cmd->fn);
5984 				goto reconnect;
5985 			}
5986 
5987 			connection->last_received = jiffies;
5988 
5989 			if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5990 				set_idle_timeout(connection);
5991 				ping_timeout_active = false;
5992 			}
5993 
5994 			buf	 = connection->meta.rbuf;
5995 			received = 0;
5996 			expect	 = header_size;
5997 			cmd	 = NULL;
5998 		}
5999 	}
6000 
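	/* error exits, reached only via goto; the if (0) keeps the normal path out of them */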
6001 	if (0) {
6002 reconnect:
6003 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6004 		conn_md_sync(connection);
6005 	}
6006 	if (0) {
6007 disconnect:
6008 		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6009 	}
6010 
6011 	drbd_info(connection, "ack_receiver terminated\n");
6012 
6013 	return 0;
6014 }
6015 
6016 void drbd_send_acks_wf(struct work_struct *ws)
6017 {
6018 	struct drbd_peer_device *peer_device =
6019 		container_of(ws, struct drbd_peer_device, send_acks_work);
6020 	struct drbd_connection *connection = peer_device->connection;
6021 	struct drbd_device *device = peer_device->device;
6022 	struct net_conf *nc;
6023 	int tcp_cork, err;
6024 
6025 	rcu_read_lock();
6026 	nc = rcu_dereference(connection->net_conf);
6027 	tcp_cork = nc->tcp_cork;
6028 	rcu_read_unlock();
6029 
6030 	if (tcp_cork)
6031 		drbd_tcp_cork(connection->meta.socket);
6032 
6033 	err = drbd_finish_peer_reqs(device);
6034 	kref_put(&device->kref, drbd_destroy_device);
6035 	/* The matching kref_get() is in drbd_endio_write_sec_final(). It is necessary to keep
6036 	   the struct work_struct send_acks_work alive, which lives in the peer_device object */
6037 
6038 	if (err) {
6039 		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6040 		return;
6041 	}
6042 
6043 	if (tcp_cork)
6044 		drbd_tcp_uncork(connection->meta.socket);
6045 
6046 	return;
6047 }
6048