/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2014, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Seagate, Inc.
 *
 * lnet/lnet/net_fault.c
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */

#define DEBUG_SUBSYSTEM S_LNET

#include "../../include/linux/lnet/lib-lnet.h"
#include "../../include/linux/lnet/lnetctl.h"

#define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
				 LNET_GET_BIT | LNET_REPLY_BIT)

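/*
 * Example (illustrative): a fault attribute that should match only PUT
 * and GET traffic would set
 *
 *	attr.fa_msg_mask = LNET_PUT_BIT | LNET_GET_BIT;
 *
 * Leaving fa_msg_mask zero means "all message types"; see
 * lnet_fault_attr_validate() below.
 */
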
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t dr_lock;
	/**
	 * the message sequence number to drop, i.e. a message is dropped
	 * when dr_stat.fs_count == dr_drop_at
	 */
	unsigned long dr_drop_at;
	/**
	 * time (in jiffies) to drop the next message; exclusive with
	 * dr_drop_at
	 */
	unsigned long dr_drop_time;
	/** baseline to calculate dr_drop_time */
	unsigned long dr_time_base;
	/** statistics of dropped messages */
	struct lnet_fault_stat dr_stat;
};

static bool
lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
{
	if (nid == msg_nid || nid == LNET_NID_ANY)
		return true;

	if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
		return false;

	/* 255.255.255.255@net is a wildcard for all addresses in a network */
	return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
}

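/*
 * Example (illustrative): LNET_NID_ANY as the rule NID matches any
 * message NID, while the wildcard address restricted to one network,
 * e.g. libcfs_str2nid("255.255.255.255@tcp"), matches every NID on the
 * tcp network and nothing else.
 */
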
static bool
lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
		      lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	if (!lnet_fault_nid_match(attr->fa_src, src) ||
	    !lnet_fault_nid_match(attr->fa_dst, dst))
		return false;

	if (!(attr->fa_msg_mask & (1 << type)))
		return false;

	/*
	 * NB: ACK and REPLY have no portal, but they should have been
	 * rejected by the message mask
	 */
	if (attr->fa_ptl_mask && /* has portal filter */
	    !(attr->fa_ptl_mask & (1ULL << portal)))
		return false;

	return true;
}

static int
lnet_fault_attr_validate(struct lnet_fault_attr *attr)
{
	if (!attr->fa_msg_mask)
		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */

	if (!attr->fa_ptl_mask) /* no portal filter */
		return 0;

	/* NB: only PUT and GET can be filtered if a portal filter has been set */
	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
	if (!attr->fa_msg_mask) {
		CDEBUG(D_NET, "can't find valid message type bits %x\n",
		       attr->fa_msg_mask);
		return -EINVAL;
	}
	return 0;
}

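/*
 * Example (illustrative): a portal filter silently narrows the message
 * mask, since ACK and REPLY carry no portal:
 *
 *	attr.fa_ptl_mask = 1ULL << 20;	// hypothetical portal number
 *	attr.fa_msg_mask = 0;		// defaults to all message types
 *	lnet_fault_attr_validate(&attr);
 *	// fa_msg_mask is now LNET_PUT_BIT | LNET_GET_BIT
 */
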
static void
lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
{
	/* NB: fs_count is NOT updated by this function */
	switch (type) {
	case LNET_MSG_PUT:
		stat->fs_put++;
		return;
	case LNET_MSG_ACK:
		stat->fs_ack++;
		return;
	case LNET_MSG_GET:
		stat->fs_get++;
		return;
	case LNET_MSG_REPLY:
		stat->fs_reply++;
		return;
	}
}

/**
 * LNet message drop simulation
 */

/**
 * Add a new drop rule to LNet.
 * There is no check for duplicate rules; every rule is checked against
 * each incoming message.
 */
static int
lnet_drop_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_drop_rule *rule;

	if (attr->u.drop.da_rate && attr->u.drop.da_interval) {
		CDEBUG(D_NET, "please provide either drop rate or drop interval, but not both at the same time %d/%d\n",
		       attr->u.drop.da_rate, attr->u.drop.da_interval);
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr))
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (!rule)
		return -ENOMEM;

	spin_lock_init(&rule->dr_lock);

	rule->dr_attr = *attr;
	if (attr->u.drop.da_interval) {
		rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
		rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						    attr->u.drop.da_interval);
	} else {
		rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
	}

	lnet_net_lock(LNET_LOCK_EX);
	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.drop.da_rate, attr->u.drop.da_interval);
	return 0;
}
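
/*
 * Rules are normally installed from user space through lnet_fault_ctl();
 * a sketch with the lustre utilities (exact option spelling may vary by
 * release):
 *
 *	lctl net_drop_add -s *@tcp -d *@tcp -r 10
 *
 * which drops, on average, one of every 10 matching messages.
 */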

/**
 * Remove matched drop rules from LNet; all rules that match \a src and
 * \a dst will be removed.
 * If \a src is zero, all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, all rules that have \a src as source will be removed.
 * If both are zero, all rules will be removed.
 */
static int
lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
{
	struct lnet_drop_rule *rule;
	struct lnet_drop_rule *tmp;
	struct list_head zombies;
	int n = 0;

	INIT_LIST_HEAD(&zombies);

	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
		if (rule->dr_attr.fa_src != src && src)
			continue;

		if (rule->dr_attr.fa_dst != dst && dst)
			continue;

		list_move(&rule->dr_link, &zombies);
	}
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dr_attr.fa_src),
		       libcfs_nid2str(rule->dr_attr.fa_dst),
		       rule->dr_attr.u.drop.da_rate,
		       rule->dr_attr.u.drop.da_interval);

		list_del(&rule->dr_link);
		CFS_FREE_PTR(rule);
		n++;
	}

	return n;
}

/**
 * List the drop rule at position \a pos
 */
static int
lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
		    struct lnet_fault_stat *stat)
{
	struct lnet_drop_rule *rule;
	int cpt;
	int i = 0;
	int rc = -ENOENT;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dr_lock);
		*attr = rule->dr_attr;
		*stat = rule->dr_stat;
		spin_unlock(&rule->dr_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}

/**
 * reset counters for all drop rules
 */
static void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate) {
			rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
		} else {
			rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
}

/**
 * check source/destination NID, portal, message type and drop rate,
 * and decide whether this message should be dropped
 */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	struct lnet_fault_attr *attr = &rule->dr_attr;
	bool drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (rule->dr_drop_time) { /* time based drop */
		unsigned long now = cfs_time_current();

		rule->dr_stat.fs_count++;
		drop = cfs_time_aftereq(now, rule->dr_drop_time);
		if (drop) {
			if (cfs_time_after(now, rule->dr_time_base))
				rule->dr_time_base = now;

			rule->dr_drop_time = rule->dr_time_base +
					     cfs_time_seconds(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base += cfs_time_seconds(attr->u.drop.da_interval);

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_time);
		}

	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		/* do_div() modifies its first argument, so divide a copy
		 * to keep fs_count intact */
		count = rule->dr_stat.fs_count;
		if (!do_div(count, attr->u.drop.da_rate)) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
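
/*
 * A worked example of the rate-based arithmetic above (illustrative):
 * with da_rate == 8, dr_drop_at starts at a random value in [0, 7], so
 * one message in the first window of 8 is dropped. Whenever fs_count
 * reaches a multiple of 8, dr_drop_at is re-randomized to
 * fs_count + (rand % 8), i.e. one random position inside the next
 * window, giving an average drop rate of 1/8.
 */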

/**
 * Check if a message from \a src to \a dst can match any existing
 * drop rule
 */
bool
lnet_drop_rule_match(lnet_hdr_t *hdr)
{
	struct lnet_drop_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;
	bool drop = false;
	int cpt;

	/*
	 * NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the drop rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		drop = drop_rule_match(rule, src, dst, typ, ptl);
		if (drop)
			break;
	}

	lnet_net_unlock(cpt);
	return drop;
}

/**
 * LNet delay simulation
 */
/** timestamp (with second granularity) to send the delayed message */
#define msg_delay_send msg_ev.hdr_data

struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr dl_attr;
	/** lock to protect the members below */
	spinlock_t dl_lock;
	/** refcount of delay rule */
	atomic_t dl_refcount;
	/**
	 * the message sequence number to delay, i.e. a message is delayed
	 * when dl_stat.fs_count == dl_delay_at
	 */
	unsigned long dl_delay_at;
	/**
	 * time (in jiffies) to delay the next message; exclusive with
	 * dl_delay_at
	 */
	unsigned long dl_delay_time;
	/** baseline to calculate dl_delay_time */
	unsigned long dl_time_base;
	/** jiffies to send the next delayed message */
	unsigned long dl_msg_send;
	/** delayed message list */
	struct list_head dl_msg_list;
	/** statistics of delayed messages */
	struct lnet_fault_stat dl_stat;
	/** timer to wake up delay_daemon */
	struct timer_list dl_timer;
};

struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t dd_ctl_waitq;
	/** daemon is running */
	unsigned int dd_running;
	/** daemon stopped */
	unsigned int dd_stopped;
};

static struct delay_daemon_data delay_dd;

static unsigned long
round_timeout(unsigned long timeout)
{
	return cfs_time_seconds((unsigned int)
			cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}

static void
delay_rule_decref(struct lnet_delay_rule *rule)
{
	if (atomic_dec_and_test(&rule->dl_refcount)) {
		LASSERT(list_empty(&rule->dl_sched_link));
		LASSERT(list_empty(&rule->dl_msg_list));
		LASSERT(list_empty(&rule->dl_link));

		CFS_FREE_PTR(rule);
	}
}

/**
 * check source/destination NID, portal, message type and delay rate,
 * and decide whether this message should be delayed
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
		 lnet_nid_t dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_attr *attr = &rule->dl_attr;
	bool delay;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time) { /* time based delay */
		unsigned long now = cfs_time_current();

		rule->dl_stat.fs_count++;
		delay = cfs_time_aftereq(now, rule->dl_delay_time);
		if (delay) {
			if (cfs_time_after(now, rule->dl_time_base))
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      cfs_time_seconds(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base += cfs_time_seconds(attr->u.delay.la_interval);

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_time);
		}

	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence; divide a copy,
		 * since do_div() modifies its first argument */
		count = rule->dl_stat.fs_count;
		if (!do_div(count, attr->u.delay.la_rate)) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    cfs_rand() % attr->u.delay.la_rate;
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = round_timeout(
			cfs_time_shift(attr->u.delay.la_latency));
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
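
/*
 * Note on the timer bookkeeping above: dl_msg_send == -1 is the
 * "no timer armed" sentinel. The first delayed message arms dl_timer
 * for its own deadline; later messages only queue on dl_msg_list, and
 * delayed_msg_check() re-arms the timer for the new head of the list
 * as messages drain.
 */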

/**
 * check if \a msg matches any delay rule; reception of the message
 * will be delayed if there is a match
 */
bool
lnet_delay_rule_match_locked(lnet_hdr_t *hdr, struct lnet_msg *msg)
{
	struct lnet_delay_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;

	/* NB: called with lnet_net_lock held */

	/*
	 * NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the delay rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (delay_rule_match(rule, src, dst, typ, ptl, msg))
			return true;
	}

	return false;
}

/** check for delayed messages that are ready to be sent */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/*
		 * dequeued some timed-out messages, update the timer for
		 * the next delayed message on the rule
		 */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}

static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer);

		ni = msg->msg_rxpeer->lp_ni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
			if (!rc)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
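				/* fall through */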
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(ni, msg, rc);
	}
}

/**
 * Process delayed messages for scheduled rules.
 * This function can be called either by the delay rule daemon or by
 * lnet_finalize().
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	struct list_head msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}

/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}

static void
delay_timer_cb(unsigned long arg)
{
	struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}

/**
 * Add a new delay rule to LNet.
 * There is no check for duplicate rules; every rule is checked against
 * each incoming message.
 */
int
lnet_delay_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_delay_rule *rule;
	int rc = 0;

	if (attr->u.delay.la_rate && attr->u.delay.la_interval) {
		CDEBUG(D_NET, "please provide either delay rate or delay interval, but not both at the same time %d/%d\n",
		       attr->u.delay.la_rate, attr->u.delay.la_interval);
		return -EINVAL;
	}

	if (!attr->u.delay.la_latency) {
		CDEBUG(D_NET, "delay latency cannot be zero\n");
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr))
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (!rule)
		return -ENOMEM;

	mutex_lock(&delay_dd.dd_mutex);
	if (!delay_dd.dd_running) {
		struct task_struct *task;

		/*
		 * NB: although LND threads will process delayed messages
		 * in lnet_finalize(), there is no guarantee that LND
		 * threads will be woken up if no other message needs to
		 * be handled.
		 * Only one daemon thread; performance is not the concern
		 * of this simulation module.
		 */
		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto failed;
		}
		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
	}

	setup_timer(&rule->dl_timer, delay_timer_cb, (unsigned long)rule);

	spin_lock_init(&rule->dl_lock);
	INIT_LIST_HEAD(&rule->dl_msg_list);
	INIT_LIST_HEAD(&rule->dl_sched_link);

	rule->dl_attr = *attr;
	if (attr->u.delay.la_interval) {
		rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
		rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						     attr->u.delay.la_interval);
	} else {
		rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
	}

	rule->dl_msg_send = -1;

	lnet_net_lock(LNET_LOCK_EX);
	atomic_set(&rule->dl_refcount, 1);
	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.delay.la_rate);

	mutex_unlock(&delay_dd.dd_mutex);
	return 0;
failed:
	mutex_unlock(&delay_dd.dd_mutex);
	CFS_FREE_PTR(rule);
	return rc;
}
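
/*
 * As with drop rules, delay rules are normally added from user space
 * through lnet_fault_ctl(); a sketch with the lustre utilities (exact
 * option spelling may vary by release):
 *
 *	lctl net_delay_add -s *@tcp -d *@tcp -r 8 -l 2
 *
 * which delays roughly one in 8 matching messages by about 2 seconds.
 */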

/**
 * Remove matched delay rules from LNet; if \a shutdown is true or both
 * \a src and \a dst are zero, all rules will be removed, otherwise only
 * matched rules will be removed.
 * If \a src is zero, all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, all rules that have \a src as source will be removed.
 *
 * When a delay rule is removed, all of its delayed messages are
 * processed immediately.
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown) {
		src = 0;
		dst = 0;
	}

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shut down delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rules, shut down delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}

/**
 * List the delay rule at position \a pos
 */
int
lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
		     struct lnet_fault_stat *stat)
{
	struct lnet_delay_rule *rule;
	int cpt;
	int i = 0;
	int rc = -ENOENT;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dl_lock);
		*attr = rule->dl_attr;
		*stat = rule->dl_stat;
		spin_unlock(&rule->dl_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}

/**
 * reset counters for all delay rules
 */
void
lnet_delay_rule_reset(void)
{
	struct lnet_delay_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		struct lnet_fault_attr *attr = &rule->dl_attr;

		spin_lock(&rule->dl_lock);

		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
		if (attr->u.delay.la_rate) {
			rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
		} else {
			rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
		}
		spin_unlock(&rule->dl_lock);
	}

	lnet_net_unlock(cpt);
}

int
lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
{
	struct lnet_fault_attr *attr;
	struct lnet_fault_stat *stat;

	attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;

	switch (opc) {
	default:
		return -EINVAL;

	case LNET_CTL_DROP_ADD:
		if (!attr)
			return -EINVAL;

		return lnet_drop_rule_add(attr);

	case LNET_CTL_DROP_DEL:
		if (!attr)
			return -EINVAL;

		data->ioc_count = lnet_drop_rule_del(attr->fa_src,
						     attr->fa_dst);
		return 0;

	case LNET_CTL_DROP_RESET:
		lnet_drop_rule_reset();
		return 0;

	case LNET_CTL_DROP_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr || !stat)
			return -EINVAL;

		return lnet_drop_rule_list(data->ioc_count, attr, stat);

	case LNET_CTL_DELAY_ADD:
		if (!attr)
			return -EINVAL;

		return lnet_delay_rule_add(attr);

	case LNET_CTL_DELAY_DEL:
		if (!attr)
			return -EINVAL;

		data->ioc_count = lnet_delay_rule_del(attr->fa_src,
						      attr->fa_dst, false);
		return 0;

	case LNET_CTL_DELAY_RESET:
		lnet_delay_rule_reset();
		return 0;

	case LNET_CTL_DELAY_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr || !stat)
			return -EINVAL;

		return lnet_delay_rule_list(data->ioc_count, attr, stat);
	}
}
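
/*
 * A minimal in-kernel sketch of what an LNET_CTL_DROP_ADD request ends
 * up doing (field names are real, the values are made up for
 * illustration):
 *
 *	struct lnet_fault_attr attr;
 *
 *	memset(&attr, 0, sizeof(attr));
 *	attr.fa_src = LNET_NID_ANY;	// any source
 *	attr.fa_dst = LNET_NID_ANY;	// any destination
 *	attr.u.drop.da_rate = 10;	// drop 1 of every 10 messages
 *	rc = lnet_drop_rule_add(&attr);
 */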

int
lnet_fault_init(void)
{
	CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
	CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
	CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
	CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}

void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}