• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/selftest/framework.c
33  *
34  * Author: Isaac Huang <isaac@clusterfs.com>
35  * Author: Liang Zhen  <liangzhen@clusterfs.com>
36  */
37 
38 #define DEBUG_SUBSYSTEM S_LNET
39 
40 #include "selftest.h"
41 
42 lst_sid_t LST_INVALID_SID = {LNET_NID_ANY, -1};
43 
44 static int session_timeout = 100;
45 module_param(session_timeout, int, 0444);
46 MODULE_PARM_DESC(session_timeout, "test session timeout in seconds (100 by default, 0 == never)");
47 
48 static int rpc_timeout = 64;
49 module_param(rpc_timeout, int, 0644);
50 MODULE_PARM_DESC(rpc_timeout, "rpc timeout in seconds (64 by default, 0 == never)");
51 
/*
 * In-place byte-swapping helpers.  Each macro takes a structure lvalue
 * (not a pointer) and swabs the listed fields one by one.
 */

/* process id: 64-bit nid, 32-bit pid */
#define sfw_unpack_id(id)			\
do {						\
	__swab64s(&(id).nid);			\
	__swab32s(&(id).pid);			\
} while (0)

/* session id: 64-bit nid, 64-bit stamp */
#define sfw_unpack_sid(sid)			\
do {						\
	__swab64s(&(sid).ses_nid);		\
	__swab64s(&(sid).ses_stamp);		\
} while (0)

/* framework counters: all fields are 32 bits wide */
#define sfw_unpack_fw_counters(fc)		\
do {						\
	__swab32s(&(fc).running_ms);		\
	__swab32s(&(fc).active_batches);	\
	__swab32s(&(fc).zombie_sessions);	\
	__swab32s(&(fc).brw_errors);		\
	__swab32s(&(fc).ping_errors);		\
} while (0)

/* RPC counters: 32-bit event counts followed by 64-bit bulk totals */
#define sfw_unpack_rpc_counters(rc)		\
do {						\
	__swab32s(&(rc).errors);		\
	__swab32s(&(rc).rpcs_sent);		\
	__swab32s(&(rc).rpcs_rcvd);		\
	__swab32s(&(rc).rpcs_dropped);		\
	__swab32s(&(rc).rpcs_expired);		\
	__swab64s(&(rc).bulk_get);		\
	__swab64s(&(rc).bulk_put);		\
} while (0)

/* LNet counters: 32-bit counts followed by 64-bit lengths */
#define sfw_unpack_lnet_counters(lc)		\
do {						\
	__swab32s(&(lc).errors);		\
	__swab32s(&(lc).msgs_max);		\
	__swab32s(&(lc).msgs_alloc);		\
	__swab32s(&(lc).send_count);		\
	__swab32s(&(lc).recv_count);		\
	__swab32s(&(lc).drop_count);		\
	__swab32s(&(lc).route_count);		\
	__swab64s(&(lc).send_length);		\
	__swab64s(&(lc).recv_length);		\
	__swab64s(&(lc).drop_length);		\
	__swab64s(&(lc).route_length);		\
} while (0)

/* Non-zero while the test instance / batch still has active members. */
#define sfw_test_active(t)	(atomic_read(&(t)->tsi_nactive))
#define sfw_batch_active(b)	(atomic_read(&(b)->bat_nactive))
101 
102 static struct smoketest_framework {
103 	struct list_head  fw_zombie_rpcs;     /* RPCs to be recycled */
104 	struct list_head  fw_zombie_sessions; /* stopping sessions */
105 	struct list_head  fw_tests;	      /* registered test cases */
106 	atomic_t	  fw_nzombies;	      /* # zombie sessions */
107 	spinlock_t	  fw_lock;	      /* serialise */
108 	struct sfw_session	  *fw_session;	      /* _the_ session */
109 	int		  fw_shuttingdown;    /* shutdown in progress */
110 	struct srpc_server_rpc *fw_active_srpc;/* running RPC */
111 } sfw_data;
112 
113 /* forward ref's */
114 int sfw_stop_batch(struct sfw_batch *tsb, int force);
115 void sfw_destroy_session(struct sfw_session *sn);
116 
117 static inline struct sfw_test_case *
sfw_find_test_case(int id)118 sfw_find_test_case(int id)
119 {
120 	struct sfw_test_case *tsc;
121 
122 	LASSERT(id <= SRPC_SERVICE_MAX_ID);
123 	LASSERT(id > SRPC_FRAMEWORK_SERVICE_MAX_ID);
124 
125 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
126 		if (tsc->tsc_srv_service->sv_id == id)
127 			return tsc;
128 	}
129 
130 	return NULL;
131 }
132 
133 static int
sfw_register_test(struct srpc_service * service,struct sfw_test_client_ops * cliops)134 sfw_register_test(struct srpc_service *service, struct sfw_test_client_ops *cliops)
135 {
136 	struct sfw_test_case *tsc;
137 
138 	if (sfw_find_test_case(service->sv_id)) {
139 		CERROR("Failed to register test %s (%d)\n",
140 		       service->sv_name, service->sv_id);
141 		return -EEXIST;
142 	}
143 
144 	LIBCFS_ALLOC(tsc, sizeof(struct sfw_test_case));
145 	if (!tsc)
146 		return -ENOMEM;
147 
148 	tsc->tsc_cli_ops = cliops;
149 	tsc->tsc_srv_service = service;
150 
151 	list_add_tail(&tsc->tsc_list, &sfw_data.fw_tests);
152 	return 0;
153 }
154 
155 static void
sfw_add_session_timer(void)156 sfw_add_session_timer(void)
157 {
158 	struct sfw_session *sn = sfw_data.fw_session;
159 	struct stt_timer *timer = &sn->sn_timer;
160 
161 	LASSERT(!sfw_data.fw_shuttingdown);
162 
163 	if (!sn || !sn->sn_timeout)
164 		return;
165 
166 	LASSERT(!sn->sn_timer_active);
167 
168 	sn->sn_timer_active = 1;
169 	timer->stt_expires = ktime_get_real_seconds() + sn->sn_timeout;
170 	stt_add_timer(timer);
171 }
172 
173 static int
sfw_del_session_timer(void)174 sfw_del_session_timer(void)
175 {
176 	struct sfw_session *sn = sfw_data.fw_session;
177 
178 	if (!sn || !sn->sn_timer_active)
179 		return 0;
180 
181 	LASSERT(sn->sn_timeout);
182 
183 	if (stt_del_timer(&sn->sn_timer)) { /* timer defused */
184 		sn->sn_timer_active = 0;
185 		return 0;
186 	}
187 
188 	return EBUSY; /* racing with sfw_session_expired() */
189 }
190 
191 static void
sfw_deactivate_session(void)192 sfw_deactivate_session(void)
193 __must_hold(&sfw_data.fw_lock)
194 {
195 	struct sfw_session *sn = sfw_data.fw_session;
196 	int nactive = 0;
197 	struct sfw_batch *tsb;
198 	struct sfw_test_case *tsc;
199 
200 	if (!sn)
201 		return;
202 
203 	LASSERT(!sn->sn_timer_active);
204 
205 	sfw_data.fw_session = NULL;
206 	atomic_inc(&sfw_data.fw_nzombies);
207 	list_add(&sn->sn_list, &sfw_data.fw_zombie_sessions);
208 
209 	spin_unlock(&sfw_data.fw_lock);
210 
211 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
212 		srpc_abort_service(tsc->tsc_srv_service);
213 	}
214 
215 	spin_lock(&sfw_data.fw_lock);
216 
217 	list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
218 		if (sfw_batch_active(tsb)) {
219 			nactive++;
220 			sfw_stop_batch(tsb, 1);
221 		}
222 	}
223 
224 	if (nactive)
225 		return;	/* wait for active batches to stop */
226 
227 	list_del_init(&sn->sn_list);
228 	spin_unlock(&sfw_data.fw_lock);
229 
230 	sfw_destroy_session(sn);
231 
232 	spin_lock(&sfw_data.fw_lock);
233 }
234 
235 static void
sfw_session_expired(void * data)236 sfw_session_expired(void *data)
237 {
238 	struct sfw_session *sn = data;
239 
240 	spin_lock(&sfw_data.fw_lock);
241 
242 	LASSERT(sn->sn_timer_active);
243 	LASSERT(sn == sfw_data.fw_session);
244 
245 	CWARN("Session expired! sid: %s-%llu, name: %s\n",
246 	      libcfs_nid2str(sn->sn_id.ses_nid),
247 	      sn->sn_id.ses_stamp, &sn->sn_name[0]);
248 
249 	sn->sn_timer_active = 0;
250 	sfw_deactivate_session();
251 
252 	spin_unlock(&sfw_data.fw_lock);
253 }
254 
255 static inline void
sfw_init_session(struct sfw_session * sn,lst_sid_t sid,unsigned features,const char * name)256 sfw_init_session(struct sfw_session *sn, lst_sid_t sid,
257 		 unsigned features, const char *name)
258 {
259 	struct stt_timer *timer = &sn->sn_timer;
260 
261 	memset(sn, 0, sizeof(struct sfw_session));
262 	INIT_LIST_HEAD(&sn->sn_list);
263 	INIT_LIST_HEAD(&sn->sn_batches);
264 	atomic_set(&sn->sn_refcount, 1);	/* +1 for caller */
265 	atomic_set(&sn->sn_brw_errors, 0);
266 	atomic_set(&sn->sn_ping_errors, 0);
267 	strlcpy(&sn->sn_name[0], name, sizeof(sn->sn_name));
268 
269 	sn->sn_timer_active = 0;
270 	sn->sn_id = sid;
271 	sn->sn_features = features;
272 	sn->sn_timeout = session_timeout;
273 	sn->sn_started = cfs_time_current();
274 
275 	timer->stt_data = sn;
276 	timer->stt_func = sfw_session_expired;
277 	INIT_LIST_HEAD(&timer->stt_list);
278 }
279 
280 /* completion handler for incoming framework RPCs */
281 static void
sfw_server_rpc_done(struct srpc_server_rpc * rpc)282 sfw_server_rpc_done(struct srpc_server_rpc *rpc)
283 {
284 	struct srpc_service *sv	= rpc->srpc_scd->scd_svc;
285 	int status = rpc->srpc_status;
286 
287 	CDEBUG(D_NET, "Incoming framework RPC done: service %s, peer %s, status %s:%d\n",
288 	       sv->sv_name, libcfs_id2str(rpc->srpc_peer),
289 	       swi_state2str(rpc->srpc_wi.swi_state),
290 	       status);
291 
292 	if (rpc->srpc_bulk)
293 		sfw_free_pages(rpc);
294 }
295 
296 static void
sfw_client_rpc_fini(struct srpc_client_rpc * rpc)297 sfw_client_rpc_fini(struct srpc_client_rpc *rpc)
298 {
299 	LASSERT(!rpc->crpc_bulk.bk_niov);
300 	LASSERT(list_empty(&rpc->crpc_list));
301 	LASSERT(!atomic_read(&rpc->crpc_refcount));
302 
303 	CDEBUG(D_NET, "Outgoing framework RPC done: service %d, peer %s, status %s:%d:%d\n",
304 	       rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
305 	       swi_state2str(rpc->crpc_wi.swi_state),
306 	       rpc->crpc_aborted, rpc->crpc_status);
307 
308 	spin_lock(&sfw_data.fw_lock);
309 
310 	/* my callers must finish all RPCs before shutting me down */
311 	LASSERT(!sfw_data.fw_shuttingdown);
312 	list_add(&rpc->crpc_list, &sfw_data.fw_zombie_rpcs);
313 
314 	spin_unlock(&sfw_data.fw_lock);
315 }
316 
317 static struct sfw_batch *
sfw_find_batch(lst_bid_t bid)318 sfw_find_batch(lst_bid_t bid)
319 {
320 	struct sfw_session *sn = sfw_data.fw_session;
321 	struct sfw_batch *bat;
322 
323 	LASSERT(sn);
324 
325 	list_for_each_entry(bat, &sn->sn_batches, bat_list) {
326 		if (bat->bat_id.bat_id == bid.bat_id)
327 			return bat;
328 	}
329 
330 	return NULL;
331 }
332 
333 static struct sfw_batch *
sfw_bid2batch(lst_bid_t bid)334 sfw_bid2batch(lst_bid_t bid)
335 {
336 	struct sfw_session *sn = sfw_data.fw_session;
337 	struct sfw_batch *bat;
338 
339 	LASSERT(sn);
340 
341 	bat = sfw_find_batch(bid);
342 	if (bat)
343 		return bat;
344 
345 	LIBCFS_ALLOC(bat, sizeof(struct sfw_batch));
346 	if (!bat)
347 		return NULL;
348 
349 	bat->bat_error = 0;
350 	bat->bat_session = sn;
351 	bat->bat_id = bid;
352 	atomic_set(&bat->bat_nactive, 0);
353 	INIT_LIST_HEAD(&bat->bat_tests);
354 
355 	list_add_tail(&bat->bat_list, &sn->sn_batches);
356 	return bat;
357 }
358 
359 static int
sfw_get_stats(struct srpc_stat_reqst * request,struct srpc_stat_reply * reply)360 sfw_get_stats(struct srpc_stat_reqst *request, struct srpc_stat_reply *reply)
361 {
362 	struct sfw_session *sn = sfw_data.fw_session;
363 	sfw_counters_t *cnt = &reply->str_fw;
364 	struct sfw_batch *bat;
365 
366 	reply->str_sid = !sn ? LST_INVALID_SID : sn->sn_id;
367 
368 	if (request->str_sid.ses_nid == LNET_NID_ANY) {
369 		reply->str_status = EINVAL;
370 		return 0;
371 	}
372 
373 	if (!sn || !sfw_sid_equal(request->str_sid, sn->sn_id)) {
374 		reply->str_status = ESRCH;
375 		return 0;
376 	}
377 
378 	lnet_counters_get(&reply->str_lnet);
379 	srpc_get_counters(&reply->str_rpc);
380 
381 	/*
382 	 * send over the msecs since the session was started
383 	 * with 32 bits to send, this is ~49 days
384 	 */
385 	cnt->running_ms = jiffies_to_msecs(jiffies - sn->sn_started);
386 	cnt->brw_errors = atomic_read(&sn->sn_brw_errors);
387 	cnt->ping_errors = atomic_read(&sn->sn_ping_errors);
388 	cnt->zombie_sessions = atomic_read(&sfw_data.fw_nzombies);
389 
390 	cnt->active_batches = 0;
391 	list_for_each_entry(bat, &sn->sn_batches, bat_list) {
392 		if (atomic_read(&bat->bat_nactive) > 0)
393 			cnt->active_batches++;
394 	}
395 
396 	reply->str_status = 0;
397 	return 0;
398 }
399 
400 int
sfw_make_session(struct srpc_mksn_reqst * request,struct srpc_mksn_reply * reply)401 sfw_make_session(struct srpc_mksn_reqst *request, struct srpc_mksn_reply *reply)
402 {
403 	struct sfw_session *sn = sfw_data.fw_session;
404 	struct srpc_msg *msg = container_of(request, struct srpc_msg,
405 				       msg_body.mksn_reqst);
406 	int cplen = 0;
407 
408 	if (request->mksn_sid.ses_nid == LNET_NID_ANY) {
409 		reply->mksn_sid = !sn ? LST_INVALID_SID : sn->sn_id;
410 		reply->mksn_status = EINVAL;
411 		return 0;
412 	}
413 
414 	if (sn) {
415 		reply->mksn_status = 0;
416 		reply->mksn_sid = sn->sn_id;
417 		reply->mksn_timeout = sn->sn_timeout;
418 
419 		if (sfw_sid_equal(request->mksn_sid, sn->sn_id)) {
420 			atomic_inc(&sn->sn_refcount);
421 			return 0;
422 		}
423 
424 		if (!request->mksn_force) {
425 			reply->mksn_status = EBUSY;
426 			cplen = strlcpy(&reply->mksn_name[0], &sn->sn_name[0],
427 					sizeof(reply->mksn_name));
428 			if (cplen >= sizeof(reply->mksn_name))
429 				return -E2BIG;
430 			return 0;
431 		}
432 	}
433 
434 	/*
435 	 * reject the request if it requires unknown features
436 	 * NB: old version will always accept all features because it's not
437 	 * aware of srpc_msg::msg_ses_feats, it's a defect but it's also
438 	 * harmless because it will return zero feature to console, and it's
439 	 * console's responsibility to make sure all nodes in a session have
440 	 * same feature mask.
441 	 */
442 	if (msg->msg_ses_feats & ~LST_FEATS_MASK) {
443 		reply->mksn_status = EPROTO;
444 		return 0;
445 	}
446 
447 	/* brand new or create by force */
448 	LIBCFS_ALLOC(sn, sizeof(struct sfw_session));
449 	if (!sn) {
450 		CERROR("dropping RPC mksn under memory pressure\n");
451 		return -ENOMEM;
452 	}
453 
454 	sfw_init_session(sn, request->mksn_sid,
455 			 msg->msg_ses_feats, &request->mksn_name[0]);
456 
457 	spin_lock(&sfw_data.fw_lock);
458 
459 	sfw_deactivate_session();
460 	LASSERT(!sfw_data.fw_session);
461 	sfw_data.fw_session = sn;
462 
463 	spin_unlock(&sfw_data.fw_lock);
464 
465 	reply->mksn_status = 0;
466 	reply->mksn_sid = sn->sn_id;
467 	reply->mksn_timeout = sn->sn_timeout;
468 	return 0;
469 }
470 
471 static int
sfw_remove_session(struct srpc_rmsn_reqst * request,struct srpc_rmsn_reply * reply)472 sfw_remove_session(struct srpc_rmsn_reqst *request, struct srpc_rmsn_reply *reply)
473 {
474 	struct sfw_session *sn = sfw_data.fw_session;
475 
476 	reply->rmsn_sid = !sn ? LST_INVALID_SID : sn->sn_id;
477 
478 	if (request->rmsn_sid.ses_nid == LNET_NID_ANY) {
479 		reply->rmsn_status = EINVAL;
480 		return 0;
481 	}
482 
483 	if (!sn || !sfw_sid_equal(request->rmsn_sid, sn->sn_id)) {
484 		reply->rmsn_status = !sn ? ESRCH : EBUSY;
485 		return 0;
486 	}
487 
488 	if (!atomic_dec_and_test(&sn->sn_refcount)) {
489 		reply->rmsn_status = 0;
490 		return 0;
491 	}
492 
493 	spin_lock(&sfw_data.fw_lock);
494 	sfw_deactivate_session();
495 	spin_unlock(&sfw_data.fw_lock);
496 
497 	reply->rmsn_status = 0;
498 	reply->rmsn_sid = LST_INVALID_SID;
499 	LASSERT(!sfw_data.fw_session);
500 	return 0;
501 }
502 
503 static int
sfw_debug_session(struct srpc_debug_reqst * request,struct srpc_debug_reply * reply)504 sfw_debug_session(struct srpc_debug_reqst *request, struct srpc_debug_reply *reply)
505 {
506 	struct sfw_session *sn = sfw_data.fw_session;
507 
508 	if (!sn) {
509 		reply->dbg_status = ESRCH;
510 		reply->dbg_sid = LST_INVALID_SID;
511 		return 0;
512 	}
513 
514 	reply->dbg_status = 0;
515 	reply->dbg_sid = sn->sn_id;
516 	reply->dbg_timeout = sn->sn_timeout;
517 	if (strlcpy(reply->dbg_name, &sn->sn_name[0], sizeof(reply->dbg_name))
518 	    >= sizeof(reply->dbg_name))
519 		return -E2BIG;
520 
521 	return 0;
522 }
523 
524 static void
sfw_test_rpc_fini(struct srpc_client_rpc * rpc)525 sfw_test_rpc_fini(struct srpc_client_rpc *rpc)
526 {
527 	struct sfw_test_unit *tsu = rpc->crpc_priv;
528 	struct sfw_test_instance *tsi = tsu->tsu_instance;
529 
530 	/* Called with hold of tsi->tsi_lock */
531 	LASSERT(list_empty(&rpc->crpc_list));
532 	list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
533 }
534 
535 static inline int
sfw_test_buffers(struct sfw_test_instance * tsi)536 sfw_test_buffers(struct sfw_test_instance *tsi)
537 {
538 	struct sfw_test_case *tsc;
539 	struct srpc_service *svc;
540 	int nbuf;
541 
542 	LASSERT(tsi);
543 	tsc = sfw_find_test_case(tsi->tsi_service);
544 	LASSERT(tsc);
545 	svc = tsc->tsc_srv_service;
546 	LASSERT(svc);
547 
548 	nbuf = min(svc->sv_wi_total, tsi->tsi_loop) / svc->sv_ncpts;
549 	return max(SFW_TEST_WI_MIN, nbuf + SFW_TEST_WI_EXTRA);
550 }
551 
552 static int
sfw_load_test(struct sfw_test_instance * tsi)553 sfw_load_test(struct sfw_test_instance *tsi)
554 {
555 	struct sfw_test_case *tsc;
556 	struct srpc_service *svc;
557 	int nbuf;
558 	int rc;
559 
560 	LASSERT(tsi);
561 	tsc = sfw_find_test_case(tsi->tsi_service);
562 	nbuf = sfw_test_buffers(tsi);
563 	LASSERT(tsc);
564 	svc = tsc->tsc_srv_service;
565 
566 	if (tsi->tsi_is_client) {
567 		tsi->tsi_ops = tsc->tsc_cli_ops;
568 		return 0;
569 	}
570 
571 	rc = srpc_service_add_buffers(svc, nbuf);
572 	if (rc) {
573 		CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
574 		      svc->sv_name, nbuf, rc);
575 		/*
576 		 * NB: this error handler is not strictly correct, because
577 		 * it may release more buffers than already allocated,
578 		 * but it doesn't matter because request portal should
579 		 * be lazy portal and will grow buffers if necessary.
580 		 */
581 		srpc_service_remove_buffers(svc, nbuf);
582 		return -ENOMEM;
583 	}
584 
585 	CDEBUG(D_NET, "Reserved %d buffers for test %s\n",
586 	       nbuf * (srpc_serv_is_framework(svc) ?
587 		       2 : cfs_cpt_number(cfs_cpt_table)), svc->sv_name);
588 	return 0;
589 }
590 
591 static void
sfw_unload_test(struct sfw_test_instance * tsi)592 sfw_unload_test(struct sfw_test_instance *tsi)
593 {
594 	struct sfw_test_case *tsc;
595 
596 	LASSERT(tsi);
597 	tsc = sfw_find_test_case(tsi->tsi_service);
598 	LASSERT(tsc);
599 
600 	if (tsi->tsi_is_client)
601 		return;
602 
603 	/*
604 	 * shrink buffers, because request portal is lazy portal
605 	 * which can grow buffers at runtime so we may leave
606 	 * some buffers behind, but never mind...
607 	 */
608 	srpc_service_remove_buffers(tsc->tsc_srv_service,
609 				    sfw_test_buffers(tsi));
610 }
611 
612 static void
sfw_destroy_test_instance(struct sfw_test_instance * tsi)613 sfw_destroy_test_instance(struct sfw_test_instance *tsi)
614 {
615 	struct srpc_client_rpc *rpc;
616 	struct sfw_test_unit *tsu;
617 
618 	if (!tsi->tsi_is_client)
619 		goto clean;
620 
621 	tsi->tsi_ops->tso_fini(tsi);
622 
623 	LASSERT(!tsi->tsi_stopping);
624 	LASSERT(list_empty(&tsi->tsi_active_rpcs));
625 	LASSERT(!sfw_test_active(tsi));
626 
627 	while (!list_empty(&tsi->tsi_units)) {
628 		tsu = list_entry(tsi->tsi_units.next,
629 				 struct sfw_test_unit, tsu_list);
630 		list_del(&tsu->tsu_list);
631 		LIBCFS_FREE(tsu, sizeof(*tsu));
632 	}
633 
634 	while (!list_empty(&tsi->tsi_free_rpcs)) {
635 		rpc = list_entry(tsi->tsi_free_rpcs.next,
636 				 struct srpc_client_rpc, crpc_list);
637 		list_del(&rpc->crpc_list);
638 		LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
639 	}
640 
641 clean:
642 	sfw_unload_test(tsi);
643 	LIBCFS_FREE(tsi, sizeof(*tsi));
644 }
645 
646 static void
sfw_destroy_batch(struct sfw_batch * tsb)647 sfw_destroy_batch(struct sfw_batch *tsb)
648 {
649 	struct sfw_test_instance *tsi;
650 
651 	LASSERT(!sfw_batch_active(tsb));
652 	LASSERT(list_empty(&tsb->bat_list));
653 
654 	while (!list_empty(&tsb->bat_tests)) {
655 		tsi = list_entry(tsb->bat_tests.next,
656 				 struct sfw_test_instance, tsi_list);
657 		list_del_init(&tsi->tsi_list);
658 		sfw_destroy_test_instance(tsi);
659 	}
660 
661 	LIBCFS_FREE(tsb, sizeof(struct sfw_batch));
662 }
663 
664 void
sfw_destroy_session(struct sfw_session * sn)665 sfw_destroy_session(struct sfw_session *sn)
666 {
667 	struct sfw_batch *batch;
668 
669 	LASSERT(list_empty(&sn->sn_list));
670 	LASSERT(sn != sfw_data.fw_session);
671 
672 	while (!list_empty(&sn->sn_batches)) {
673 		batch = list_entry(sn->sn_batches.next,
674 				   struct sfw_batch, bat_list);
675 		list_del_init(&batch->bat_list);
676 		sfw_destroy_batch(batch);
677 	}
678 
679 	LIBCFS_FREE(sn, sizeof(*sn));
680 	atomic_dec(&sfw_data.fw_nzombies);
681 }
682 
683 static void
sfw_unpack_addtest_req(struct srpc_msg * msg)684 sfw_unpack_addtest_req(struct srpc_msg *msg)
685 {
686 	struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
687 
688 	LASSERT(msg->msg_type == SRPC_MSG_TEST_REQST);
689 	LASSERT(req->tsr_is_client);
690 
691 	if (msg->msg_magic == SRPC_MSG_MAGIC)
692 		return;	/* no flipping needed */
693 
694 	LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
695 
696 	if (req->tsr_service == SRPC_SERVICE_BRW) {
697 		if (!(msg->msg_ses_feats & LST_FEAT_BULK_LEN)) {
698 			struct test_bulk_req *bulk = &req->tsr_u.bulk_v0;
699 
700 			__swab32s(&bulk->blk_opc);
701 			__swab32s(&bulk->blk_npg);
702 			__swab32s(&bulk->blk_flags);
703 
704 		} else {
705 			struct test_bulk_req_v1 *bulk = &req->tsr_u.bulk_v1;
706 
707 			__swab16s(&bulk->blk_opc);
708 			__swab16s(&bulk->blk_flags);
709 			__swab32s(&bulk->blk_offset);
710 			__swab32s(&bulk->blk_len);
711 		}
712 
713 		return;
714 	}
715 
716 	if (req->tsr_service == SRPC_SERVICE_PING) {
717 		struct test_ping_req *ping = &req->tsr_u.ping;
718 
719 		__swab32s(&ping->png_size);
720 		__swab32s(&ping->png_flags);
721 		return;
722 	}
723 
724 	LBUG();
725 }
726 
727 static int
sfw_add_test_instance(struct sfw_batch * tsb,struct srpc_server_rpc * rpc)728 sfw_add_test_instance(struct sfw_batch *tsb, struct srpc_server_rpc *rpc)
729 {
730 	struct srpc_msg *msg = &rpc->srpc_reqstbuf->buf_msg;
731 	struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
732 	struct srpc_bulk *bk = rpc->srpc_bulk;
733 	int ndest = req->tsr_ndest;
734 	struct sfw_test_unit *tsu;
735 	struct sfw_test_instance *tsi;
736 	int i;
737 	int rc;
738 
739 	LIBCFS_ALLOC(tsi, sizeof(*tsi));
740 	if (!tsi) {
741 		CERROR("Can't allocate test instance for batch: %llu\n",
742 		       tsb->bat_id.bat_id);
743 		return -ENOMEM;
744 	}
745 
746 	spin_lock_init(&tsi->tsi_lock);
747 	atomic_set(&tsi->tsi_nactive, 0);
748 	INIT_LIST_HEAD(&tsi->tsi_units);
749 	INIT_LIST_HEAD(&tsi->tsi_free_rpcs);
750 	INIT_LIST_HEAD(&tsi->tsi_active_rpcs);
751 
752 	tsi->tsi_stopping = 0;
753 	tsi->tsi_batch = tsb;
754 	tsi->tsi_loop = req->tsr_loop;
755 	tsi->tsi_concur = req->tsr_concur;
756 	tsi->tsi_service = req->tsr_service;
757 	tsi->tsi_is_client = !!(req->tsr_is_client);
758 	tsi->tsi_stoptsu_onerr = !!(req->tsr_stop_onerr);
759 
760 	rc = sfw_load_test(tsi);
761 	if (rc) {
762 		LIBCFS_FREE(tsi, sizeof(*tsi));
763 		return rc;
764 	}
765 
766 	LASSERT(!sfw_batch_active(tsb));
767 
768 	if (!tsi->tsi_is_client) {
769 		/* it's test server, just add it to tsb */
770 		list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
771 		return 0;
772 	}
773 
774 	LASSERT(bk);
775 	LASSERT(bk->bk_niov * SFW_ID_PER_PAGE >= (unsigned int)ndest);
776 	LASSERT((unsigned int)bk->bk_len >=
777 		sizeof(lnet_process_id_packed_t) * ndest);
778 
779 	sfw_unpack_addtest_req(msg);
780 	memcpy(&tsi->tsi_u, &req->tsr_u, sizeof(tsi->tsi_u));
781 
782 	for (i = 0; i < ndest; i++) {
783 		lnet_process_id_packed_t *dests;
784 		lnet_process_id_packed_t id;
785 		int j;
786 
787 		dests = page_address(bk->bk_iovs[i / SFW_ID_PER_PAGE].bv_page);
788 		LASSERT(dests);  /* my pages are within KVM always */
789 		id = dests[i % SFW_ID_PER_PAGE];
790 		if (msg->msg_magic != SRPC_MSG_MAGIC)
791 			sfw_unpack_id(id);
792 
793 		for (j = 0; j < tsi->tsi_concur; j++) {
794 			LIBCFS_ALLOC(tsu, sizeof(struct sfw_test_unit));
795 			if (!tsu) {
796 				rc = -ENOMEM;
797 				CERROR("Can't allocate tsu for %d\n",
798 				       tsi->tsi_service);
799 				goto error;
800 			}
801 
802 			tsu->tsu_dest.nid = id.nid;
803 			tsu->tsu_dest.pid = id.pid;
804 			tsu->tsu_instance = tsi;
805 			tsu->tsu_private = NULL;
806 			list_add_tail(&tsu->tsu_list, &tsi->tsi_units);
807 		}
808 	}
809 
810 	rc = tsi->tsi_ops->tso_init(tsi);
811 	if (!rc) {
812 		list_add_tail(&tsi->tsi_list, &tsb->bat_tests);
813 		return 0;
814 	}
815 
816 error:
817 	LASSERT(rc);
818 	sfw_destroy_test_instance(tsi);
819 	return rc;
820 }
821 
822 static void
sfw_test_unit_done(struct sfw_test_unit * tsu)823 sfw_test_unit_done(struct sfw_test_unit *tsu)
824 {
825 	struct sfw_test_instance *tsi = tsu->tsu_instance;
826 	struct sfw_batch *tsb = tsi->tsi_batch;
827 	struct sfw_session *sn = tsb->bat_session;
828 
829 	LASSERT(sfw_test_active(tsi));
830 
831 	if (!atomic_dec_and_test(&tsi->tsi_nactive))
832 		return;
833 
834 	/* the test instance is done */
835 	spin_lock(&tsi->tsi_lock);
836 
837 	tsi->tsi_stopping = 0;
838 
839 	spin_unlock(&tsi->tsi_lock);
840 
841 	spin_lock(&sfw_data.fw_lock);
842 
843 	if (!atomic_dec_and_test(&tsb->bat_nactive) ||	/* tsb still active */
844 	    sn == sfw_data.fw_session) {		/* sn also active */
845 		spin_unlock(&sfw_data.fw_lock);
846 		return;
847 	}
848 
849 	LASSERT(!list_empty(&sn->sn_list)); /* I'm a zombie! */
850 
851 	list_for_each_entry(tsb, &sn->sn_batches, bat_list) {
852 		if (sfw_batch_active(tsb)) {
853 			spin_unlock(&sfw_data.fw_lock);
854 			return;
855 		}
856 	}
857 
858 	list_del_init(&sn->sn_list);
859 	spin_unlock(&sfw_data.fw_lock);
860 
861 	sfw_destroy_session(sn);
862 }
863 
864 static void
sfw_test_rpc_done(struct srpc_client_rpc * rpc)865 sfw_test_rpc_done(struct srpc_client_rpc *rpc)
866 {
867 	struct sfw_test_unit *tsu = rpc->crpc_priv;
868 	struct sfw_test_instance *tsi = tsu->tsu_instance;
869 	int done = 0;
870 
871 	tsi->tsi_ops->tso_done_rpc(tsu, rpc);
872 
873 	spin_lock(&tsi->tsi_lock);
874 
875 	LASSERT(sfw_test_active(tsi));
876 	LASSERT(!list_empty(&rpc->crpc_list));
877 
878 	list_del_init(&rpc->crpc_list);
879 
880 	/* batch is stopping or loop is done or get error */
881 	if (tsi->tsi_stopping || !tsu->tsu_loop ||
882 	    (rpc->crpc_status && tsi->tsi_stoptsu_onerr))
883 		done = 1;
884 
885 	/* dec ref for poster */
886 	srpc_client_rpc_decref(rpc);
887 
888 	spin_unlock(&tsi->tsi_lock);
889 
890 	if (!done) {
891 		swi_schedule_workitem(&tsu->tsu_worker);
892 		return;
893 	}
894 
895 	sfw_test_unit_done(tsu);
896 }
897 
898 int
sfw_create_test_rpc(struct sfw_test_unit * tsu,lnet_process_id_t peer,unsigned features,int nblk,int blklen,struct srpc_client_rpc ** rpcpp)899 sfw_create_test_rpc(struct sfw_test_unit *tsu, lnet_process_id_t peer,
900 		    unsigned features, int nblk, int blklen,
901 		    struct srpc_client_rpc **rpcpp)
902 {
903 	struct srpc_client_rpc *rpc = NULL;
904 	struct sfw_test_instance *tsi = tsu->tsu_instance;
905 
906 	spin_lock(&tsi->tsi_lock);
907 
908 	LASSERT(sfw_test_active(tsi));
909 		/* pick request from buffer */
910 	rpc = list_first_entry_or_null(&tsi->tsi_free_rpcs,
911 				       struct srpc_client_rpc, crpc_list);
912 	if (rpc) {
913 		LASSERT(nblk == rpc->crpc_bulk.bk_niov);
914 		list_del_init(&rpc->crpc_list);
915 	}
916 
917 	spin_unlock(&tsi->tsi_lock);
918 
919 	if (!rpc) {
920 		rpc = srpc_create_client_rpc(peer, tsi->tsi_service, nblk,
921 					     blklen, sfw_test_rpc_done,
922 					     sfw_test_rpc_fini, tsu);
923 	} else {
924 		srpc_init_client_rpc(rpc, peer, tsi->tsi_service, nblk,
925 				     blklen, sfw_test_rpc_done,
926 				     sfw_test_rpc_fini, tsu);
927 	}
928 
929 	if (!rpc) {
930 		CERROR("Can't create rpc for test %d\n", tsi->tsi_service);
931 		return -ENOMEM;
932 	}
933 
934 	rpc->crpc_reqstmsg.msg_ses_feats = features;
935 	*rpcpp = rpc;
936 
937 	return 0;
938 }
939 
940 static int
sfw_run_test(struct swi_workitem * wi)941 sfw_run_test(struct swi_workitem *wi)
942 {
943 	struct sfw_test_unit *tsu = wi->swi_workitem.wi_data;
944 	struct sfw_test_instance *tsi = tsu->tsu_instance;
945 	struct srpc_client_rpc *rpc = NULL;
946 
947 	LASSERT(wi == &tsu->tsu_worker);
948 
949 	if (tsi->tsi_ops->tso_prep_rpc(tsu, tsu->tsu_dest, &rpc)) {
950 		LASSERT(!rpc);
951 		goto test_done;
952 	}
953 
954 	LASSERT(rpc);
955 
956 	spin_lock(&tsi->tsi_lock);
957 
958 	if (tsi->tsi_stopping) {
959 		list_add(&rpc->crpc_list, &tsi->tsi_free_rpcs);
960 		spin_unlock(&tsi->tsi_lock);
961 		goto test_done;
962 	}
963 
964 	if (tsu->tsu_loop > 0)
965 		tsu->tsu_loop--;
966 
967 	list_add_tail(&rpc->crpc_list, &tsi->tsi_active_rpcs);
968 	spin_unlock(&tsi->tsi_lock);
969 
970 	spin_lock(&rpc->crpc_lock);
971 	rpc->crpc_timeout = rpc_timeout;
972 	srpc_post_rpc(rpc);
973 	spin_unlock(&rpc->crpc_lock);
974 	return 0;
975 
976 test_done:
977 	/*
978 	 * No one can schedule me now since:
979 	 * - previous RPC, if any, has done and
980 	 * - no new RPC is initiated.
981 	 * - my batch is still active; no one can run it again now.
982 	 * Cancel pending schedules and prevent future schedule attempts:
983 	 */
984 	swi_exit_workitem(wi);
985 	sfw_test_unit_done(tsu);
986 	return 1;
987 }
988 
989 static int
sfw_run_batch(struct sfw_batch * tsb)990 sfw_run_batch(struct sfw_batch *tsb)
991 {
992 	struct swi_workitem *wi;
993 	struct sfw_test_unit *tsu;
994 	struct sfw_test_instance *tsi;
995 
996 	if (sfw_batch_active(tsb)) {
997 		CDEBUG(D_NET, "Batch already active: %llu (%d)\n",
998 		       tsb->bat_id.bat_id, atomic_read(&tsb->bat_nactive));
999 		return 0;
1000 	}
1001 
1002 	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1003 		if (!tsi->tsi_is_client) /* skip server instances */
1004 			continue;
1005 
1006 		LASSERT(!tsi->tsi_stopping);
1007 		LASSERT(!sfw_test_active(tsi));
1008 
1009 		atomic_inc(&tsb->bat_nactive);
1010 
1011 		list_for_each_entry(tsu, &tsi->tsi_units, tsu_list) {
1012 			atomic_inc(&tsi->tsi_nactive);
1013 			tsu->tsu_loop = tsi->tsi_loop;
1014 			wi = &tsu->tsu_worker;
1015 			swi_init_workitem(wi, tsu, sfw_run_test,
1016 					  lst_sched_test[lnet_cpt_of_nid(tsu->tsu_dest.nid)]);
1017 			swi_schedule_workitem(wi);
1018 		}
1019 	}
1020 
1021 	return 0;
1022 }
1023 
1024 int
sfw_stop_batch(struct sfw_batch * tsb,int force)1025 sfw_stop_batch(struct sfw_batch *tsb, int force)
1026 {
1027 	struct sfw_test_instance *tsi;
1028 	struct srpc_client_rpc *rpc;
1029 
1030 	if (!sfw_batch_active(tsb)) {
1031 		CDEBUG(D_NET, "Batch %llu inactive\n", tsb->bat_id.bat_id);
1032 		return 0;
1033 	}
1034 
1035 	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1036 		spin_lock(&tsi->tsi_lock);
1037 
1038 		if (!tsi->tsi_is_client ||
1039 		    !sfw_test_active(tsi) || tsi->tsi_stopping) {
1040 			spin_unlock(&tsi->tsi_lock);
1041 			continue;
1042 		}
1043 
1044 		tsi->tsi_stopping = 1;
1045 
1046 		if (!force) {
1047 			spin_unlock(&tsi->tsi_lock);
1048 			continue;
1049 		}
1050 
1051 		/* abort launched rpcs in the test */
1052 		list_for_each_entry(rpc, &tsi->tsi_active_rpcs, crpc_list) {
1053 			spin_lock(&rpc->crpc_lock);
1054 
1055 			srpc_abort_rpc(rpc, -EINTR);
1056 
1057 			spin_unlock(&rpc->crpc_lock);
1058 		}
1059 
1060 		spin_unlock(&tsi->tsi_lock);
1061 	}
1062 
1063 	return 0;
1064 }
1065 
1066 static int
sfw_query_batch(struct sfw_batch * tsb,int testidx,struct srpc_batch_reply * reply)1067 sfw_query_batch(struct sfw_batch *tsb, int testidx, struct srpc_batch_reply *reply)
1068 {
1069 	struct sfw_test_instance *tsi;
1070 
1071 	if (testidx < 0)
1072 		return -EINVAL;
1073 
1074 	if (!testidx) {
1075 		reply->bar_active = atomic_read(&tsb->bat_nactive);
1076 		return 0;
1077 	}
1078 
1079 	list_for_each_entry(tsi, &tsb->bat_tests, tsi_list) {
1080 		if (testidx-- > 1)
1081 			continue;
1082 
1083 		reply->bar_active = atomic_read(&tsi->tsi_nactive);
1084 		return 0;
1085 	}
1086 
1087 	return -ENOENT;
1088 }
1089 
1090 void
sfw_free_pages(struct srpc_server_rpc * rpc)1091 sfw_free_pages(struct srpc_server_rpc *rpc)
1092 {
1093 	srpc_free_bulk(rpc->srpc_bulk);
1094 	rpc->srpc_bulk = NULL;
1095 }
1096 
1097 int
sfw_alloc_pages(struct srpc_server_rpc * rpc,int cpt,int npages,int len,int sink)1098 sfw_alloc_pages(struct srpc_server_rpc *rpc, int cpt, int npages, int len,
1099 		int sink)
1100 {
1101 	LASSERT(!rpc->srpc_bulk);
1102 	LASSERT(npages > 0 && npages <= LNET_MAX_IOV);
1103 
1104 	rpc->srpc_bulk = srpc_alloc_bulk(cpt, npages, len, sink);
1105 	if (!rpc->srpc_bulk)
1106 		return -ENOMEM;
1107 
1108 	return 0;
1109 }
1110 
1111 static int
sfw_add_test(struct srpc_server_rpc * rpc)1112 sfw_add_test(struct srpc_server_rpc *rpc)
1113 {
1114 	struct sfw_session *sn = sfw_data.fw_session;
1115 	struct srpc_test_reply *reply = &rpc->srpc_replymsg.msg_body.tes_reply;
1116 	struct srpc_test_reqst *request;
1117 	int rc;
1118 	struct sfw_batch *bat;
1119 
1120 	request = &rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst;
1121 	reply->tsr_sid = !sn ? LST_INVALID_SID : sn->sn_id;
1122 
1123 	if (!request->tsr_loop ||
1124 	    !request->tsr_concur ||
1125 	    request->tsr_sid.ses_nid == LNET_NID_ANY ||
1126 	    request->tsr_ndest > SFW_MAX_NDESTS ||
1127 	    (request->tsr_is_client && !request->tsr_ndest) ||
1128 	    request->tsr_concur > SFW_MAX_CONCUR ||
1129 	    request->tsr_service > SRPC_SERVICE_MAX_ID ||
1130 	    request->tsr_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID) {
1131 		reply->tsr_status = EINVAL;
1132 		return 0;
1133 	}
1134 
1135 	if (!sn || !sfw_sid_equal(request->tsr_sid, sn->sn_id) ||
1136 	    !sfw_find_test_case(request->tsr_service)) {
1137 		reply->tsr_status = ENOENT;
1138 		return 0;
1139 	}
1140 
1141 	bat = sfw_bid2batch(request->tsr_bid);
1142 	if (!bat) {
1143 		CERROR("dropping RPC %s from %s under memory pressure\n",
1144 		       rpc->srpc_scd->scd_svc->sv_name,
1145 		       libcfs_id2str(rpc->srpc_peer));
1146 		return -ENOMEM;
1147 	}
1148 
1149 	if (sfw_batch_active(bat)) {
1150 		reply->tsr_status = EBUSY;
1151 		return 0;
1152 	}
1153 
1154 	if (request->tsr_is_client && !rpc->srpc_bulk) {
1155 		/* rpc will be resumed later in sfw_bulk_ready */
1156 		int npg = sfw_id_pages(request->tsr_ndest);
1157 		int len;
1158 
1159 		if (!(sn->sn_features & LST_FEAT_BULK_LEN)) {
1160 			len = npg * PAGE_SIZE;
1161 
1162 		} else {
1163 			len = sizeof(lnet_process_id_packed_t) *
1164 			      request->tsr_ndest;
1165 		}
1166 
1167 		return sfw_alloc_pages(rpc, CFS_CPT_ANY, npg, len, 1);
1168 	}
1169 
1170 	rc = sfw_add_test_instance(bat, rpc);
1171 	CDEBUG(!rc ? D_NET : D_WARNING,
1172 	       "%s test: sv %d %s, loop %d, concur %d, ndest %d\n",
1173 	       !rc ? "Added" : "Failed to add", request->tsr_service,
1174 	       request->tsr_is_client ? "client" : "server",
1175 	       request->tsr_loop, request->tsr_concur, request->tsr_ndest);
1176 
1177 	reply->tsr_status = (rc < 0) ? -rc : rc;
1178 	return 0;
1179 }
1180 
1181 static int
sfw_control_batch(struct srpc_batch_reqst * request,struct srpc_batch_reply * reply)1182 sfw_control_batch(struct srpc_batch_reqst *request, struct srpc_batch_reply *reply)
1183 {
1184 	struct sfw_session *sn = sfw_data.fw_session;
1185 	int rc = 0;
1186 	struct sfw_batch *bat;
1187 
1188 	reply->bar_sid = !sn ? LST_INVALID_SID : sn->sn_id;
1189 
1190 	if (!sn || !sfw_sid_equal(request->bar_sid, sn->sn_id)) {
1191 		reply->bar_status = ESRCH;
1192 		return 0;
1193 	}
1194 
1195 	bat = sfw_find_batch(request->bar_bid);
1196 	if (!bat) {
1197 		reply->bar_status = ENOENT;
1198 		return 0;
1199 	}
1200 
1201 	switch (request->bar_opc) {
1202 	case SRPC_BATCH_OPC_RUN:
1203 		rc = sfw_run_batch(bat);
1204 		break;
1205 
1206 	case SRPC_BATCH_OPC_STOP:
1207 		rc = sfw_stop_batch(bat, request->bar_arg);
1208 		break;
1209 
1210 	case SRPC_BATCH_OPC_QUERY:
1211 		rc = sfw_query_batch(bat, request->bar_testidx, reply);
1212 		break;
1213 
1214 	default:
1215 		return -EINVAL; /* drop it */
1216 	}
1217 
1218 	reply->bar_status = (rc < 0) ? -rc : rc;
1219 	return 0;
1220 }
1221 
1222 static int
sfw_handle_server_rpc(struct srpc_server_rpc * rpc)1223 sfw_handle_server_rpc(struct srpc_server_rpc *rpc)
1224 {
1225 	struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1226 	struct srpc_msg *reply = &rpc->srpc_replymsg;
1227 	struct srpc_msg *request = &rpc->srpc_reqstbuf->buf_msg;
1228 	unsigned features = LST_FEATS_MASK;
1229 	int rc = 0;
1230 
1231 	LASSERT(!sfw_data.fw_active_srpc);
1232 	LASSERT(sv->sv_id <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1233 
1234 	spin_lock(&sfw_data.fw_lock);
1235 
1236 	if (sfw_data.fw_shuttingdown) {
1237 		spin_unlock(&sfw_data.fw_lock);
1238 		return -ESHUTDOWN;
1239 	}
1240 
1241 	/* Remove timer to avoid racing with it or expiring active session */
1242 	if (sfw_del_session_timer()) {
1243 		CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1244 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1245 		spin_unlock(&sfw_data.fw_lock);
1246 		return -EAGAIN;
1247 	}
1248 
1249 	sfw_data.fw_active_srpc = rpc;
1250 	spin_unlock(&sfw_data.fw_lock);
1251 
1252 	sfw_unpack_message(request);
1253 	LASSERT(request->msg_type == srpc_service2request(sv->sv_id));
1254 
1255 	/* rpc module should have checked this */
1256 	LASSERT(request->msg_version == SRPC_MSG_VERSION);
1257 
1258 	if (sv->sv_id != SRPC_SERVICE_MAKE_SESSION &&
1259 	    sv->sv_id != SRPC_SERVICE_DEBUG) {
1260 		struct sfw_session *sn = sfw_data.fw_session;
1261 
1262 		if (sn &&
1263 		    sn->sn_features != request->msg_ses_feats) {
1264 			CNETERR("Features of framework RPC don't match features of current session: %x/%x\n",
1265 				request->msg_ses_feats, sn->sn_features);
1266 			reply->msg_body.reply.status = EPROTO;
1267 			reply->msg_body.reply.sid = sn->sn_id;
1268 			goto out;
1269 		}
1270 
1271 	} else if (request->msg_ses_feats & ~LST_FEATS_MASK) {
1272 		/*
1273 		 * NB: at this point, old version will ignore features and
1274 		 * create new session anyway, so console should be able
1275 		 * to handle this
1276 		 */
1277 		reply->msg_body.reply.status = EPROTO;
1278 		goto out;
1279 	}
1280 
1281 	switch (sv->sv_id) {
1282 	default:
1283 		LBUG();
1284 	case SRPC_SERVICE_TEST:
1285 		rc = sfw_add_test(rpc);
1286 		break;
1287 
1288 	case SRPC_SERVICE_BATCH:
1289 		rc = sfw_control_batch(&request->msg_body.bat_reqst,
1290 				       &reply->msg_body.bat_reply);
1291 		break;
1292 
1293 	case SRPC_SERVICE_QUERY_STAT:
1294 		rc = sfw_get_stats(&request->msg_body.stat_reqst,
1295 				   &reply->msg_body.stat_reply);
1296 		break;
1297 
1298 	case SRPC_SERVICE_DEBUG:
1299 		rc = sfw_debug_session(&request->msg_body.dbg_reqst,
1300 				       &reply->msg_body.dbg_reply);
1301 		break;
1302 
1303 	case SRPC_SERVICE_MAKE_SESSION:
1304 		rc = sfw_make_session(&request->msg_body.mksn_reqst,
1305 				      &reply->msg_body.mksn_reply);
1306 		break;
1307 
1308 	case SRPC_SERVICE_REMOVE_SESSION:
1309 		rc = sfw_remove_session(&request->msg_body.rmsn_reqst,
1310 					&reply->msg_body.rmsn_reply);
1311 		break;
1312 	}
1313 
1314 	if (sfw_data.fw_session)
1315 		features = sfw_data.fw_session->sn_features;
1316  out:
1317 	reply->msg_ses_feats = features;
1318 	rpc->srpc_done = sfw_server_rpc_done;
1319 	spin_lock(&sfw_data.fw_lock);
1320 
1321 	if (!sfw_data.fw_shuttingdown)
1322 		sfw_add_session_timer();
1323 
1324 	sfw_data.fw_active_srpc = NULL;
1325 	spin_unlock(&sfw_data.fw_lock);
1326 	return rc;
1327 }
1328 
1329 static int
sfw_bulk_ready(struct srpc_server_rpc * rpc,int status)1330 sfw_bulk_ready(struct srpc_server_rpc *rpc, int status)
1331 {
1332 	struct srpc_service *sv = rpc->srpc_scd->scd_svc;
1333 	int rc;
1334 
1335 	LASSERT(rpc->srpc_bulk);
1336 	LASSERT(sv->sv_id == SRPC_SERVICE_TEST);
1337 	LASSERT(!sfw_data.fw_active_srpc);
1338 	LASSERT(rpc->srpc_reqstbuf->buf_msg.msg_body.tes_reqst.tsr_is_client);
1339 
1340 	spin_lock(&sfw_data.fw_lock);
1341 
1342 	if (status) {
1343 		CERROR("Bulk transfer failed for RPC: service %s, peer %s, status %d\n",
1344 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer), status);
1345 		spin_unlock(&sfw_data.fw_lock);
1346 		return -EIO;
1347 	}
1348 
1349 	if (sfw_data.fw_shuttingdown) {
1350 		spin_unlock(&sfw_data.fw_lock);
1351 		return -ESHUTDOWN;
1352 	}
1353 
1354 	if (sfw_del_session_timer()) {
1355 		CERROR("dropping RPC %s from %s: racing with expiry timer\n",
1356 		       sv->sv_name, libcfs_id2str(rpc->srpc_peer));
1357 		spin_unlock(&sfw_data.fw_lock);
1358 		return -EAGAIN;
1359 	}
1360 
1361 	sfw_data.fw_active_srpc = rpc;
1362 	spin_unlock(&sfw_data.fw_lock);
1363 
1364 	rc = sfw_add_test(rpc);
1365 
1366 	spin_lock(&sfw_data.fw_lock);
1367 
1368 	if (!sfw_data.fw_shuttingdown)
1369 		sfw_add_session_timer();
1370 
1371 	sfw_data.fw_active_srpc = NULL;
1372 	spin_unlock(&sfw_data.fw_lock);
1373 	return rc;
1374 }
1375 
1376 struct srpc_client_rpc *
sfw_create_rpc(lnet_process_id_t peer,int service,unsigned features,int nbulkiov,int bulklen,void (* done)(struct srpc_client_rpc *),void * priv)1377 sfw_create_rpc(lnet_process_id_t peer, int service,
1378 	       unsigned features, int nbulkiov, int bulklen,
1379 	       void (*done)(struct srpc_client_rpc *), void *priv)
1380 {
1381 	struct srpc_client_rpc *rpc = NULL;
1382 
1383 	spin_lock(&sfw_data.fw_lock);
1384 
1385 	LASSERT(!sfw_data.fw_shuttingdown);
1386 	LASSERT(service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1387 
1388 	if (!nbulkiov && !list_empty(&sfw_data.fw_zombie_rpcs)) {
1389 		rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1390 				 struct srpc_client_rpc, crpc_list);
1391 		list_del(&rpc->crpc_list);
1392 
1393 		srpc_init_client_rpc(rpc, peer, service, 0, 0,
1394 				     done, sfw_client_rpc_fini, priv);
1395 	}
1396 
1397 	spin_unlock(&sfw_data.fw_lock);
1398 
1399 	if (!rpc) {
1400 		rpc = srpc_create_client_rpc(peer, service,
1401 					     nbulkiov, bulklen, done,
1402 					     nbulkiov ?  NULL :
1403 					     sfw_client_rpc_fini,
1404 					     priv);
1405 	}
1406 
1407 	if (rpc) /* "session" is concept in framework */
1408 		rpc->crpc_reqstmsg.msg_ses_feats = features;
1409 
1410 	return rpc;
1411 }
1412 
1413 void
sfw_unpack_message(struct srpc_msg * msg)1414 sfw_unpack_message(struct srpc_msg *msg)
1415 {
1416 	if (msg->msg_magic == SRPC_MSG_MAGIC)
1417 		return; /* no flipping needed */
1418 
1419 	/* srpc module should guarantee I wouldn't get crap */
1420 	LASSERT(msg->msg_magic == __swab32(SRPC_MSG_MAGIC));
1421 
1422 	if (msg->msg_type == SRPC_MSG_STAT_REQST) {
1423 		struct srpc_stat_reqst *req = &msg->msg_body.stat_reqst;
1424 
1425 		__swab32s(&req->str_type);
1426 		__swab64s(&req->str_rpyid);
1427 		sfw_unpack_sid(req->str_sid);
1428 		return;
1429 	}
1430 
1431 	if (msg->msg_type == SRPC_MSG_STAT_REPLY) {
1432 		struct srpc_stat_reply *rep = &msg->msg_body.stat_reply;
1433 
1434 		__swab32s(&rep->str_status);
1435 		sfw_unpack_sid(rep->str_sid);
1436 		sfw_unpack_fw_counters(rep->str_fw);
1437 		sfw_unpack_rpc_counters(rep->str_rpc);
1438 		sfw_unpack_lnet_counters(rep->str_lnet);
1439 		return;
1440 	}
1441 
1442 	if (msg->msg_type == SRPC_MSG_MKSN_REQST) {
1443 		struct srpc_mksn_reqst *req = &msg->msg_body.mksn_reqst;
1444 
1445 		__swab64s(&req->mksn_rpyid);
1446 		__swab32s(&req->mksn_force);
1447 		sfw_unpack_sid(req->mksn_sid);
1448 		return;
1449 	}
1450 
1451 	if (msg->msg_type == SRPC_MSG_MKSN_REPLY) {
1452 		struct srpc_mksn_reply *rep = &msg->msg_body.mksn_reply;
1453 
1454 		__swab32s(&rep->mksn_status);
1455 		__swab32s(&rep->mksn_timeout);
1456 		sfw_unpack_sid(rep->mksn_sid);
1457 		return;
1458 	}
1459 
1460 	if (msg->msg_type == SRPC_MSG_RMSN_REQST) {
1461 		struct srpc_rmsn_reqst *req = &msg->msg_body.rmsn_reqst;
1462 
1463 		__swab64s(&req->rmsn_rpyid);
1464 		sfw_unpack_sid(req->rmsn_sid);
1465 		return;
1466 	}
1467 
1468 	if (msg->msg_type == SRPC_MSG_RMSN_REPLY) {
1469 		struct srpc_rmsn_reply *rep = &msg->msg_body.rmsn_reply;
1470 
1471 		__swab32s(&rep->rmsn_status);
1472 		sfw_unpack_sid(rep->rmsn_sid);
1473 		return;
1474 	}
1475 
1476 	if (msg->msg_type == SRPC_MSG_DEBUG_REQST) {
1477 		struct srpc_debug_reqst *req = &msg->msg_body.dbg_reqst;
1478 
1479 		__swab64s(&req->dbg_rpyid);
1480 		__swab32s(&req->dbg_flags);
1481 		sfw_unpack_sid(req->dbg_sid);
1482 		return;
1483 	}
1484 
1485 	if (msg->msg_type == SRPC_MSG_DEBUG_REPLY) {
1486 		struct srpc_debug_reply *rep = &msg->msg_body.dbg_reply;
1487 
1488 		__swab32s(&rep->dbg_nbatch);
1489 		__swab32s(&rep->dbg_timeout);
1490 		sfw_unpack_sid(rep->dbg_sid);
1491 		return;
1492 	}
1493 
1494 	if (msg->msg_type == SRPC_MSG_BATCH_REQST) {
1495 		struct srpc_batch_reqst *req = &msg->msg_body.bat_reqst;
1496 
1497 		__swab32s(&req->bar_opc);
1498 		__swab64s(&req->bar_rpyid);
1499 		__swab32s(&req->bar_testidx);
1500 		__swab32s(&req->bar_arg);
1501 		sfw_unpack_sid(req->bar_sid);
1502 		__swab64s(&req->bar_bid.bat_id);
1503 		return;
1504 	}
1505 
1506 	if (msg->msg_type == SRPC_MSG_BATCH_REPLY) {
1507 		struct srpc_batch_reply *rep = &msg->msg_body.bat_reply;
1508 
1509 		__swab32s(&rep->bar_status);
1510 		sfw_unpack_sid(rep->bar_sid);
1511 		return;
1512 	}
1513 
1514 	if (msg->msg_type == SRPC_MSG_TEST_REQST) {
1515 		struct srpc_test_reqst *req = &msg->msg_body.tes_reqst;
1516 
1517 		__swab64s(&req->tsr_rpyid);
1518 		__swab64s(&req->tsr_bulkid);
1519 		__swab32s(&req->tsr_loop);
1520 		__swab32s(&req->tsr_ndest);
1521 		__swab32s(&req->tsr_concur);
1522 		__swab32s(&req->tsr_service);
1523 		sfw_unpack_sid(req->tsr_sid);
1524 		__swab64s(&req->tsr_bid.bat_id);
1525 		return;
1526 	}
1527 
1528 	if (msg->msg_type == SRPC_MSG_TEST_REPLY) {
1529 		struct srpc_test_reply *rep = &msg->msg_body.tes_reply;
1530 
1531 		__swab32s(&rep->tsr_status);
1532 		sfw_unpack_sid(rep->tsr_sid);
1533 		return;
1534 	}
1535 
1536 	if (msg->msg_type == SRPC_MSG_JOIN_REQST) {
1537 		struct srpc_join_reqst *req = &msg->msg_body.join_reqst;
1538 
1539 		__swab64s(&req->join_rpyid);
1540 		sfw_unpack_sid(req->join_sid);
1541 		return;
1542 	}
1543 
1544 	if (msg->msg_type == SRPC_MSG_JOIN_REPLY) {
1545 		struct srpc_join_reply *rep = &msg->msg_body.join_reply;
1546 
1547 		__swab32s(&rep->join_status);
1548 		__swab32s(&rep->join_timeout);
1549 		sfw_unpack_sid(rep->join_sid);
1550 		return;
1551 	}
1552 
1553 	LBUG();
1554 }
1555 
1556 void
sfw_abort_rpc(struct srpc_client_rpc * rpc)1557 sfw_abort_rpc(struct srpc_client_rpc *rpc)
1558 {
1559 	LASSERT(atomic_read(&rpc->crpc_refcount) > 0);
1560 	LASSERT(rpc->crpc_service <= SRPC_FRAMEWORK_SERVICE_MAX_ID);
1561 
1562 	spin_lock(&rpc->crpc_lock);
1563 	srpc_abort_rpc(rpc, -EINTR);
1564 	spin_unlock(&rpc->crpc_lock);
1565 }
1566 
1567 void
sfw_post_rpc(struct srpc_client_rpc * rpc)1568 sfw_post_rpc(struct srpc_client_rpc *rpc)
1569 {
1570 	spin_lock(&rpc->crpc_lock);
1571 
1572 	LASSERT(!rpc->crpc_closed);
1573 	LASSERT(!rpc->crpc_aborted);
1574 	LASSERT(list_empty(&rpc->crpc_list));
1575 	LASSERT(!sfw_data.fw_shuttingdown);
1576 
1577 	rpc->crpc_timeout = rpc_timeout;
1578 	srpc_post_rpc(rpc);
1579 
1580 	spin_unlock(&rpc->crpc_lock);
1581 }
1582 
1583 static struct srpc_service sfw_services[] = {
1584 	{
1585 		/* sv_id */    SRPC_SERVICE_DEBUG,
1586 		/* sv_name */  "debug",
1587 		0
1588 	},
1589 	{
1590 		/* sv_id */    SRPC_SERVICE_QUERY_STAT,
1591 		/* sv_name */  "query stats",
1592 		0
1593 	},
1594 	{
1595 		/* sv_id */    SRPC_SERVICE_MAKE_SESSION,
1596 		/* sv_name */  "make session",
1597 		0
1598 	},
1599 	{
1600 		/* sv_id */    SRPC_SERVICE_REMOVE_SESSION,
1601 		/* sv_name */  "remove session",
1602 		0
1603 	},
1604 	{
1605 		/* sv_id */    SRPC_SERVICE_BATCH,
1606 		/* sv_name */  "batch service",
1607 		0
1608 	},
1609 	{
1610 		/* sv_id */    SRPC_SERVICE_TEST,
1611 		/* sv_name */  "test service",
1612 		0
1613 	},
1614 	{
1615 		/* sv_id */    0,
1616 		/* sv_name */  NULL,
1617 		0
1618 	}
1619 };
1620 
1621 int
sfw_startup(void)1622 sfw_startup(void)
1623 {
1624 	int i;
1625 	int rc;
1626 	int error;
1627 	struct srpc_service *sv;
1628 	struct sfw_test_case *tsc;
1629 
1630 	if (session_timeout < 0) {
1631 		CERROR("Session timeout must be non-negative: %d\n",
1632 		       session_timeout);
1633 		return -EINVAL;
1634 	}
1635 
1636 	if (rpc_timeout < 0) {
1637 		CERROR("RPC timeout must be non-negative: %d\n",
1638 		       rpc_timeout);
1639 		return -EINVAL;
1640 	}
1641 
1642 	if (!session_timeout)
1643 		CWARN("Zero session_timeout specified - test sessions never expire.\n");
1644 
1645 	if (!rpc_timeout)
1646 		CWARN("Zero rpc_timeout specified - test RPC never expire.\n");
1647 
1648 	memset(&sfw_data, 0, sizeof(struct smoketest_framework));
1649 
1650 	sfw_data.fw_session = NULL;
1651 	sfw_data.fw_active_srpc = NULL;
1652 	spin_lock_init(&sfw_data.fw_lock);
1653 	atomic_set(&sfw_data.fw_nzombies, 0);
1654 	INIT_LIST_HEAD(&sfw_data.fw_tests);
1655 	INIT_LIST_HEAD(&sfw_data.fw_zombie_rpcs);
1656 	INIT_LIST_HEAD(&sfw_data.fw_zombie_sessions);
1657 
1658 	brw_init_test_client();
1659 	brw_init_test_service();
1660 	rc = sfw_register_test(&brw_test_service, &brw_test_client);
1661 	LASSERT(!rc);
1662 
1663 	ping_init_test_client();
1664 	ping_init_test_service();
1665 	rc = sfw_register_test(&ping_test_service, &ping_test_client);
1666 	LASSERT(!rc);
1667 
1668 	error = 0;
1669 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1670 		sv = tsc->tsc_srv_service;
1671 
1672 		rc = srpc_add_service(sv);
1673 		LASSERT(rc != -EBUSY);
1674 		if (rc) {
1675 			CWARN("Failed to add %s service: %d\n",
1676 			      sv->sv_name, rc);
1677 			error = rc;
1678 		}
1679 	}
1680 
1681 	for (i = 0; ; i++) {
1682 		sv = &sfw_services[i];
1683 		if (!sv->sv_name)
1684 			break;
1685 
1686 		sv->sv_bulk_ready = NULL;
1687 		sv->sv_handler = sfw_handle_server_rpc;
1688 		sv->sv_wi_total = SFW_FRWK_WI_MAX;
1689 		if (sv->sv_id == SRPC_SERVICE_TEST)
1690 			sv->sv_bulk_ready = sfw_bulk_ready;
1691 
1692 		rc = srpc_add_service(sv);
1693 		LASSERT(rc != -EBUSY);
1694 		if (rc) {
1695 			CWARN("Failed to add %s service: %d\n",
1696 			      sv->sv_name, rc);
1697 			error = rc;
1698 		}
1699 
1700 		/* about to sfw_shutdown, no need to add buffer */
1701 		if (error)
1702 			continue;
1703 
1704 		rc = srpc_service_add_buffers(sv, sv->sv_wi_total);
1705 		if (rc) {
1706 			CWARN("Failed to reserve enough buffers: service %s, %d needed: %d\n",
1707 			      sv->sv_name, sv->sv_wi_total, rc);
1708 			error = -ENOMEM;
1709 		}
1710 	}
1711 
1712 	if (error)
1713 		sfw_shutdown();
1714 	return error;
1715 }
1716 
1717 void
sfw_shutdown(void)1718 sfw_shutdown(void)
1719 {
1720 	struct srpc_service *sv;
1721 	struct sfw_test_case	*tsc;
1722 	int i;
1723 
1724 	spin_lock(&sfw_data.fw_lock);
1725 
1726 	sfw_data.fw_shuttingdown = 1;
1727 	lst_wait_until(!sfw_data.fw_active_srpc, sfw_data.fw_lock,
1728 		       "waiting for active RPC to finish.\n");
1729 
1730 	if (sfw_del_session_timer())
1731 		lst_wait_until(!sfw_data.fw_session, sfw_data.fw_lock,
1732 			       "waiting for session timer to explode.\n");
1733 
1734 	sfw_deactivate_session();
1735 	lst_wait_until(!atomic_read(&sfw_data.fw_nzombies),
1736 		       sfw_data.fw_lock,
1737 		       "waiting for %d zombie sessions to die.\n",
1738 		       atomic_read(&sfw_data.fw_nzombies));
1739 
1740 	spin_unlock(&sfw_data.fw_lock);
1741 
1742 	for (i = 0; ; i++) {
1743 		sv = &sfw_services[i];
1744 		if (!sv->sv_name)
1745 			break;
1746 
1747 		srpc_shutdown_service(sv);
1748 		srpc_remove_service(sv);
1749 	}
1750 
1751 	list_for_each_entry(tsc, &sfw_data.fw_tests, tsc_list) {
1752 		sv = tsc->tsc_srv_service;
1753 		srpc_shutdown_service(sv);
1754 		srpc_remove_service(sv);
1755 	}
1756 
1757 	while (!list_empty(&sfw_data.fw_zombie_rpcs)) {
1758 		struct srpc_client_rpc *rpc;
1759 
1760 		rpc = list_entry(sfw_data.fw_zombie_rpcs.next,
1761 				 struct srpc_client_rpc, crpc_list);
1762 		list_del(&rpc->crpc_list);
1763 
1764 		LIBCFS_FREE(rpc, srpc_client_rpc_size(rpc));
1765 	}
1766 
1767 	for (i = 0; ; i++) {
1768 		sv = &sfw_services[i];
1769 		if (!sv->sv_name)
1770 			break;
1771 
1772 		srpc_wait_service_shutdown(sv);
1773 	}
1774 
1775 	while (!list_empty(&sfw_data.fw_tests)) {
1776 		tsc = list_entry(sfw_data.fw_tests.next,
1777 				 struct sfw_test_case, tsc_list);
1778 
1779 		srpc_wait_service_shutdown(tsc->tsc_srv_service);
1780 
1781 		list_del(&tsc->tsc_list);
1782 		LIBCFS_FREE(tsc, sizeof(*tsc));
1783 	}
1784 }
1785