/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2015, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/selftest/rpc.c
 *
 * Author: Isaac Huang <isaac@clusterfs.com>
 *
 * 2012-05-13: Liang Zhen <liang@whamcloud.com>
 * - percpt data for service to improve smp performance
 * - code cleanup
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "selftest.h"

enum srpc_state {
	SRPC_STATE_NONE,
	SRPC_STATE_NI_INIT,
	SRPC_STATE_EQ_INIT,
	SRPC_STATE_RUNNING,
	SRPC_STATE_STOPPING,
};

static struct smoketest_rpc {
	spinlock_t	 rpc_glock;	/* global lock */
	struct srpc_service	*rpc_services[SRPC_SERVICE_MAX_ID + 1];
	struct lnet_handle_eq	 rpc_lnet_eq;	/* _the_ LNet event queue */
	enum srpc_state	 rpc_state;
	struct srpc_counters	 rpc_counters;
	__u64		 rpc_matchbits;	/* matchbits counter */
} srpc_data;

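/*
 * Framework services (id below SRPC_FRAMEWORK_SERVICE_MAX_ID) listen on
 * the framework request portal; test services use the regular one.
 */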
static inline int
srpc_serv_portal(int svc_id)
{
	return svc_id < SRPC_FRAMEWORK_SERVICE_MAX_ID ?
	       SRPC_FRAMEWORK_REQUEST_PORTAL : SRPC_REQUEST_PORTAL;
}

/* forward refs */
int srpc_handle_rpc(struct swi_workitem *wi);

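/*
 * Snapshot/overwrite the global RPC counters; rpc_glock keeps the copy
 * consistent.
 */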
void srpc_get_counters(struct srpc_counters *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	*cnt = srpc_data.rpc_counters;
	spin_unlock(&srpc_data.rpc_glock);
}

void srpc_set_counters(const struct srpc_counters *cnt)
{
	spin_lock(&srpc_data.rpc_glock);
	srpc_data.rpc_counters = *cnt;
	spin_unlock(&srpc_data.rpc_glock);
}

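/*
 * Describe @nob bytes of @pg, starting at @off, in the i-th entry of
 * the bulk I/O vector; returns the number of bytes covered.
 */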
static int
srpc_add_bulk_page(struct srpc_bulk *bk, struct page *pg, int i, int off,
		   int nob)
{
	LASSERT(off < PAGE_SIZE);
	LASSERT(nob > 0 && nob <= PAGE_SIZE);

	bk->bk_iovs[i].bv_offset = off;
	bk->bk_iovs[i].bv_page = pg;
	bk->bk_iovs[i].bv_len = nob;
	return nob;
}

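/* Free every page attached to the bulk descriptor, then the descriptor. */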
void
srpc_free_bulk(struct srpc_bulk *bk)
{
	int i;
	struct page *pg;

	LASSERT(bk);

	for (i = 0; i < bk->bk_niov; i++) {
		pg = bk->bk_iovs[i].bv_page;
		if (!pg)
			break;

		__free_page(pg);
	}

	LIBCFS_FREE(bk, offsetof(struct srpc_bulk, bk_iovs[bk->bk_niov]));
}

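/*
 * Allocate a bulk descriptor backed by @bulk_npg pages on CPT @cpt;
 * the first page is used from offset @bulk_off, and @sink selects the
 * transfer direction (non-zero: this side receives the bulk data).
 */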
struct srpc_bulk *
srpc_alloc_bulk(int cpt, unsigned int bulk_off, unsigned int bulk_npg,
		unsigned int bulk_len, int sink)
{
	struct srpc_bulk *bk;
	int i;

	LASSERT(bulk_npg > 0 && bulk_npg <= LNET_MAX_IOV);

	LIBCFS_CPT_ALLOC(bk, lnet_cpt_table(), cpt,
			 offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
	if (!bk) {
		CERROR("Can't allocate descriptor for %d pages\n", bulk_npg);
		return NULL;
	}

	memset(bk, 0, offsetof(struct srpc_bulk, bk_iovs[bulk_npg]));
	bk->bk_sink = sink;
	bk->bk_len = bulk_len;
	bk->bk_niov = bulk_npg;

	for (i = 0; i < bulk_npg; i++) {
		struct page *pg;
		int nob;

		pg = alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
				      GFP_KERNEL, 0);
		if (!pg) {
			CERROR("Can't allocate page %d of %d\n", i, bulk_npg);
			srpc_free_bulk(bk);
			return NULL;
		}

		nob = min_t(unsigned int, bulk_off + bulk_len, PAGE_SIZE) -
		      bulk_off;
		srpc_add_bulk_page(bk, pg, i, bulk_off, nob);
		bulk_len -= nob;
		bulk_off = 0;
	}

	return bk;
}

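/* Return the next value of the global matchbits counter. */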
static inline __u64
srpc_next_id(void)
{
	__u64 id;

	spin_lock(&srpc_data.rpc_glock);
	id = srpc_data.rpc_matchbits++;
	spin_unlock(&srpc_data.rpc_glock);
	return id;
}

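/*
 * (Re)initialize a server RPC from a freshly received request buffer.
 * Framework RPCs run on the serial scheduler; test RPCs run on the
 * per-CPT test scheduler.
 */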
static void
srpc_init_server_rpc(struct srpc_server_rpc *rpc,
		     struct srpc_service_cd *scd,
		     struct srpc_buffer *buffer)
{
	memset(rpc, 0, sizeof(*rpc));
	swi_init_workitem(&rpc->srpc_wi, rpc, srpc_handle_rpc,
			  srpc_serv_is_framework(scd->scd_svc) ?
			  lst_sched_serial : lst_sched_test[scd->scd_cpt]);

	rpc->srpc_ev.ev_fired = 1; /* no event expected now */

	rpc->srpc_scd = scd;
	rpc->srpc_reqstbuf = buffer;
	rpc->srpc_peer = buffer->buf_peer;
	rpc->srpc_self = buffer->buf_self;
	LNetInvalidateMDHandle(&rpc->srpc_replymdh);
}

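/*
 * Release a service's per-CPT data: posted and blocked buffers and the
 * free RPC pool. There must be no active RPCs left by this point.
 */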
static void
srpc_service_fini(struct srpc_service *svc)
{
	struct srpc_service_cd *scd;
	struct srpc_server_rpc *rpc;
	struct srpc_buffer *buf;
	struct list_head *q;
	int i;

	if (!svc->sv_cpt_data)
		return;

	cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
		while (1) {
			if (!list_empty(&scd->scd_buf_posted))
				q = &scd->scd_buf_posted;
			else if (!list_empty(&scd->scd_buf_blocked))
				q = &scd->scd_buf_blocked;
			else
				break;

			while (!list_empty(q)) {
				buf = list_entry(q->next, struct srpc_buffer,
						 buf_list);
				list_del(&buf->buf_list);
				LIBCFS_FREE(buf, sizeof(*buf));
			}
		}

		LASSERT(list_empty(&scd->scd_rpc_active));

		while (!list_empty(&scd->scd_rpc_free)) {
			rpc = list_entry(scd->scd_rpc_free.next,
					 struct srpc_server_rpc,
					 srpc_list);
			list_del(&rpc->srpc_list);
			LIBCFS_FREE(rpc, sizeof(*rpc));
		}
	}

	cfs_percpt_free(svc->sv_cpt_data);
	svc->sv_cpt_data = NULL;
}

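/*
 * Number of server RPCs to pre-allocate per CPT, with separate floors
 * for framework and test services.
 */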
static int
srpc_service_nrpcs(struct srpc_service *svc)
{
	int nrpcs = svc->sv_wi_total / svc->sv_ncpts;

	return srpc_serv_is_framework(svc) ?
	       max(nrpcs, SFW_FRWK_WI_MIN) : max(nrpcs, SFW_TEST_WI_MIN);
}

int srpc_add_buffer(struct swi_workitem *wi);

static int
srpc_service_init(struct srpc_service *svc)
{
	struct srpc_service_cd *scd;
	struct srpc_server_rpc *rpc;
	int nrpcs;
	int i;
	int j;

	svc->sv_shuttingdown = 0;

	svc->sv_cpt_data = cfs_percpt_alloc(lnet_cpt_table(),
					    sizeof(**svc->sv_cpt_data));
	if (!svc->sv_cpt_data)
		return -ENOMEM;

	svc->sv_ncpts = srpc_serv_is_framework(svc) ?
			1 : cfs_cpt_number(lnet_cpt_table());
	nrpcs = srpc_service_nrpcs(svc);

	cfs_percpt_for_each(scd, i, svc->sv_cpt_data) {
		scd->scd_cpt = i;
		scd->scd_svc = svc;
		spin_lock_init(&scd->scd_lock);
		INIT_LIST_HEAD(&scd->scd_rpc_free);
		INIT_LIST_HEAD(&scd->scd_rpc_active);
		INIT_LIST_HEAD(&scd->scd_buf_posted);
		INIT_LIST_HEAD(&scd->scd_buf_blocked);

		scd->scd_ev.ev_data = scd;
		scd->scd_ev.ev_type = SRPC_REQUEST_RCVD;

		/*
		 * NB: don't use lst_sched_serial for adding buffer,
		 * see details in srpc_service_add_buffers()
		 */
		swi_init_workitem(&scd->scd_buf_wi, scd,
				  srpc_add_buffer, lst_sched_test[i]);

		if (i && srpc_serv_is_framework(svc)) {
			/*
			 * NB: a framework service only needs srpc_service_cd
			 * for one partition, but we allocate for all to keep
			 * the implementation simple; it wastes a little
			 * memory but nobody should care about this
			 */
			continue;
		}

		for (j = 0; j < nrpcs; j++) {
			LIBCFS_CPT_ALLOC(rpc, lnet_cpt_table(),
					 i, sizeof(*rpc));
			if (!rpc) {
				srpc_service_fini(svc);
				return -ENOMEM;
			}
			list_add(&rpc->srpc_list, &scd->scd_rpc_free);
		}
	}

	return 0;
}

int
srpc_add_service(struct srpc_service *sv)
{
	int id = sv->sv_id;

	LASSERT(0 <= id && id <= SRPC_SERVICE_MAX_ID);

	if (srpc_service_init(sv))
		return -ENOMEM;

	spin_lock(&srpc_data.rpc_glock);

	LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

	if (srpc_data.rpc_services[id]) {
		spin_unlock(&srpc_data.rpc_glock);
		goto failed;
	}

	srpc_data.rpc_services[id] = sv;
	spin_unlock(&srpc_data.rpc_glock);

	CDEBUG(D_NET, "Adding service: id %d, name %s\n", id, sv->sv_name);
	return 0;

 failed:
	srpc_service_fini(sv);
	return -EBUSY;
}

int
srpc_remove_service(struct srpc_service *sv)
{
	int id = sv->sv_id;

	spin_lock(&srpc_data.rpc_glock);

	if (srpc_data.rpc_services[id] != sv) {
		spin_unlock(&srpc_data.rpc_glock);
		return -ENOENT;
	}

	srpc_data.rpc_services[id] = NULL;
	spin_unlock(&srpc_data.rpc_glock);
	return 0;
}

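/*
 * Post a passive buffer: attach an ME matching @matchbits on @portal
 * and bind an MD over @buf; @ev is delivered via srpc_lnet_ev_handler
 * when the peer's PUT or GET arrives.
 */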
static int
srpc_post_passive_rdma(int portal, int local, __u64 matchbits, void *buf,
		       int len, int options, struct lnet_process_id peer,
		       struct lnet_handle_md *mdh, struct srpc_event *ev)
{
	int rc;
	struct lnet_md md;
	struct lnet_handle_me meh;

	rc = LNetMEAttach(portal, peer, matchbits, 0, LNET_UNLINK,
			  local ? LNET_INS_LOCAL : LNET_INS_AFTER, &meh);
	if (rc) {
		CERROR("LNetMEAttach failed: %d\n", rc);
		LASSERT(rc == -ENOMEM);
		return -ENOMEM;
	}

	md.threshold = 1;
	md.user_ptr = ev;
	md.start = buf;
	md.length = len;
	md.options = options;
	md.eq_handle = srpc_data.rpc_lnet_eq;

	rc = LNetMDAttach(meh, md, LNET_UNLINK, mdh);
	if (rc) {
		CERROR("LNetMDAttach failed: %d\n", rc);
		LASSERT(rc == -ENOMEM);

		rc = LNetMEUnlink(meh);
		LASSERT(!rc);
		return -ENOMEM;
	}

	CDEBUG(D_NET, "Posted passive RDMA: peer %s, portal %d, matchbits %#llx\n",
	       libcfs_id2str(peer), portal, matchbits);
	return 0;
}

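/*
 * Bind an MD over @buf and actively PUT to (or GET from) @peer at
 * @portal/@matchbits. A GET generates two events (SEND and REPLY),
 * hence the MD threshold of 2.
 */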
static int
srpc_post_active_rdma(int portal, __u64 matchbits, void *buf, int len,
		      int options, struct lnet_process_id peer,
		      lnet_nid_t self, struct lnet_handle_md *mdh,
		      struct srpc_event *ev)
{
	int rc;
	struct lnet_md md;

	md.user_ptr = ev;
	md.start = buf;
	md.length = len;
	md.eq_handle = srpc_data.rpc_lnet_eq;
	md.threshold = options & LNET_MD_OP_GET ? 2 : 1;
	md.options = options & ~(LNET_MD_OP_PUT | LNET_MD_OP_GET);

	rc = LNetMDBind(md, LNET_UNLINK, mdh);
	if (rc) {
		CERROR("LNetMDBind failed: %d\n", rc);
		LASSERT(rc == -ENOMEM);
		return -ENOMEM;
	}

	/*
	 * this is kind of an abuse of the LNET_MD_OP_{PUT,GET} options.
	 * they're only meaningful for MDs attached to an ME (i.e. passive
	 * buffers)
	 */
	if (options & LNET_MD_OP_PUT) {
		rc = LNetPut(self, *mdh, LNET_NOACK_REQ, peer,
			     portal, matchbits, 0, 0);
	} else {
		LASSERT(options & LNET_MD_OP_GET);

		rc = LNetGet(self, *mdh, peer, portal, matchbits, 0);
	}

	if (rc) {
		CERROR("LNet%s(%s, %d, %lld) failed: %d\n",
		       options & LNET_MD_OP_PUT ? "Put" : "Get",
		       libcfs_id2str(peer), portal, matchbits, rc);

		/*
		 * The forthcoming unlink event will complete this operation
		 * with failure, so fall through and return success here.
		 */
		rc = LNetMDUnlink(*mdh);
		LASSERT(!rc);
	} else {
		CDEBUG(D_NET, "Posted active RDMA: peer %s, portal %u, matchbits %#llx\n",
		       libcfs_id2str(peer), portal, matchbits);
	}
	return 0;
}

static int
srpc_post_passive_rqtbuf(int service, int local, void *buf, int len,
			 struct lnet_handle_md *mdh, struct srpc_event *ev)
{
	struct lnet_process_id any = { 0 };

	any.nid = LNET_NID_ANY;
	any.pid = LNET_PID_ANY;

	return srpc_post_passive_rdma(srpc_serv_portal(service),
				      local, service, buf, len,
				      LNET_MD_OP_PUT, any, mdh, ev);
}

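/*
 * Post one request buffer for @scd. Called and returns with scd_lock
 * held, though the lock is dropped around the LNet call; the buffer
 * goes on scd_buf_posted up front because a request (and its event)
 * can arrive as soon as the buffer is attached.
 */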
static int
srpc_service_post_buffer(struct srpc_service_cd *scd, struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
	struct srpc_service *sv = scd->scd_svc;
	struct srpc_msg	*msg = &buf->buf_msg;
	int rc;

	LNetInvalidateMDHandle(&buf->buf_mdh);
	list_add(&buf->buf_list, &scd->scd_buf_posted);
	scd->scd_buf_nposted++;
	spin_unlock(&scd->scd_lock);

	rc = srpc_post_passive_rqtbuf(sv->sv_id,
				      !srpc_serv_is_framework(sv),
				      msg, sizeof(*msg), &buf->buf_mdh,
				      &scd->scd_ev);

	/*
	 * At this point, an RPC (new or delayed) may have arrived in
	 * msg and its event handler has been called. So we must add
	 * buf to scd_buf_posted _before_ dropping scd_lock
	 */
	spin_lock(&scd->scd_lock);

	if (!rc) {
		if (!sv->sv_shuttingdown)
			return 0;

		spin_unlock(&scd->scd_lock);
		/*
		 * srpc_shutdown_service might have tried to unlink me
		 * when my buf_mdh was still invalid
		 */
		LNetMDUnlink(buf->buf_mdh);
		spin_lock(&scd->scd_lock);
		return 0;
	}

	scd->scd_buf_nposted--;
	if (sv->sv_shuttingdown)
		return rc; /* don't allow changing scd_buf_posted */

	list_del(&buf->buf_list);
	spin_unlock(&scd->scd_lock);

	LIBCFS_FREE(buf, sizeof(*buf));

	spin_lock(&scd->scd_lock);
	return rc;
}

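/*
 * Workitem handler that allocates and posts request buffers until
 * scd_buf_adjust is consumed; scheduled by srpc_service_add_buffers()
 * and by the event handler when posted buffers run low.
 */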
int
srpc_add_buffer(struct swi_workitem *wi)
{
	struct srpc_service_cd *scd = wi->swi_workitem.wi_data;
	struct srpc_buffer *buf;
	int rc = 0;

	/*
	 * it's called by workitem scheduler threads; these threads have
	 * been set up with CPT affinity, so buffers will be posted on the
	 * CPT-local list of the portal
	 */
	spin_lock(&scd->scd_lock);

	while (scd->scd_buf_adjust > 0 &&
	       !scd->scd_svc->sv_shuttingdown) {
		scd->scd_buf_adjust--; /* consume it */
		scd->scd_buf_posting++;

		spin_unlock(&scd->scd_lock);

		LIBCFS_ALLOC(buf, sizeof(*buf));
		if (!buf) {
			CERROR("Failed to add new buf to service: %s\n",
			       scd->scd_svc->sv_name);
			spin_lock(&scd->scd_lock);
			rc = -ENOMEM;
			break;
		}

		spin_lock(&scd->scd_lock);
		if (scd->scd_svc->sv_shuttingdown) {
			spin_unlock(&scd->scd_lock);
			LIBCFS_FREE(buf, sizeof(*buf));

			spin_lock(&scd->scd_lock);
			rc = -ESHUTDOWN;
			break;
		}

		rc = srpc_service_post_buffer(scd, buf);
		if (rc)
			break; /* buf has been freed inside */

		LASSERT(scd->scd_buf_posting > 0);
		scd->scd_buf_posting--;
		scd->scd_buf_total++;
		scd->scd_buf_low = max(2, scd->scd_buf_total / 4);
	}

	if (rc) {
		scd->scd_buf_err_stamp = ktime_get_real_seconds();
		scd->scd_buf_err = rc;

		LASSERT(scd->scd_buf_posting > 0);
		scd->scd_buf_posting--;
	}

	spin_unlock(&scd->scd_lock);
	return 0;
}

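/*
 * Ask every partition of @sv to post @nbuffer more request buffers and
 * wait for the posting to complete; returns the first error seen.
 */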
int
srpc_service_add_buffers(struct srpc_service *sv, int nbuffer)
{
	struct srpc_service_cd *scd;
	int rc = 0;
	int i;

	LASSERTF(nbuffer > 0, "nbuffer must be positive: %d\n", nbuffer);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		scd->scd_buf_err = 0;
		scd->scd_buf_err_stamp = 0;
		scd->scd_buf_posting = 0;
		scd->scd_buf_adjust = nbuffer;
		/* start to post buffers */
		swi_schedule_workitem(&scd->scd_buf_wi);
		spin_unlock(&scd->scd_lock);

		/* a framework service only posts buffers for one partition */
		if (srpc_serv_is_framework(sv))
			break;
	}

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);
		/*
		 * NB: srpc_service_add_buffers() can be called inside
		 * thread context of lst_sched_serial, and we don't normally
		 * allow sleeping inside thread context of a WI scheduler
		 * because it blocks the current scheduler thread from doing
		 * anything else; even worse, it could deadlock if it's
		 * waiting on a result from another WI of the same scheduler.
		 * However, it's safe here because scd_buf_wi is scheduled
		 * by a thread in a different WI scheduler (lst_sched_test),
		 * so we don't have any risk of deadlock, though this could
		 * block all WIs pending on lst_sched_serial for a moment,
		 * which is not good but not fatal.
		 */
		lst_wait_until(scd->scd_buf_err ||
			       (!scd->scd_buf_adjust &&
				!scd->scd_buf_posting),
			       scd->scd_lock, "waiting for adding buffer\n");

		if (scd->scd_buf_err && !rc)
			rc = scd->scd_buf_err;

		spin_unlock(&scd->scd_lock);
	}

	return rc;
}

void
srpc_service_remove_buffers(struct srpc_service *sv, int nbuffer)
{
	struct srpc_service_cd *scd;
	int num;
	int i;

	LASSERT(!sv->sv_shuttingdown);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		num = scd->scd_buf_total + scd->scd_buf_posting;
		scd->scd_buf_adjust -= min(nbuffer, num);

		spin_unlock(&scd->scd_lock);
	}
}

/* returns 1 if sv has finished, otherwise 0 */
int
srpc_finish_service(struct srpc_service *sv)
{
	struct srpc_service_cd *scd;
	struct srpc_server_rpc *rpc;
	int i;

	LASSERT(sv->sv_shuttingdown); /* srpc_shutdown_service called */

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);
		if (!swi_deschedule_workitem(&scd->scd_buf_wi)) {
			spin_unlock(&scd->scd_lock);
			return 0;
		}

		if (scd->scd_buf_nposted > 0) {
			CDEBUG(D_NET, "waiting for %d posted buffers to unlink\n",
			       scd->scd_buf_nposted);
			spin_unlock(&scd->scd_lock);
			return 0;
		}

		if (list_empty(&scd->scd_rpc_active)) {
			spin_unlock(&scd->scd_lock);
			continue;
		}

		rpc = list_entry(scd->scd_rpc_active.next,
				 struct srpc_server_rpc, srpc_list);
		CNETERR("Active RPC %p on shutdown: sv %s, peer %s, wi %s scheduled %d running %d, ev fired %d type %d status %d lnet %d\n",
			rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
			swi_state2str(rpc->srpc_wi.swi_state),
			rpc->srpc_wi.swi_workitem.wi_scheduled,
			rpc->srpc_wi.swi_workitem.wi_running,
			rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
			rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
		spin_unlock(&scd->scd_lock);
		return 0;
	}

	/* no lock needed from now on */
	srpc_service_fini(sv);
	return 1;
}

/* called with scd->scd_lock held */
static void
srpc_service_recycle_buffer(struct srpc_service_cd *scd,
			    struct srpc_buffer *buf)
__must_hold(&scd->scd_lock)
{
	if (!scd->scd_svc->sv_shuttingdown && scd->scd_buf_adjust >= 0) {
		if (srpc_service_post_buffer(scd, buf)) {
			CWARN("Failed to post %s buffer\n",
			      scd->scd_svc->sv_name);
		}
		return;
	}

	/* service is shutting down, or we want to recycle some buffers */
	scd->scd_buf_total--;

	if (scd->scd_buf_adjust < 0) {
		scd->scd_buf_adjust++;
		if (scd->scd_buf_adjust < 0 &&
		    !scd->scd_buf_total && !scd->scd_buf_posting) {
			CDEBUG(D_INFO,
			       "Try to recycle %d buffers but nothing left\n",
			       scd->scd_buf_adjust);
			scd->scd_buf_adjust = 0;
		}
	}

	spin_unlock(&scd->scd_lock);
	LIBCFS_FREE(buf, sizeof(*buf));
	spin_lock(&scd->scd_lock);
}

void
srpc_abort_service(struct srpc_service *sv)
{
	struct srpc_service_cd *scd;
	struct srpc_server_rpc *rpc;
	int i;

	CDEBUG(D_NET, "Aborting service: id %d, name %s\n",
	       sv->sv_id, sv->sv_name);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		/*
		 * schedule in-flight RPCs to notice the abort. NB: this
		 * races with incoming RPCs; the complete fix would be to
		 * make test RPCs carry the session ID in their headers
		 */
		list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list) {
			rpc->srpc_aborted = 1;
			swi_schedule_workitem(&rpc->srpc_wi);
		}

		spin_unlock(&scd->scd_lock);
	}
}

void
srpc_shutdown_service(struct srpc_service *sv)
{
	struct srpc_service_cd *scd;
	struct srpc_server_rpc *rpc;
	struct srpc_buffer *buf;
	int i;

	CDEBUG(D_NET, "Shutting down service: id %d, name %s\n",
	       sv->sv_id, sv->sv_name);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
		spin_lock(&scd->scd_lock);

	sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data)
		spin_unlock(&scd->scd_lock);

	cfs_percpt_for_each(scd, i, sv->sv_cpt_data) {
		spin_lock(&scd->scd_lock);

		/* schedule in-flight RPCs to notice the shutdown */
		list_for_each_entry(rpc, &scd->scd_rpc_active, srpc_list)
			swi_schedule_workitem(&rpc->srpc_wi);

		spin_unlock(&scd->scd_lock);

		/*
		 * OK to traverse scd_buf_posted without lock, since no one
		 * touches scd_buf_posted now
		 */
		list_for_each_entry(buf, &scd->scd_buf_posted, buf_list)
			LNetMDUnlink(buf->buf_mdh);
	}
}

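/*
 * PUT the request message of a client RPC to its destination; the
 * SRPC_REQUEST_SENT event reports the outcome of the send.
 */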
static int
srpc_send_request(struct srpc_client_rpc *rpc)
{
	struct srpc_event *ev = &rpc->crpc_reqstev;
	int rc;

	ev->ev_fired = 0;
	ev->ev_data = rpc;
	ev->ev_type = SRPC_REQUEST_SENT;

	rc = srpc_post_active_rdma(srpc_serv_portal(rpc->crpc_service),
				   rpc->crpc_service, &rpc->crpc_reqstmsg,
				   sizeof(struct srpc_msg), LNET_MD_OP_PUT,
				   rpc->crpc_dest, LNET_NID_ANY,
				   &rpc->crpc_reqstmdh, ev);
	if (rc) {
		LASSERT(rc == -ENOMEM);
		ev->ev_fired = 1;  /* no more event expected */
	}
	return rc;
}

static int
srpc_prepare_reply(struct srpc_client_rpc *rpc)
{
	struct srpc_event *ev = &rpc->crpc_replyev;
	__u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
	int rc;

	ev->ev_fired = 0;
	ev->ev_data = rpc;
	ev->ev_type = SRPC_REPLY_RCVD;

	*id = srpc_next_id();

	rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
				    &rpc->crpc_replymsg,
				    sizeof(struct srpc_msg),
				    LNET_MD_OP_PUT, rpc->crpc_dest,
				    &rpc->crpc_replymdh, ev);
	if (rc) {
		LASSERT(rc == -ENOMEM);
		ev->ev_fired = 1;  /* no more event expected */
	}
	return rc;
}

static int
srpc_prepare_bulk(struct srpc_client_rpc *rpc)
{
	struct srpc_bulk *bk = &rpc->crpc_bulk;
	struct srpc_event *ev = &rpc->crpc_bulkev;
	__u64 *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
	int rc;
	int opt;

	LASSERT(bk->bk_niov <= LNET_MAX_IOV);

	if (!bk->bk_niov)
		return 0; /* nothing to do */

	opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
	opt |= LNET_MD_KIOV;

	ev->ev_fired = 0;
	ev->ev_data = rpc;
	ev->ev_type = SRPC_BULK_REQ_RCVD;

	*id = srpc_next_id();

	rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, 0, *id,
				    &bk->bk_iovs[0], bk->bk_niov, opt,
				    rpc->crpc_dest, &bk->bk_mdh, ev);
	if (rc) {
		LASSERT(rc == -ENOMEM);
		ev->ev_fired = 1;  /* no more event expected */
	}
	return rc;
}

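/*
 * Start the bulk transfer of a server RPC: GET from the peer when the
 * local buffer is a sink, PUT to the peer when it is a source.
 */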
static int
srpc_do_bulk(struct srpc_server_rpc *rpc)
{
	struct srpc_event *ev = &rpc->srpc_ev;
	struct srpc_bulk *bk = rpc->srpc_bulk;
	__u64 id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
	int rc;
	int opt;

	LASSERT(bk);

	opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
	opt |= LNET_MD_KIOV;

	ev->ev_fired = 0;
	ev->ev_data = rpc;
	ev->ev_type = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

	rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
				   &bk->bk_iovs[0], bk->bk_niov, opt,
				   rpc->srpc_peer, rpc->srpc_self,
				   &bk->bk_mdh, ev);
	if (rc)
		ev->ev_fired = 1;  /* no more event expected */
	return rc;
}

/* only called from srpc_handle_rpc */
static void
srpc_server_rpc_done(struct srpc_server_rpc *rpc, int status)
{
	struct srpc_service_cd *scd = rpc->srpc_scd;
	struct srpc_service *sv = scd->scd_svc;
	struct srpc_buffer *buffer;

	LASSERT(status || rpc->srpc_wi.swi_state == SWI_STATE_DONE);

	rpc->srpc_status = status;

	CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
		     "Server RPC %p done: service %s, peer %s, status %s:%d\n",
		     rpc, sv->sv_name, libcfs_id2str(rpc->srpc_peer),
		     swi_state2str(rpc->srpc_wi.swi_state), status);

	if (status) {
		spin_lock(&srpc_data.rpc_glock);
		srpc_data.rpc_counters.rpcs_dropped++;
		spin_unlock(&srpc_data.rpc_glock);
	}

	if (rpc->srpc_done)
		(*rpc->srpc_done)(rpc);
	LASSERT(!rpc->srpc_bulk);

	spin_lock(&scd->scd_lock);

	if (rpc->srpc_reqstbuf) {
		/*
		 * NB srpc_service_recycle_buffer might drop scd_lock, but
		 * sv can't go away because scd_rpc_active is not empty
		 */
		srpc_service_recycle_buffer(scd, rpc->srpc_reqstbuf);
		rpc->srpc_reqstbuf = NULL;
	}

	list_del(&rpc->srpc_list); /* from scd->scd_rpc_active */

	/*
	 * No one can schedule me now since:
	 * - I'm not on scd_rpc_active.
	 * - all LNet events have been fired.
	 * Cancel pending schedules and prevent future schedule attempts:
	 */
	LASSERT(rpc->srpc_ev.ev_fired);
	swi_exit_workitem(&rpc->srpc_wi);

	if (!sv->sv_shuttingdown && !list_empty(&scd->scd_buf_blocked)) {
		buffer = list_entry(scd->scd_buf_blocked.next,
				    struct srpc_buffer, buf_list);
		list_del(&buffer->buf_list);

		srpc_init_server_rpc(rpc, scd, buffer);
		list_add_tail(&rpc->srpc_list, &scd->scd_rpc_active);
		swi_schedule_workitem(&rpc->srpc_wi);
	} else {
		list_add(&rpc->srpc_list, &scd->scd_rpc_free);
	}

	spin_unlock(&scd->scd_lock);
}

/* handles an incoming RPC */
int
srpc_handle_rpc(struct swi_workitem *wi)
{
	struct srpc_server_rpc *rpc = wi->swi_workitem.wi_data;
	struct srpc_service_cd *scd = rpc->srpc_scd;
	struct srpc_service *sv = scd->scd_svc;
	struct srpc_event *ev = &rpc->srpc_ev;
	int rc = 0;

	LASSERT(wi == &rpc->srpc_wi);

	spin_lock(&scd->scd_lock);

	if (sv->sv_shuttingdown || rpc->srpc_aborted) {
		spin_unlock(&scd->scd_lock);

		if (rpc->srpc_bulk)
			LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
		LNetMDUnlink(rpc->srpc_replymdh);

		if (ev->ev_fired) { /* no more event, OK to finish */
			srpc_server_rpc_done(rpc, -ESHUTDOWN);
			return 1;
		}
		return 0;
	}

	spin_unlock(&scd->scd_lock);

	switch (wi->swi_state) {
	default:
		LBUG();
	case SWI_STATE_NEWBORN: {
		struct srpc_msg *msg;
		struct srpc_generic_reply *reply;

		msg = &rpc->srpc_reqstbuf->buf_msg;
		reply = &rpc->srpc_replymsg.msg_body.reply;

		if (!msg->msg_magic) {
			/* moaned already in srpc_lnet_ev_handler */
			srpc_server_rpc_done(rpc, EBADMSG);
			return 1;
		}

		srpc_unpack_msg_hdr(msg);
		if (msg->msg_version != SRPC_MSG_VERSION) {
			CWARN("Version mismatch: %u, %u expected, from %s\n",
			      msg->msg_version, SRPC_MSG_VERSION,
			      libcfs_id2str(rpc->srpc_peer));
			reply->status = EPROTO;
			/* drop through and send reply */
		} else {
			reply->status = 0;
			rc = (*sv->sv_handler)(rpc);
			LASSERT(!reply->status || !rpc->srpc_bulk);
			if (rc) {
				srpc_server_rpc_done(rpc, rc);
				return 1;
			}
		}

		wi->swi_state = SWI_STATE_BULK_STARTED;

		if (rpc->srpc_bulk) {
			rc = srpc_do_bulk(rpc);
			if (!rc)
				return 0; /* wait for bulk */

			LASSERT(ev->ev_fired);
			ev->ev_status = rc;
		}
	}
	case SWI_STATE_BULK_STARTED:
		LASSERT(!rpc->srpc_bulk || ev->ev_fired);

		if (rpc->srpc_bulk) {
			rc = ev->ev_status;

			if (sv->sv_bulk_ready)
				rc = (*sv->sv_bulk_ready)(rpc, rc);

			if (rc) {
				srpc_server_rpc_done(rpc, rc);
				return 1;
			}
		}

		wi->swi_state = SWI_STATE_REPLY_SUBMITTED;
		rc = srpc_send_reply(rpc);
		if (!rc)
			return 0; /* wait for reply */
		srpc_server_rpc_done(rpc, rc);
		return 1;

	case SWI_STATE_REPLY_SUBMITTED:
		if (!ev->ev_fired) {
			CERROR("RPC %p: bulk %p, service %d\n",
			       rpc, rpc->srpc_bulk, sv->sv_id);
			CERROR("Event: status %d, type %d, lnet %d\n",
			       ev->ev_status, ev->ev_type, ev->ev_lnet);
			LASSERT(ev->ev_fired);
		}

		wi->swi_state = SWI_STATE_DONE;
		srpc_server_rpc_done(rpc, ev->ev_status);
		return 1;
	}

	return 0;
}

static void
srpc_client_rpc_expired(void *data)
{
	struct srpc_client_rpc *rpc = data;

	CWARN("Client RPC expired: service %d, peer %s, timeout %d.\n",
	      rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
	      rpc->crpc_timeout);

	spin_lock(&rpc->crpc_lock);

	rpc->crpc_timeout = 0;
	srpc_abort_rpc(rpc, -ETIMEDOUT);

	spin_unlock(&rpc->crpc_lock);

	spin_lock(&srpc_data.rpc_glock);
	srpc_data.rpc_counters.rpcs_expired++;
	spin_unlock(&srpc_data.rpc_glock);
}

static void
srpc_add_client_rpc_timer(struct srpc_client_rpc *rpc)
{
	struct stt_timer *timer = &rpc->crpc_timer;

	if (!rpc->crpc_timeout)
		return;

	INIT_LIST_HEAD(&timer->stt_list);
	timer->stt_data	= rpc;
	timer->stt_func	= srpc_client_rpc_expired;
	timer->stt_expires = ktime_get_real_seconds() + rpc->crpc_timeout;
	stt_add_timer(timer);
}

/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
static void
srpc_del_client_rpc_timer(struct srpc_client_rpc *rpc)
{
	/* timer not planted or already exploded */
	if (!rpc->crpc_timeout)
		return;

	/* timer successfully defused */
	if (stt_del_timer(&rpc->crpc_timer))
		return;

	/* timer detonated, wait for it to explode */
	while (rpc->crpc_timeout) {
		spin_unlock(&rpc->crpc_lock);

		schedule();

		spin_lock(&rpc->crpc_lock);
	}
}

static void
srpc_client_rpc_done(struct srpc_client_rpc *rpc, int status)
{
	struct swi_workitem *wi = &rpc->crpc_wi;

	LASSERT(status || wi->swi_state == SWI_STATE_DONE);

	spin_lock(&rpc->crpc_lock);

	rpc->crpc_closed = 1;
	if (!rpc->crpc_status)
		rpc->crpc_status = status;

	srpc_del_client_rpc_timer(rpc);

	CDEBUG_LIMIT(!status ? D_NET : D_NETERROR,
		     "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
		     rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
		     swi_state2str(wi->swi_state), rpc->crpc_aborted, status);

	/*
	 * No one can schedule me now since:
	 * - RPC timer has been defused.
	 * - all LNet events have been fired.
	 * - crpc_closed has been set, preventing srpc_abort_rpc from
	 *   scheduling me.
	 * Cancel pending schedules and prevent future schedule attempts:
	 */
	LASSERT(!srpc_event_pending(rpc));
	swi_exit_workitem(wi);

	spin_unlock(&rpc->crpc_lock);

	(*rpc->crpc_done)(rpc);
}

/* sends an outgoing RPC */
int
srpc_send_rpc(struct swi_workitem *wi)
{
	int rc = 0;
	struct srpc_client_rpc *rpc;
	struct srpc_msg *reply;
	int do_bulk;

	LASSERT(wi);

	rpc = wi->swi_workitem.wi_data;

	LASSERT(rpc);
	LASSERT(wi == &rpc->crpc_wi);

	reply = &rpc->crpc_replymsg;
	do_bulk = rpc->crpc_bulk.bk_niov > 0;

	spin_lock(&rpc->crpc_lock);

	if (rpc->crpc_aborted) {
		spin_unlock(&rpc->crpc_lock);
		goto abort;
	}

	spin_unlock(&rpc->crpc_lock);

	switch (wi->swi_state) {
	default:
		LBUG();
	case SWI_STATE_NEWBORN:
		LASSERT(!srpc_event_pending(rpc));

		rc = srpc_prepare_reply(rpc);
		if (rc) {
			srpc_client_rpc_done(rpc, rc);
			return 1;
		}

		rc = srpc_prepare_bulk(rpc);
		if (rc)
			break;

		wi->swi_state = SWI_STATE_REQUEST_SUBMITTED;
		rc = srpc_send_request(rpc);
		break;

	case SWI_STATE_REQUEST_SUBMITTED:
		/*
		 * CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
		 * order; however, they're processed in a strict order:
		 * rqt, rpy, and bulk.
		 */
		if (!rpc->crpc_reqstev.ev_fired)
			break;

		rc = rpc->crpc_reqstev.ev_status;
		if (rc)
			break;

		wi->swi_state = SWI_STATE_REQUEST_SENT;
		/* perhaps more events, fall thru */
	case SWI_STATE_REQUEST_SENT: {
		enum srpc_msg_type type = srpc_service2reply(rpc->crpc_service);

		if (!rpc->crpc_replyev.ev_fired)
			break;

		rc = rpc->crpc_replyev.ev_status;
		if (rc)
			break;

		srpc_unpack_msg_hdr(reply);
		if (reply->msg_type != type ||
		    (reply->msg_magic != SRPC_MSG_MAGIC &&
		     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
			CWARN("Bad message from %s: type %u (%d expected), magic %u (%d expected).\n",
			      libcfs_id2str(rpc->crpc_dest),
			      reply->msg_type, type,
			      reply->msg_magic, SRPC_MSG_MAGIC);
			rc = -EBADMSG;
			break;
		}

		if (do_bulk && reply->msg_body.reply.status) {
			CWARN("Remote error %d at %s, unlink bulk buffer in case peer didn't initiate bulk transfer\n",
			      reply->msg_body.reply.status,
			      libcfs_id2str(rpc->crpc_dest));
			LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
		}

		wi->swi_state = SWI_STATE_REPLY_RECEIVED;
	}
	case SWI_STATE_REPLY_RECEIVED:
		if (do_bulk && !rpc->crpc_bulkev.ev_fired)
			break;

		rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

		/*
		 * Bulk buffer was unlinked due to remote error. Clear error
		 * since reply buffer still contains valid data.
		 * NB rpc->crpc_done shouldn't look into bulk data in case of
		 * remote error.
		 */
		if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
		    !rpc->crpc_status && reply->msg_body.reply.status)
			rc = 0;

		wi->swi_state = SWI_STATE_DONE;
		srpc_client_rpc_done(rpc, rc);
		return 1;
	}

	if (rc) {
		spin_lock(&rpc->crpc_lock);
		srpc_abort_rpc(rpc, rc);
		spin_unlock(&rpc->crpc_lock);
	}

abort:
	if (rpc->crpc_aborted) {
		LNetMDUnlink(rpc->crpc_reqstmdh);
		LNetMDUnlink(rpc->crpc_replymdh);
		LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

		if (!srpc_event_pending(rpc)) {
			srpc_client_rpc_done(rpc, -EINTR);
			return 1;
		}
	}
	return 0;
}

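/*
 * Allocate and initialize a client RPC with room for @nbulkiov bulk
 * fragments; @rpc_done is the completion callback and @rpc_fini the
 * final cleanup callback.
 */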
struct srpc_client_rpc *
srpc_create_client_rpc(struct lnet_process_id peer, int service,
		       int nbulkiov, int bulklen,
		       void (*rpc_done)(struct srpc_client_rpc *),
		       void (*rpc_fini)(struct srpc_client_rpc *), void *priv)
{
	struct srpc_client_rpc *rpc;

	LIBCFS_ALLOC(rpc, offsetof(struct srpc_client_rpc,
				   crpc_bulk.bk_iovs[nbulkiov]));
	if (!rpc)
		return NULL;

	srpc_init_client_rpc(rpc, peer, service, nbulkiov,
			     bulklen, rpc_done, rpc_fini, priv);
	return rpc;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc(struct srpc_client_rpc *rpc, int why)
{
	LASSERT(why);

	if (rpc->crpc_aborted ||	/* already aborted */
	    rpc->crpc_closed)		/* callback imminent */
		return;

	CDEBUG(D_NET, "Aborting RPC: service %d, peer %s, state %s, why %d\n",
	       rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
	       swi_state2str(rpc->crpc_wi.swi_state), why);

	rpc->crpc_aborted = 1;
	rpc->crpc_status = why;
	swi_schedule_workitem(&rpc->crpc_wi);
}

/* called with rpc->crpc_lock held */
void
srpc_post_rpc(struct srpc_client_rpc *rpc)
{
	LASSERT(!rpc->crpc_aborted);
	LASSERT(srpc_data.rpc_state == SRPC_STATE_RUNNING);

	CDEBUG(D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
	       libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
	       rpc->crpc_timeout);

	srpc_add_client_rpc_timer(rpc);
	swi_schedule_workitem(&rpc->crpc_wi);
}

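/*
 * Send the reply of a server RPC. For test services the request buffer
 * is reposted first, since the client may fire its next RPC as soon as
 * it sees the reply.
 */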
int
srpc_send_reply(struct srpc_server_rpc *rpc)
{
	struct srpc_event *ev = &rpc->srpc_ev;
	struct srpc_msg *msg = &rpc->srpc_replymsg;
	struct srpc_buffer *buffer = rpc->srpc_reqstbuf;
	struct srpc_service_cd *scd = rpc->srpc_scd;
	struct srpc_service *sv = scd->scd_svc;
	__u64 rpyid;
	int rc;

	LASSERT(buffer);
	rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

	spin_lock(&scd->scd_lock);

	if (!sv->sv_shuttingdown && !srpc_serv_is_framework(sv)) {
		/*
		 * Repost buffer before replying since test client
		 * might send me another RPC once it gets the reply
		 */
		if (srpc_service_post_buffer(scd, buffer))
			CWARN("Failed to repost %s buffer\n", sv->sv_name);
		rpc->srpc_reqstbuf = NULL;
	}

	spin_unlock(&scd->scd_lock);

	ev->ev_fired = 0;
	ev->ev_data = rpc;
	ev->ev_type = SRPC_REPLY_SENT;

	msg->msg_magic = SRPC_MSG_MAGIC;
	msg->msg_version = SRPC_MSG_VERSION;
	msg->msg_type = srpc_service2reply(sv->sv_id);

	rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
				   sizeof(*msg), LNET_MD_OP_PUT,
				   rpc->srpc_peer, rpc->srpc_self,
				   &rpc->srpc_replymdh, ev);
	if (rc)
		ev->ev_fired = 1; /* no more event expected */
	return rc;
}

/* when in kernel, always called with LNET_LOCK() held and in thread context */
static void
srpc_lnet_ev_handler(struct lnet_event *ev)
{
	struct srpc_service_cd *scd;
	struct srpc_event *rpcev = ev->md.user_ptr;
	struct srpc_client_rpc *crpc;
	struct srpc_server_rpc *srpc;
	struct srpc_buffer *buffer;
	struct srpc_service *sv;
	struct srpc_msg *msg;
	enum srpc_msg_type type;

	LASSERT(!in_interrupt());

	if (ev->status) {
		__u32 errors;

		spin_lock(&srpc_data.rpc_glock);
		if (ev->status != -ECANCELED) /* cancellation is not an error */
			srpc_data.rpc_counters.errors++;
		errors = srpc_data.rpc_counters.errors;
		spin_unlock(&srpc_data.rpc_glock);

		CNETERR("LNet event status %d type %d, RPC errors %u\n",
			ev->status, ev->type, errors);
	}

	rpcev->ev_lnet = ev->type;

	switch (rpcev->ev_type) {
	default:
		CERROR("Unknown event: status %d, type %d, lnet %d\n",
		       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
		LBUG();
	case SRPC_REQUEST_SENT:
		if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
			spin_lock(&srpc_data.rpc_glock);
			srpc_data.rpc_counters.rpcs_sent++;
			spin_unlock(&srpc_data.rpc_glock);
		}
	case SRPC_REPLY_RCVD:
	case SRPC_BULK_REQ_RCVD:
		crpc = rpcev->ev_data;

		if (rpcev != &crpc->crpc_reqstev &&
		    rpcev != &crpc->crpc_replyev &&
		    rpcev != &crpc->crpc_bulkev) {
			CERROR("rpcev %p, crpc %p, reqstev %p, replyev %p, bulkev %p\n",
			       rpcev, crpc, &crpc->crpc_reqstev,
			       &crpc->crpc_replyev, &crpc->crpc_bulkev);
			CERROR("Bad event: status %d, type %d, lnet %d\n",
			       rpcev->ev_status, rpcev->ev_type, rpcev->ev_lnet);
			LBUG();
		}

		spin_lock(&crpc->crpc_lock);

		LASSERT(!rpcev->ev_fired);
		rpcev->ev_fired = 1;
		rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
						-EINTR : ev->status;
		swi_schedule_workitem(&crpc->crpc_wi);

		spin_unlock(&crpc->crpc_lock);
		break;

	case SRPC_REQUEST_RCVD:
		scd = rpcev->ev_data;
		sv = scd->scd_svc;

		LASSERT(rpcev == &scd->scd_ev);

		spin_lock(&scd->scd_lock);

		LASSERT(ev->unlinked);
		LASSERT(ev->type == LNET_EVENT_PUT ||
			ev->type == LNET_EVENT_UNLINK);
		LASSERT(ev->type != LNET_EVENT_UNLINK ||
			sv->sv_shuttingdown);

		buffer = container_of(ev->md.start, struct srpc_buffer, buf_msg);
		buffer->buf_peer = ev->initiator;
		buffer->buf_self = ev->target.nid;

		LASSERT(scd->scd_buf_nposted > 0);
		scd->scd_buf_nposted--;

		if (sv->sv_shuttingdown) {
			/*
			 * Leave buffer on scd->scd_buf_posted since
			 * srpc_finish_service needs to traverse it.
			 */
			spin_unlock(&scd->scd_lock);
			break;
		}

		if (scd->scd_buf_err_stamp &&
		    scd->scd_buf_err_stamp < ktime_get_real_seconds()) {
			/* re-enable adding buffer */
			scd->scd_buf_err_stamp = 0;
			scd->scd_buf_err = 0;
		}

		if (!scd->scd_buf_err &&	/* adding buffer is enabled */
		    !scd->scd_buf_adjust &&
		    scd->scd_buf_nposted < scd->scd_buf_low) {
			scd->scd_buf_adjust = max(scd->scd_buf_total / 2,
						  SFW_TEST_WI_MIN);
			swi_schedule_workitem(&scd->scd_buf_wi);
		}

		list_del(&buffer->buf_list); /* from scd->scd_buf_posted */
		msg = &buffer->buf_msg;
		type = srpc_service2request(sv->sv_id);

		if (ev->status || ev->mlength != sizeof(*msg) ||
		    (msg->msg_type != type &&
		     msg->msg_type != __swab32(type)) ||
		    (msg->msg_magic != SRPC_MSG_MAGIC &&
		     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
			CERROR("Dropping RPC (%s) from %s: status %d mlength %d type %u magic %u.\n",
			       sv->sv_name, libcfs_id2str(ev->initiator),
			       ev->status, ev->mlength,
			       msg->msg_type, msg->msg_magic);

			/*
			 * NB can't call srpc_service_recycle_buffer here since
			 * it may call LNetM[DE]Attach. The invalid magic tells
			 * srpc_handle_rpc to drop this RPC
			 */
			msg->msg_magic = 0;
		}

		if (!list_empty(&scd->scd_rpc_free)) {
			srpc = list_entry(scd->scd_rpc_free.next,
					  struct srpc_server_rpc,
					  srpc_list);
			list_del(&srpc->srpc_list);

			srpc_init_server_rpc(srpc, scd, buffer);
			list_add_tail(&srpc->srpc_list,
				      &scd->scd_rpc_active);
			swi_schedule_workitem(&srpc->srpc_wi);
		} else {
			list_add_tail(&buffer->buf_list,
				      &scd->scd_buf_blocked);
		}

		spin_unlock(&scd->scd_lock);

		spin_lock(&srpc_data.rpc_glock);
		srpc_data.rpc_counters.rpcs_rcvd++;
		spin_unlock(&srpc_data.rpc_glock);
		break;

	case SRPC_BULK_GET_RPLD:
		LASSERT(ev->type == LNET_EVENT_SEND ||
			ev->type == LNET_EVENT_REPLY ||
			ev->type == LNET_EVENT_UNLINK);

		if (!ev->unlinked)
			break; /* wait for final event */

	case SRPC_BULK_PUT_SENT:
		if (!ev->status && ev->type != LNET_EVENT_UNLINK) {
			spin_lock(&srpc_data.rpc_glock);

			if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
				srpc_data.rpc_counters.bulk_get += ev->mlength;
			else
				srpc_data.rpc_counters.bulk_put += ev->mlength;

			spin_unlock(&srpc_data.rpc_glock);
		}
	case SRPC_REPLY_SENT:
		srpc = rpcev->ev_data;
		scd = srpc->srpc_scd;

		LASSERT(rpcev == &srpc->srpc_ev);

		spin_lock(&scd->scd_lock);

		rpcev->ev_fired = 1;
		rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
				   -EINTR : ev->status;
		swi_schedule_workitem(&srpc->srpc_wi);

		spin_unlock(&scd->scd_lock);
		break;
	}
}

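/*
 * Module startup: bring up LNet, allocate the event queue, mark both
 * request portals lazy and start the selftest timer machinery. Seeding
 * rpc_matchbits with a timestamp shifted into the top 16 bits leaves
 * the low 48 bits for the counter.
 */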
int
srpc_startup(void)
{
	int rc;

	memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
	spin_lock_init(&srpc_data.rpc_glock);

	/* 1 second pause to avoid timestamp reuse */
	set_current_state(TASK_UNINTERRUPTIBLE);
	schedule_timeout(cfs_time_seconds(1));
	srpc_data.rpc_matchbits = ((__u64)ktime_get_real_seconds()) << 48;

	srpc_data.rpc_state = SRPC_STATE_NONE;

	rc = LNetNIInit(LNET_PID_LUSTRE);
	if (rc < 0) {
		CERROR("LNetNIInit() has failed: %d\n", rc);
		return rc;
	}

	srpc_data.rpc_state = SRPC_STATE_NI_INIT;

	LNetInvalidateEQHandle(&srpc_data.rpc_lnet_eq);
	rc = LNetEQAlloc(0, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
	if (rc) {
		CERROR("LNetEQAlloc() has failed: %d\n", rc);
		goto bail;
	}

	rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
	LASSERT(!rc);
	rc = LNetSetLazyPortal(SRPC_REQUEST_PORTAL);
	LASSERT(!rc);

	srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

	rc = stt_startup();

bail:
	if (rc)
		srpc_shutdown();
	else
		srpc_data.rpc_state = SRPC_STATE_RUNNING;

	return rc;
}

void
srpc_shutdown(void)
{
	int i;
	int rc;
	int state;

	state = srpc_data.rpc_state;
	srpc_data.rpc_state = SRPC_STATE_STOPPING;

	switch (state) {
	default:
		LBUG();
	case SRPC_STATE_RUNNING:
		spin_lock(&srpc_data.rpc_glock);

		for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
			struct srpc_service *sv = srpc_data.rpc_services[i];

			LASSERTF(!sv, "service not empty: id %d, name %s\n",
				 i, sv->sv_name);
		}

		spin_unlock(&srpc_data.rpc_glock);

		stt_shutdown();

	case SRPC_STATE_EQ_INIT:
		rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
		rc = LNetClearLazyPortal(SRPC_REQUEST_PORTAL);
		LASSERT(!rc);
		rc = LNetEQFree(srpc_data.rpc_lnet_eq);
		LASSERT(!rc); /* the EQ should have no user by now */

	case SRPC_STATE_NI_INIT:
		LNetNIFini();
	}
}
1689