• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2014, Intel Corporation.
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  * Lustre is a trademark of Seagate, Inc.
28  *
29  * lnet/lnet/net_fault.c
30  *
31  * Lustre network fault simulation
32  *
33  * Author: liang.zhen@intel.com
34  */
35 
36 #define DEBUG_SUBSYSTEM S_LNET
37 
38 #include "../../include/linux/lnet/lib-lnet.h"
39 #include "../../include/linux/lnet/lnetctl.h"
40 
41 #define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
42 				 LNET_GET_BIT | LNET_REPLY_BIT)
43 
/** A single message-drop rule plus its runtime state. */
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	unsigned long		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
65 
66 static bool
lnet_fault_nid_match(lnet_nid_t nid,lnet_nid_t msg_nid)67 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
68 {
69 	if (nid == msg_nid || nid == LNET_NID_ANY)
70 		return true;
71 
72 	if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
73 		return false;
74 
75 	/* 255.255.255.255@net is wildcard for all addresses in a network */
76 	return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
77 }
78 
79 static bool
lnet_fault_attr_match(struct lnet_fault_attr * attr,lnet_nid_t src,lnet_nid_t dst,unsigned int type,unsigned int portal)80 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
81 		      lnet_nid_t dst, unsigned int type, unsigned int portal)
82 {
83 	if (!lnet_fault_nid_match(attr->fa_src, src) ||
84 	    !lnet_fault_nid_match(attr->fa_dst, dst))
85 		return false;
86 
87 	if (!(attr->fa_msg_mask & (1 << type)))
88 		return false;
89 
90 	/**
91 	 * NB: ACK and REPLY have no portal, but they should have been
92 	 * rejected by message mask
93 	 */
94 	if (attr->fa_ptl_mask && /* has portal filter */
95 	    !(attr->fa_ptl_mask & (1ULL << portal)))
96 		return false;
97 
98 	return true;
99 }
100 
101 static int
lnet_fault_attr_validate(struct lnet_fault_attr * attr)102 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
103 {
104 	if (!attr->fa_msg_mask)
105 		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
106 
107 	if (!attr->fa_ptl_mask) /* no portal filter */
108 		return 0;
109 
110 	/* NB: only PUT and GET can be filtered if portal filter has been set */
111 	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
112 	if (!attr->fa_msg_mask) {
113 		CDEBUG(D_NET, "can't find valid message type bits %x\n",
114 		       attr->fa_msg_mask);
115 		return -EINVAL;
116 	}
117 	return 0;
118 }
119 
120 static void
lnet_fault_stat_inc(struct lnet_fault_stat * stat,unsigned int type)121 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
122 {
123 	/* NB: fs_counter is NOT updated by this function */
124 	switch (type) {
125 	case LNET_MSG_PUT:
126 		stat->fs_put++;
127 		return;
128 	case LNET_MSG_ACK:
129 		stat->fs_ack++;
130 		return;
131 	case LNET_MSG_GET:
132 		stat->fs_get++;
133 		return;
134 	case LNET_MSG_REPLY:
135 		stat->fs_reply++;
136 		return;
137 	}
138 }
139 
140 /**
141  * LNet message drop simulation
142  */
143 
144 /**
145  * Add a new drop rule to LNet
146  * There is no check for duplicated drop rule, all rules will be checked for
147  * incoming message.
148  */
149 static int
lnet_drop_rule_add(struct lnet_fault_attr * attr)150 lnet_drop_rule_add(struct lnet_fault_attr *attr)
151 {
152 	struct lnet_drop_rule *rule;
153 
154 	if (attr->u.drop.da_rate & attr->u.drop.da_interval) {
155 		CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
156 		       attr->u.drop.da_rate, attr->u.drop.da_interval);
157 		return -EINVAL;
158 	}
159 
160 	if (lnet_fault_attr_validate(attr))
161 		return -EINVAL;
162 
163 	CFS_ALLOC_PTR(rule);
164 	if (!rule)
165 		return -ENOMEM;
166 
167 	spin_lock_init(&rule->dr_lock);
168 
169 	rule->dr_attr = *attr;
170 	if (attr->u.drop.da_interval) {
171 		rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
172 		rule->dr_drop_time = cfs_time_shift(cfs_rand() %
173 						    attr->u.drop.da_interval);
174 	} else {
175 		rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
176 	}
177 
178 	lnet_net_lock(LNET_LOCK_EX);
179 	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
180 	lnet_net_unlock(LNET_LOCK_EX);
181 
182 	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
183 	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
184 	       attr->u.drop.da_rate, attr->u.drop.da_interval);
185 	return 0;
186 }
187 
188 /**
189  * Remove matched drop rules from lnet, all rules that can match \a src and
190  * \a dst will be removed.
191  * If \a src is zero, then all rules have \a dst as destination will be remove
192  * If \a dst is zero, then all rules have \a src as source will be removed
193  * If both of them are zero, all rules will be removed
194  */
195 static int
lnet_drop_rule_del(lnet_nid_t src,lnet_nid_t dst)196 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
197 {
198 	struct lnet_drop_rule *rule;
199 	struct lnet_drop_rule *tmp;
200 	struct list_head zombies;
201 	int n = 0;
202 
203 	INIT_LIST_HEAD(&zombies);
204 
205 	lnet_net_lock(LNET_LOCK_EX);
206 	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
207 		if (rule->dr_attr.fa_src != src && src)
208 			continue;
209 
210 		if (rule->dr_attr.fa_dst != dst && dst)
211 			continue;
212 
213 		list_move(&rule->dr_link, &zombies);
214 	}
215 	lnet_net_unlock(LNET_LOCK_EX);
216 
217 	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
218 		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
219 		       libcfs_nid2str(rule->dr_attr.fa_src),
220 		       libcfs_nid2str(rule->dr_attr.fa_dst),
221 		       rule->dr_attr.u.drop.da_rate,
222 		       rule->dr_attr.u.drop.da_interval);
223 
224 		list_del(&rule->dr_link);
225 		CFS_FREE_PTR(rule);
226 		n++;
227 	}
228 
229 	return n;
230 }
231 
232 /**
233  * List drop rule at position of \a pos
234  */
235 static int
lnet_drop_rule_list(int pos,struct lnet_fault_attr * attr,struct lnet_fault_stat * stat)236 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
237 		    struct lnet_fault_stat *stat)
238 {
239 	struct lnet_drop_rule *rule;
240 	int cpt;
241 	int i = 0;
242 	int rc = -ENOENT;
243 
244 	cpt = lnet_net_lock_current();
245 	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
246 		if (i++ < pos)
247 			continue;
248 
249 		spin_lock(&rule->dr_lock);
250 		*attr = rule->dr_attr;
251 		*stat = rule->dr_stat;
252 		spin_unlock(&rule->dr_lock);
253 		rc = 0;
254 		break;
255 	}
256 
257 	lnet_net_unlock(cpt);
258 	return rc;
259 }
260 
261 /**
262  * reset counters for all drop rules
263  */
264 static void
lnet_drop_rule_reset(void)265 lnet_drop_rule_reset(void)
266 {
267 	struct lnet_drop_rule *rule;
268 	int cpt;
269 
270 	cpt = lnet_net_lock_current();
271 
272 	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
273 		struct lnet_fault_attr *attr = &rule->dr_attr;
274 
275 		spin_lock(&rule->dr_lock);
276 
277 		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
278 		if (attr->u.drop.da_rate) {
279 			rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
280 		} else {
281 			rule->dr_drop_time = cfs_time_shift(cfs_rand() %
282 						attr->u.drop.da_interval);
283 			rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
284 		}
285 		spin_unlock(&rule->dr_lock);
286 	}
287 
288 	lnet_net_unlock(cpt);
289 }
290 
291 /**
292  * check source/destination NID, portal, message type and drop rate,
293  * decide whether should drop this message or not
294  */
295 static bool
drop_rule_match(struct lnet_drop_rule * rule,lnet_nid_t src,lnet_nid_t dst,unsigned int type,unsigned int portal)296 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
297 		lnet_nid_t dst, unsigned int type, unsigned int portal)
298 {
299 	struct lnet_fault_attr *attr = &rule->dr_attr;
300 	bool drop;
301 
302 	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
303 		return false;
304 
305 	/* match this rule, check drop rate now */
306 	spin_lock(&rule->dr_lock);
307 	if (rule->dr_drop_time) { /* time based drop */
308 		unsigned long now = cfs_time_current();
309 
310 		rule->dr_stat.fs_count++;
311 		drop = cfs_time_aftereq(now, rule->dr_drop_time);
312 		if (drop) {
313 			if (cfs_time_after(now, rule->dr_time_base))
314 				rule->dr_time_base = now;
315 
316 			rule->dr_drop_time = rule->dr_time_base +
317 					     cfs_time_seconds(cfs_rand() %
318 						attr->u.drop.da_interval);
319 			rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);
320 
321 			CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lu\n",
322 			       libcfs_nid2str(attr->fa_src),
323 			       libcfs_nid2str(attr->fa_dst),
324 			       rule->dr_drop_time);
325 		}
326 
327 	} else { /* rate based drop */
328 		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
329 
330 		if (!do_div(rule->dr_stat.fs_count, attr->u.drop.da_rate)) {
331 			rule->dr_drop_at = rule->dr_stat.fs_count +
332 					   cfs_rand() % attr->u.drop.da_rate;
333 			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
334 			       libcfs_nid2str(attr->fa_src),
335 			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
336 		}
337 	}
338 
339 	if (drop) { /* drop this message, update counters */
340 		lnet_fault_stat_inc(&rule->dr_stat, type);
341 		rule->dr_stat.u.drop.ds_dropped++;
342 	}
343 
344 	spin_unlock(&rule->dr_lock);
345 	return drop;
346 }
347 
348 /**
349  * Check if message from \a src to \a dst can match any existed drop rule
350  */
351 bool
lnet_drop_rule_match(lnet_hdr_t * hdr)352 lnet_drop_rule_match(lnet_hdr_t *hdr)
353 {
354 	struct lnet_drop_rule *rule;
355 	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
356 	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
357 	unsigned int typ = le32_to_cpu(hdr->type);
358 	unsigned int ptl = -1;
359 	bool drop = false;
360 	int cpt;
361 
362 	/**
363 	 * NB: if Portal is specified, then only PUT and GET will be
364 	 * filtered by drop rule
365 	 */
366 	if (typ == LNET_MSG_PUT)
367 		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
368 	else if (typ == LNET_MSG_GET)
369 		ptl = le32_to_cpu(hdr->msg.get.ptl_index);
370 
371 	cpt = lnet_net_lock_current();
372 	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
373 		drop = drop_rule_match(rule, src, dst, typ, ptl);
374 		if (drop)
375 			break;
376 	}
377 
378 	lnet_net_unlock(cpt);
379 	return drop;
380 }
381 
382 /**
383  * LNet Delay Simulation
384  */
/** timestamp (second) to send delayed message; aliased onto the otherwise
 * available msg_ev.hdr_data field of the queued message */
#define msg_delay_send		 msg_ev.hdr_data
387 
/** A single message-delay rule plus its runtime state. */
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	unsigned long		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long		dl_time_base;
	/** jiffies to send the next delayed message; -1 when no timer armed */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
419 
/** Shared state between the delay daemon, rule timers and control path. */
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps at here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) wait at here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
436 
437 static struct delay_daemon_data	delay_dd;
438 
/** Round a jiffies value up to the next whole second (in jiffies). */
static unsigned long
round_timeout(unsigned long timeout)
{
	unsigned int sec;

	sec = (unsigned int)cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1;
	return cfs_time_seconds(sec);
}
445 
446 static void
delay_rule_decref(struct lnet_delay_rule * rule)447 delay_rule_decref(struct lnet_delay_rule *rule)
448 {
449 	if (atomic_dec_and_test(&rule->dl_refcount)) {
450 		LASSERT(list_empty(&rule->dl_sched_link));
451 		LASSERT(list_empty(&rule->dl_msg_list));
452 		LASSERT(list_empty(&rule->dl_link));
453 
454 		CFS_FREE_PTR(rule);
455 	}
456 }
457 
458 /**
459  * check source/destination NID, portal, message type and delay rate,
460  * decide whether should delay this message or not
461  */
462 static bool
delay_rule_match(struct lnet_delay_rule * rule,lnet_nid_t src,lnet_nid_t dst,unsigned int type,unsigned int portal,struct lnet_msg * msg)463 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
464 		 lnet_nid_t dst, unsigned int type, unsigned int portal,
465 		 struct lnet_msg *msg)
466 {
467 	struct lnet_fault_attr *attr = &rule->dl_attr;
468 	bool delay;
469 
470 	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
471 		return false;
472 
473 	/* match this rule, check delay rate now */
474 	spin_lock(&rule->dl_lock);
475 	if (rule->dl_delay_time) { /* time based delay */
476 		unsigned long now = cfs_time_current();
477 
478 		rule->dl_stat.fs_count++;
479 		delay = cfs_time_aftereq(now, rule->dl_delay_time);
480 		if (delay) {
481 			if (cfs_time_after(now, rule->dl_time_base))
482 				rule->dl_time_base = now;
483 
484 			rule->dl_delay_time = rule->dl_time_base +
485 					     cfs_time_seconds(cfs_rand() %
486 						attr->u.delay.la_interval);
487 			rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);
488 
489 			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lu\n",
490 			       libcfs_nid2str(attr->fa_src),
491 			       libcfs_nid2str(attr->fa_dst),
492 			       rule->dl_delay_time);
493 		}
494 
495 	} else { /* rate based delay */
496 		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
497 		/* generate the next random rate sequence */
498 		if (!do_div(rule->dl_stat.fs_count, attr->u.delay.la_rate)) {
499 			rule->dl_delay_at = rule->dl_stat.fs_count +
500 					    cfs_rand() % attr->u.delay.la_rate;
501 			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
502 			       libcfs_nid2str(attr->fa_src),
503 			       libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
504 		}
505 	}
506 
507 	if (!delay) {
508 		spin_unlock(&rule->dl_lock);
509 		return false;
510 	}
511 
512 	/* delay this message, update counters */
513 	lnet_fault_stat_inc(&rule->dl_stat, type);
514 	rule->dl_stat.u.delay.ls_delayed++;
515 
516 	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
517 	msg->msg_delay_send = round_timeout(
518 			cfs_time_shift(attr->u.delay.la_latency));
519 	if (rule->dl_msg_send == -1) {
520 		rule->dl_msg_send = msg->msg_delay_send;
521 		mod_timer(&rule->dl_timer, rule->dl_msg_send);
522 	}
523 
524 	spin_unlock(&rule->dl_lock);
525 	return true;
526 }
527 
528 /**
529  * check if \a msg can match any Delay Rule, receiving of this message
530  * will be delayed if there is a match.
531  */
532 bool
lnet_delay_rule_match_locked(lnet_hdr_t * hdr,struct lnet_msg * msg)533 lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
534 {
535 	struct lnet_delay_rule *rule;
536 	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
537 	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
538 	unsigned int typ = le32_to_cpu(hdr->type);
539 	unsigned int ptl = -1;
540 
541 	/* NB: called with hold of lnet_net_lock */
542 
543 	/**
544 	 * NB: if Portal is specified, then only PUT and GET will be
545 	 * filtered by delay rule
546 	 */
547 	if (typ == LNET_MSG_PUT)
548 		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
549 	else if (typ == LNET_MSG_GET)
550 		ptl = le32_to_cpu(hdr->msg.get.ptl_index);
551 
552 	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
553 		if (delay_rule_match(rule, src, dst, typ, ptl, msg))
554 			return true;
555 	}
556 
557 	return false;
558 }
559 
/**
 * Move expired delayed messages from \a rule onto \a msg_list.
 *
 * \param rule     delay rule whose message queue is scanned
 * \param all      when true, dequeue every message regardless of deadline
 * \param msg_list destination list for the dequeued messages
 *
 * Also maintains the rule's wakeup timer: stops it when the queue drains,
 * or re-arms it for the next pending message.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	/* cheap unlocked check: nothing due yet and not a forced flush */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		/* stop at the first message not yet due; assumes the queue
		 * is in send-time order (fixed latency per rule) — the
		 * re-arm below relies on the same assumption */
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* queue drained: no wakeup needed until a new message */
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/*
		 * dequeued some timedout messages, update timer for the
		 * next delayed message on rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
597 
/**
 * Deliver (or drop) every message on \a msg_list.
 *
 * \param msg_list list of previously delayed messages, consumed here
 * \param drop     when true, finalize each message with -ECANCELED
 *                 instead of delivering it
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg	*msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* shutting down: cancel instead of delivering */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery; on success the message is done */
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* delivery failed or was cancelled: drop and finalize */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}
643 
644 /**
645  * Process delayed messages for scheduled rules
646  * This function can either be called by delay_rule_daemon, or by lnet_finalise
647  */
648 void
lnet_delay_rule_check(void)649 lnet_delay_rule_check(void)
650 {
651 	struct lnet_delay_rule *rule;
652 	struct list_head msgs;
653 
654 	INIT_LIST_HEAD(&msgs);
655 	while (1) {
656 		if (list_empty(&delay_dd.dd_sched_rules))
657 			break;
658 
659 		spin_lock_bh(&delay_dd.dd_lock);
660 		if (list_empty(&delay_dd.dd_sched_rules)) {
661 			spin_unlock_bh(&delay_dd.dd_lock);
662 			break;
663 		}
664 
665 		rule = list_entry(delay_dd.dd_sched_rules.next,
666 				  struct lnet_delay_rule, dl_sched_link);
667 		list_del_init(&rule->dl_sched_link);
668 		spin_unlock_bh(&delay_dd.dd_lock);
669 
670 		delayed_msg_check(rule, false, &msgs);
671 		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
672 	}
673 
674 	if (!list_empty(&msgs))
675 		delayed_msg_process(&msgs, false);
676 }
677 
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	/* signal lnet_delay_rule_add() that the daemon is up */
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		/* sleep until shutdown or until a rule timer fires */
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	/* signal lnet_delay_rule_del() that shutdown is complete */
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
699 
700 static void
delay_timer_cb(unsigned long arg)701 delay_timer_cb(unsigned long arg)
702 {
703 	struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
704 
705 	spin_lock_bh(&delay_dd.dd_lock);
706 	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
707 		atomic_inc(&rule->dl_refcount);
708 		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
709 		wake_up(&delay_dd.dd_waitq);
710 	}
711 	spin_unlock_bh(&delay_dd.dd_lock);
712 }
713 
714 /**
715  * Add a new delay rule to LNet
716  * There is no check for duplicated delay rule, all rules will be checked for
717  * incoming message.
718  */
719 int
lnet_delay_rule_add(struct lnet_fault_attr * attr)720 lnet_delay_rule_add(struct lnet_fault_attr *attr)
721 {
722 	struct lnet_delay_rule *rule;
723 	int rc = 0;
724 
725 	if (attr->u.delay.la_rate & attr->u.delay.la_interval) {
726 		CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
727 		       attr->u.delay.la_rate, attr->u.delay.la_interval);
728 		return -EINVAL;
729 	}
730 
731 	if (!attr->u.delay.la_latency) {
732 		CDEBUG(D_NET, "delay latency cannot be zero\n");
733 		return -EINVAL;
734 	}
735 
736 	if (lnet_fault_attr_validate(attr))
737 		return -EINVAL;
738 
739 	CFS_ALLOC_PTR(rule);
740 	if (!rule)
741 		return -ENOMEM;
742 
743 	mutex_lock(&delay_dd.dd_mutex);
744 	if (!delay_dd.dd_running) {
745 		struct task_struct *task;
746 
747 		/**
748 		 *  NB: although LND threads will process delayed message
749 		 * in lnet_finalize, but there is no guarantee that LND
750 		 * threads will be waken up if no other message needs to
751 		 * be handled.
752 		 * Only one daemon thread, performance is not the concern
753 		 * of this simualation module.
754 		 */
755 		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
756 		if (IS_ERR(task)) {
757 			rc = PTR_ERR(task);
758 			goto failed;
759 		}
760 		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
761 	}
762 
763 	setup_timer(&rule->dl_timer, delay_timer_cb, (unsigned long)rule);
764 
765 	spin_lock_init(&rule->dl_lock);
766 	INIT_LIST_HEAD(&rule->dl_msg_list);
767 	INIT_LIST_HEAD(&rule->dl_sched_link);
768 
769 	rule->dl_attr = *attr;
770 	if (attr->u.delay.la_interval) {
771 		rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
772 		rule->dl_delay_time = cfs_time_shift(cfs_rand() %
773 						     attr->u.delay.la_interval);
774 	} else {
775 		rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
776 	}
777 
778 	rule->dl_msg_send = -1;
779 
780 	lnet_net_lock(LNET_LOCK_EX);
781 	atomic_set(&rule->dl_refcount, 1);
782 	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
783 	lnet_net_unlock(LNET_LOCK_EX);
784 
785 	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
786 	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
787 	       attr->u.delay.la_rate);
788 
789 	mutex_unlock(&delay_dd.dd_mutex);
790 	return 0;
791 failed:
792 	mutex_unlock(&delay_dd.dd_mutex);
793 	CFS_FREE_PTR(rule);
794 	return rc;
795 }
796 
/**
 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 * will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 *
 * \return the number of rules removed.
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown) {
		/* shutdown removes every rule regardless of src/dst */
		src = 0;
		dst = 0;
	}

	/* dd_mutex serialises rule removal against daemon start/stop */
	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		/* stop the timer before flushing queued messages */
		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon to acknowledge the shutdown */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	/* on shutdown the flushed messages are dropped, not delivered */
	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
873 
874 /**
875  * List Delay Rule at position of \a pos
876  */
877 int
lnet_delay_rule_list(int pos,struct lnet_fault_attr * attr,struct lnet_fault_stat * stat)878 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
879 		     struct lnet_fault_stat *stat)
880 {
881 	struct lnet_delay_rule *rule;
882 	int cpt;
883 	int i = 0;
884 	int rc = -ENOENT;
885 
886 	cpt = lnet_net_lock_current();
887 	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
888 		if (i++ < pos)
889 			continue;
890 
891 		spin_lock(&rule->dl_lock);
892 		*attr = rule->dl_attr;
893 		*stat = rule->dl_stat;
894 		spin_unlock(&rule->dl_lock);
895 		rc = 0;
896 		break;
897 	}
898 
899 	lnet_net_unlock(cpt);
900 	return rc;
901 }
902 
903 /**
904  * reset counters for all Delay Rules
905  */
906 void
lnet_delay_rule_reset(void)907 lnet_delay_rule_reset(void)
908 {
909 	struct lnet_delay_rule *rule;
910 	int cpt;
911 
912 	cpt = lnet_net_lock_current();
913 
914 	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
915 		struct lnet_fault_attr *attr = &rule->dl_attr;
916 
917 		spin_lock(&rule->dl_lock);
918 
919 		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
920 		if (attr->u.delay.la_rate) {
921 			rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
922 		} else {
923 			rule->dl_delay_time = cfs_time_shift(cfs_rand() %
924 						attr->u.delay.la_interval);
925 			rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
926 		}
927 		spin_unlock(&rule->dl_lock);
928 	}
929 
930 	lnet_net_unlock(cpt);
931 }
932 
933 int
lnet_fault_ctl(int opc,struct libcfs_ioctl_data * data)934 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
935 {
936 	struct lnet_fault_attr *attr;
937 	struct lnet_fault_stat *stat;
938 
939 	attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
940 
941 	switch (opc) {
942 	default:
943 		return -EINVAL;
944 
945 	case LNET_CTL_DROP_ADD:
946 		if (!attr)
947 			return -EINVAL;
948 
949 		return lnet_drop_rule_add(attr);
950 
951 	case LNET_CTL_DROP_DEL:
952 		if (!attr)
953 			return -EINVAL;
954 
955 		data->ioc_count = lnet_drop_rule_del(attr->fa_src,
956 						     attr->fa_dst);
957 		return 0;
958 
959 	case LNET_CTL_DROP_RESET:
960 		lnet_drop_rule_reset();
961 		return 0;
962 
963 	case LNET_CTL_DROP_LIST:
964 		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
965 		if (!attr || !stat)
966 			return -EINVAL;
967 
968 		return lnet_drop_rule_list(data->ioc_count, attr, stat);
969 
970 	case LNET_CTL_DELAY_ADD:
971 		if (!attr)
972 			return -EINVAL;
973 
974 		return lnet_delay_rule_add(attr);
975 
976 	case LNET_CTL_DELAY_DEL:
977 		if (!attr)
978 			return -EINVAL;
979 
980 		data->ioc_count = lnet_delay_rule_del(attr->fa_src,
981 						      attr->fa_dst, false);
982 		return 0;
983 
984 	case LNET_CTL_DELAY_RESET:
985 		lnet_delay_rule_reset();
986 		return 0;
987 
988 	case LNET_CTL_DELAY_LIST:
989 		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
990 		if (!attr || !stat)
991 			return -EINVAL;
992 
993 		return lnet_delay_rule_list(data->ioc_count, attr, stat);
994 	}
995 }
996 
/**
 * Module initialisation: verify the message-type bit layout at compile
 * time and set up the delay daemon's shared state.  The daemon thread
 * itself is started lazily by lnet_delay_rule_add().
 */
int
lnet_fault_init(void)
{
	/* the *_BIT masks must stay in sync with the LNET_MSG_* values */
	CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
	CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
	CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
	CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}
1013 
/**
 * Module cleanup: remove every drop and delay rule.  Pending delayed
 * messages are dropped (shutdown == true) rather than delivered.
 */
void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}
1024