• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * GPL HEADER END
17  */
18 /*
19  * Copyright (c) 2012, 2015, Intel Corporation.
20  */
21 /*
22  * This file is part of Lustre, http://www.lustre.org/
23  * Lustre is a trademark of Sun Microsystems, Inc.
24  *
25  * lnet/lnet/lib-ptl.c
26  *
27  * portal & match routines
28  *
29  * Author: liang@whamcloud.com
30  */
31 
32 #define DEBUG_SUBSYSTEM S_LNET
33 
34 #include "../../include/linux/lnet/lib-lnet.h"
35 
/* NB: add /proc interfaces in upcoming patches */
/* Policy for spreading incoming PUTs over CPU partitions; holds one of the
 * LNET_PTL_ROTOR_* values and is runtime-tunable as a module parameter. */
int portal_rotor = LNET_PTL_ROTOR_HASH_RT;
module_param(portal_rotor, int, 0644);
MODULE_PARM_DESC(portal_rotor, "redirect PUTs to different cpu-partitions");
40 
/*
 * Decide or verify the match type (unique vs. wildcard) of portal @index
 * for a request described by @match_id/@mbits/@ignore_bits.  The first
 * request seen on an unset portal fixes the portal's type; later requests
 * must agree with it.  Returns 1 on success, 0 on a type mismatch.
 */
static int
lnet_ptl_match_type(unsigned int index, lnet_process_id_t match_id,
		    __u64 mbits, __u64 ignore_bits)
{
	struct lnet_portal *ptl = the_lnet.ln_portals[index];
	int unique;

	/* "unique" means the request pins down exactly one peer: no ignore
	 * bits and a fully specified nid/pid */
	unique = !ignore_bits &&
		 match_id.nid != LNET_NID_ANY &&
		 match_id.pid != LNET_PID_ANY;

	/* a portal can never be both unique and wildcard */
	LASSERT(!lnet_ptl_is_unique(ptl) || !lnet_ptl_is_wildcard(ptl));

	/* prefer to check w/o any lock */
	if (likely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl)))
		goto match;

	/* unset, new portal */
	lnet_ptl_lock(ptl);
	/* check again with lock: someone may have set it meanwhile */
	if (unlikely(lnet_ptl_is_unique(ptl) || lnet_ptl_is_wildcard(ptl))) {
		lnet_ptl_unlock(ptl);
		goto match;
	}

	/* still not set: this request decides the portal's type */
	if (unique)
		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_UNIQUE);
	else
		lnet_ptl_setopt(ptl, LNET_PTL_MATCH_WILDCARD);

	lnet_ptl_unlock(ptl);

	return 1;

 match:
	/* type already set: fail if the request disagrees with it */
	if ((lnet_ptl_is_unique(ptl) && !unique) ||
	    (lnet_ptl_is_wildcard(ptl) && unique))
		return 0;
	return 1;
}
82 
/*
 * Mark the match-table of @cpt as enabled and insert @cpt into the
 * portal's sorted ptl_mt_maps[] array of active CPTs.
 */
static void
lnet_ptl_enable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table	*mtable = ptl->ptl_mtables[cpt];
	int i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	mtable->mt_enabled = 1;

	/* append @cpt, then bubble it leftwards until ptl_mt_maps is in
	 * ascending order again (single-element insertion sort) */
	ptl->ptl_mt_maps[ptl->ptl_mt_nmaps] = cpt;
	for (i = ptl->ptl_mt_nmaps - 1; i >= 0; i--) {
		/* @cpt must not already be in the active map */
		LASSERT(ptl->ptl_mt_maps[i] != cpt);
		if (ptl->ptl_mt_maps[i] < cpt)
			break;

		/* swap to order */
		ptl->ptl_mt_maps[i + 1] = ptl->ptl_mt_maps[i];
		ptl->ptl_mt_maps[i] = cpt;
	}

	ptl->ptl_mt_nmaps++;
}
107 
/*
 * Mark the match-table of @cpt as disabled and remove @cpt from the
 * portal's ptl_mt_maps[] array of active CPTs.
 */
static void
lnet_ptl_disable_mt(struct lnet_portal *ptl, int cpt)
{
	struct lnet_match_table	*mtable = ptl->ptl_mtables[cpt];
	int i;

	/* with hold of both lnet_res_lock(cpt) and lnet_ptl_lock */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	if (LNET_CPT_NUMBER == 1)
		return; /* never disable the only match-table */

	mtable->mt_enabled = 0;

	LASSERT(ptl->ptl_mt_nmaps > 0 &&
		ptl->ptl_mt_nmaps <= LNET_CPT_NUMBER);

	/* remove it from mt_maps: shift every entry from @cpt's slot
	 * onwards one place to the left */
	ptl->ptl_mt_nmaps--;
	for (i = 0; i < ptl->ptl_mt_nmaps; i++) {
		if (ptl->ptl_mt_maps[i] >= cpt) /* overwrite it */
			ptl->ptl_mt_maps[i] = ptl->ptl_mt_maps[i + 1];
	}
}
132 
/*
 * Try to deliver @msg into @md.  Returns a mask of LNET_MATCHMD_* flags:
 * NONE (no match), OK (matched and committed), or DROP (matched but the
 * payload doesn't fit and truncation is disallowed), possibly OR'd with
 * EXHAUSTED when the MD has no capacity left.
 */
static int
lnet_try_match_md(lnet_libmd_t *md,
		  struct lnet_match_info *info, struct lnet_msg *msg)
{
	/*
	 * ALWAYS called holding the lnet_res_lock, and can't lnet_res_unlock;
	 * lnet_match_blocked_msg() relies on this to avoid races
	 */
	unsigned int offset;
	unsigned int mlength;
	lnet_me_t *me = md->md_me;

	/* MD exhausted */
	if (lnet_md_exhausted(md))
		return LNET_MATCHMD_NONE | LNET_MATCHMD_EXHAUSTED;

	/* mismatched MD op (PUT vs. GET) */
	if (!(md->md_options & info->mi_opc))
		return LNET_MATCHMD_NONE;

	/* mismatched ME nid/pid? */
	if (me->me_match_id.nid != LNET_NID_ANY &&
	    me->me_match_id.nid != info->mi_id.nid)
		return LNET_MATCHMD_NONE;

	if (me->me_match_id.pid != LNET_PID_ANY &&
	    me->me_match_id.pid != info->mi_id.pid)
		return LNET_MATCHMD_NONE;

	/* mismatched ME matchbits? (bits under me_ignore_bits don't count) */
	if ((me->me_match_bits ^ info->mi_mbits) & ~me->me_ignore_bits)
		return LNET_MATCHMD_NONE;

	/* Hurrah! This _is_ a match; check it out... */

	/* remote-managed MDs take the offset from the incoming request;
	 * otherwise the MD advances its own local offset */
	if (!(md->md_options & LNET_MD_MANAGE_REMOTE))
		offset = md->md_offset;
	else
		offset = info->mi_roffset;

	if (md->md_options & LNET_MD_MAX_SIZE) {
		mlength = md->md_max_size;
		LASSERT(md->md_offset + mlength <= md->md_length);
	} else {
		mlength = md->md_length - offset;
	}

	if (info->mi_rlength <= mlength) {	/* fits in allowed space */
		mlength = info->mi_rlength;
	} else if (!(md->md_options & LNET_MD_TRUNCATE)) {
		/* this packet _really_ is too big */
		CERROR("Matching packet from %s, match %llu length %d too big: %d left, %d allowed\n",
		       libcfs_id2str(info->mi_id), info->mi_mbits,
		       info->mi_rlength, md->md_length - offset, mlength);

		return LNET_MATCHMD_DROP;
	}

	/* Commit to this ME/MD */
	CDEBUG(D_NET, "Incoming %s index %x from %s of length %d/%d into md %#llx [%d] + %d\n",
	       (info->mi_opc == LNET_MD_OP_PUT) ? "put" : "get",
	       info->mi_portal, libcfs_id2str(info->mi_id), mlength,
	       info->mi_rlength, md->md_lh.lh_cookie, md->md_niov, offset);

	lnet_msg_attach_md(msg, md, offset, mlength);
	md->md_offset = offset + mlength;

	if (!lnet_md_exhausted(md))
		return LNET_MATCHMD_OK;

	/*
	 * Auto-unlink NOW, so the ME gets unlinked if required.
	 * We bumped md->md_refcount above so the MD just gets flagged
	 * for unlink when it is finalized.
	 */
	if (md->md_flags & LNET_MD_FLAG_AUTO_UNLINK)
		lnet_md_unlink(md);

	return LNET_MATCHMD_OK | LNET_MATCHMD_EXHAUSTED;
}
213 
214 static struct lnet_match_table *
lnet_match2mt(struct lnet_portal * ptl,lnet_process_id_t id,__u64 mbits)215 lnet_match2mt(struct lnet_portal *ptl, lnet_process_id_t id, __u64 mbits)
216 {
217 	if (LNET_CPT_NUMBER == 1)
218 		return ptl->ptl_mtables[0]; /* the only one */
219 
220 	/* if it's a unique portal, return match-table hashed by NID */
221 	return lnet_ptl_is_unique(ptl) ?
222 	       ptl->ptl_mtables[lnet_cpt_of_nid(id.nid)] : NULL;
223 }
224 
225 struct lnet_match_table *
lnet_mt_of_attach(unsigned int index,lnet_process_id_t id,__u64 mbits,__u64 ignore_bits,lnet_ins_pos_t pos)226 lnet_mt_of_attach(unsigned int index, lnet_process_id_t id,
227 		  __u64 mbits, __u64 ignore_bits, lnet_ins_pos_t pos)
228 {
229 	struct lnet_portal *ptl;
230 	struct lnet_match_table	*mtable;
231 
232 	/* NB: called w/o lock */
233 	LASSERT(index < the_lnet.ln_nportals);
234 
235 	if (!lnet_ptl_match_type(index, id, mbits, ignore_bits))
236 		return NULL;
237 
238 	ptl = the_lnet.ln_portals[index];
239 
240 	mtable = lnet_match2mt(ptl, id, mbits);
241 	if (mtable) /* unique portal or only one match-table */
242 		return mtable;
243 
244 	/* it's a wildcard portal */
245 	switch (pos) {
246 	default:
247 		return NULL;
248 	case LNET_INS_BEFORE:
249 	case LNET_INS_AFTER:
250 		/*
251 		 * posted by no affinity thread, always hash to specific
252 		 * match-table to avoid buffer stealing which is heavy
253 		 */
254 		return ptl->ptl_mtables[ptl->ptl_index % LNET_CPT_NUMBER];
255 	case LNET_INS_LOCAL:
256 		/* posted by cpu-affinity thread */
257 		return ptl->ptl_mtables[lnet_cpt_current()];
258 	}
259 }
260 
/*
 * Select the match-table to search for an incoming message.  For unique
 * portals (or single-CPT setups) the choice is fully determined; for
 * wildcard portals it depends on the portal_rotor policy and on whether
 * the message came from a different network (routed).
 */
static struct lnet_match_table *
lnet_mt_of_match(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table	*mtable;
	struct lnet_portal *ptl;
	unsigned int nmaps;
	unsigned int rotor;
	unsigned int cpt;
	bool routed;

	/* NB: called w/o lock */
	LASSERT(info->mi_portal < the_lnet.ln_nportals);
	ptl = the_lnet.ln_portals[info->mi_portal];

	LASSERT(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl));

	mtable = lnet_match2mt(ptl, info->mi_id, info->mi_mbits);
	if (mtable)
		return mtable;

	/* it's a wildcard portal */
	/* routed if source and destination are on different networks */
	routed = LNET_NIDNET(msg->msg_hdr.src_nid) !=
		 LNET_NIDNET(msg->msg_hdr.dest_nid);

	/* rotor off (or only applied to routed messages): prefer the
	 * current CPT's table if it is still enabled */
	if (portal_rotor == LNET_PTL_ROTOR_OFF ||
	    (portal_rotor != LNET_PTL_ROTOR_ON && !routed)) {
		cpt = lnet_cpt_current();
		if (ptl->ptl_mtables[cpt]->mt_enabled)
			return ptl->ptl_mtables[cpt];
	}

	rotor = ptl->ptl_rotor++; /* get round-robin factor */
	if (portal_rotor == LNET_PTL_ROTOR_HASH_RT && routed)
		/* HASH_RT: routed messages hash by source NID */
		cpt = lnet_cpt_of_nid(msg->msg_hdr.src_nid);
	else
		cpt = rotor % LNET_CPT_NUMBER;

	if (!ptl->ptl_mtables[cpt]->mt_enabled) {
		/* is there any active entry for this portal? */
		nmaps = ptl->ptl_mt_nmaps;
		/* map to an active mtable to avoid heavy "stealing" */
		if (nmaps) {
			/*
			 * NB: there is possibility that ptl_mt_maps is being
			 * changed because we are not under protection of
			 * lnet_ptl_lock, but it shouldn't hurt anything
			 */
			cpt = ptl->ptl_mt_maps[rotor % nmaps];
		}
	}

	return ptl->ptl_mtables[cpt];
}
314 
315 static int
lnet_mt_test_exhausted(struct lnet_match_table * mtable,int pos)316 lnet_mt_test_exhausted(struct lnet_match_table *mtable, int pos)
317 {
318 	__u64 *bmap;
319 	int i;
320 
321 	if (!lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
322 		return 0;
323 
324 	if (pos < 0) { /* check all bits */
325 		for (i = 0; i < LNET_MT_EXHAUSTED_BMAP; i++) {
326 			if (mtable->mt_exhausted[i] != (__u64)(-1))
327 				return 0;
328 		}
329 		return 1;
330 	}
331 
332 	LASSERT(pos <= LNET_MT_HASH_IGNORE);
333 	/* mtable::mt_mhash[pos] is marked as exhausted or not */
334 	bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
335 	pos &= (1 << LNET_MT_BITS_U64) - 1;
336 
337 	return (*bmap & (1ULL << pos));
338 }
339 
340 static void
lnet_mt_set_exhausted(struct lnet_match_table * mtable,int pos,int exhausted)341 lnet_mt_set_exhausted(struct lnet_match_table *mtable, int pos, int exhausted)
342 {
343 	__u64 *bmap;
344 
345 	LASSERT(lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]));
346 	LASSERT(pos <= LNET_MT_HASH_IGNORE);
347 
348 	/* set mtable::mt_mhash[pos] as exhausted/non-exhausted */
349 	bmap = &mtable->mt_exhausted[pos >> LNET_MT_BITS_U64];
350 	pos &= (1 << LNET_MT_BITS_U64) - 1;
351 
352 	if (!exhausted)
353 		*bmap &= ~(1ULL << pos);
354 	else
355 		*bmap |= 1ULL << pos;
356 }
357 
358 struct list_head *
lnet_mt_match_head(struct lnet_match_table * mtable,lnet_process_id_t id,__u64 mbits)359 lnet_mt_match_head(struct lnet_match_table *mtable,
360 		   lnet_process_id_t id, __u64 mbits)
361 {
362 	struct lnet_portal *ptl = the_lnet.ln_portals[mtable->mt_portal];
363 	unsigned long hash = mbits;
364 
365 	if (!lnet_ptl_is_wildcard(ptl)) {
366 		hash += id.nid + id.pid;
367 
368 		LASSERT(lnet_ptl_is_unique(ptl));
369 		hash = hash_long(hash, LNET_MT_HASH_BITS);
370 	}
371 	return &mtable->mt_mhash[hash & LNET_MT_HASH_MASK];
372 }
373 
/*
 * Walk the MEs of @mtable looking for a buffer for @msg.  The ignore-bits
 * list (if non-empty) is scanned first, then the hash list for the exact
 * match bits.  Returns LNET_MATCHMD_* flags; only a wildcard portal can
 * have LNET_MATCHMD_EXHAUSTED OR'd in, meaning the whole table is out of
 * space.
 */
int
lnet_mt_match_md(struct lnet_match_table *mtable,
		 struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct list_head *head;
	lnet_me_t *me;
	lnet_me_t *tmp;
	int exhausted = 0;
	int rc;

	/* any ME with ignore bits? */
	if (!list_empty(&mtable->mt_mhash[LNET_MT_HASH_IGNORE]))
		head = &mtable->mt_mhash[LNET_MT_HASH_IGNORE];
	else
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
 again:
	/* NB: only wildcard portal needs to return LNET_MATCHMD_EXHAUSTED */
	if (lnet_ptl_is_wildcard(the_lnet.ln_portals[mtable->mt_portal]))
		exhausted = LNET_MATCHMD_EXHAUSTED;

	list_for_each_entry_safe(me, tmp, head, me_list) {
		/* ME attached but MD not attached yet */
		if (!me->me_md)
			continue;

		LASSERT(me == me->me_md->md_me);

		rc = lnet_try_match_md(me->me_md, info, msg);
		if (!(rc & LNET_MATCHMD_EXHAUSTED))
			exhausted = 0; /* mlist is not empty */

		if (rc & LNET_MATCHMD_FINISH) {
			/*
			 * don't return EXHAUSTED bit because we don't know
			 * whether the mlist is empty or not
			 */
			return rc & ~LNET_MATCHMD_EXHAUSTED;
		}
	}

	if (exhausted == LNET_MATCHMD_EXHAUSTED) { /* @head is exhausted */
		/* record it in the bitmap; only report EXHAUSTED to the
		 * caller when every list in the table is exhausted */
		lnet_mt_set_exhausted(mtable, head - mtable->mt_mhash, 1);
		if (!lnet_mt_test_exhausted(mtable, -1))
			exhausted = 0;
	}

	if (!exhausted && head == &mtable->mt_mhash[LNET_MT_HASH_IGNORE]) {
		head = lnet_mt_match_head(mtable, info->mi_id, info->mi_mbits);
		goto again; /* re-check MEs w/o ignore-bits */
	}

	/* no match: GETs are never delayed, and PUTs only on lazy portals */
	if (info->mi_opc == LNET_MD_OP_GET ||
	    !lnet_ptl_is_lazy(the_lnet.ln_portals[info->mi_portal]))
		return exhausted | LNET_MATCHMD_DROP;

	return exhausted | LNET_MATCHMD_NONE;
}
431 
/*
 * Handle a message that arrives before the portal's match type has been
 * decided (i.e. before any buffer posting): queue it if the portal is
 * lazy, otherwise drop it.  Returns 0 when the portal is already typed
 * and normal matching should proceed.
 */
static int
lnet_ptl_match_early(struct lnet_portal *ptl, struct lnet_msg *msg)
{
	int rc;

	/*
	 * message arrived before any buffer posting on this portal,
	 * simply delay or drop this message
	 */
	if (likely(lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)))
		return 0;

	lnet_ptl_lock(ptl);
	/* check it again with hold of lock */
	if (lnet_ptl_is_wildcard(ptl) || lnet_ptl_is_unique(ptl)) {
		lnet_ptl_unlock(ptl);
		return 0;
	}

	if (lnet_ptl_is_lazy(ptl)) {
		/* only queue when the receiver allows delaying */
		if (msg->msg_rx_ready_delay) {
			msg->msg_rx_delayed = 1;
			list_add_tail(&msg->msg_list,
				      &ptl->ptl_msg_delayed);
		}
		rc = LNET_MATCHMD_NONE;
	} else {
		rc = LNET_MATCHMD_DROP;
	}

	lnet_ptl_unlock(ptl);
	return rc;
}
465 
/*
 * Walk every CPT of a wildcard portal trying to steal a buffer for @msg,
 * queueing or dropping the message when nothing can be stolen.
 */
static int
lnet_ptl_match_delay(struct lnet_portal *ptl,
		     struct lnet_match_info *info, struct lnet_msg *msg)
{
	int first = ptl->ptl_mt_maps[0]; /* read w/o lock */
	int rc = 0;
	int i;

	/**
	 * Steal buffer from other CPTs, and delay msg if nothing to
	 * steal. This function is more expensive than a regular
	 * match, but we don't expect it can happen a lot. The return
	 * code contains one of LNET_MATCHMD_OK, LNET_MATCHMD_DROP, or
	 * LNET_MATCHMD_NONE.
	 */
	LASSERT(lnet_ptl_is_wildcard(ptl));

	for (i = 0; i < LNET_CPT_NUMBER; i++) {
		struct lnet_match_table *mtable;
		int cpt;

		cpt = (first + i) % LNET_CPT_NUMBER;
		mtable = ptl->ptl_mtables[cpt];
		/* skip disabled tables, except on the first iteration (we
		 * must get onto the stealing list) and the last (we must
		 * decide the message's fate) */
		if (i && i != LNET_CPT_NUMBER - 1 && !mtable->mt_enabled)
			continue;

		lnet_res_lock(cpt);
		lnet_ptl_lock(ptl);

		if (!i) {
			/* The first try, add to stealing list. */
			list_add_tail(&msg->msg_list,
				      &ptl->ptl_msg_stealing);
		}

		if (!list_empty(&msg->msg_list)) {
			/* On stealing list. */
			rc = lnet_mt_match_md(mtable, info, msg);

			if ((rc & LNET_MATCHMD_EXHAUSTED) &&
			    mtable->mt_enabled)
				lnet_ptl_disable_mt(ptl, cpt);

			if (rc & LNET_MATCHMD_FINISH) {
				/* Match found, remove from stealing list. */
				list_del_init(&msg->msg_list);
			} else if (i == LNET_CPT_NUMBER - 1 ||	/* (1) */
				   !ptl->ptl_mt_nmaps ||	/* (2) */
				   (ptl->ptl_mt_nmaps == 1 &&	/* (3) */
				    ptl->ptl_mt_maps[0] == cpt)) {
				/**
				 * No match found, and this is either
				 * (1) the last cpt to check, or
				 * (2) there is no active cpt, or
				 * (3) this is the only active cpt.
				 * There is nothing to steal: delay or
				 * drop the message.
				 */
				list_del_init(&msg->msg_list);

				if (lnet_ptl_is_lazy(ptl)) {
					msg->msg_rx_delayed = 1;
					list_add_tail(&msg->msg_list,
						      &ptl->ptl_msg_delayed);
					rc = LNET_MATCHMD_NONE;
				} else {
					rc = LNET_MATCHMD_DROP;
				}
			} else {
				/* Do another iteration. */
				rc = 0;
			}
		} else {
			/**
			 * No longer on stealing list: another thread
			 * matched the message in lnet_ptl_attach_md().
			 * We are now expected to handle the message.
			 */
			rc = !msg->msg_md ?
			     LNET_MATCHMD_DROP : LNET_MATCHMD_OK;
		}

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(cpt);

		/**
		 * Note that test (1) above ensures that we always
		 * exit the loop through this break statement.
		 *
		 * LNET_MATCHMD_NONE means msg was added to the
		 * delayed queue, and we may no longer reference it
		 * after lnet_ptl_unlock() and lnet_res_unlock().
		 */
		if (rc & (LNET_MATCHMD_FINISH | LNET_MATCHMD_NONE))
			break;
	}

	return rc;
}
565 
/*
 * Match an incoming message against the MEs/MDs on its portal.  Returns
 * LNET_MATCHMD_OK (matched), LNET_MATCHMD_DROP (invalid portal, shutdown,
 * or no buffer on a non-lazy portal), or LNET_MATCHMD_NONE (queued on a
 * lazy portal's delay list).
 */
int
lnet_ptl_match_md(struct lnet_match_info *info, struct lnet_msg *msg)
{
	struct lnet_match_table	*mtable;
	struct lnet_portal *ptl;
	int rc;

	CDEBUG(D_NET, "Request from %s of length %d into portal %d MB=%#llx\n",
	       libcfs_id2str(info->mi_id), info->mi_rlength, info->mi_portal,
	       info->mi_mbits);

	if (info->mi_portal >= the_lnet.ln_nportals) {
		CERROR("Invalid portal %d not in [0-%d]\n",
		       info->mi_portal, the_lnet.ln_nportals);
		return LNET_MATCHMD_DROP;
	}

	ptl = the_lnet.ln_portals[info->mi_portal];
	rc = lnet_ptl_match_early(ptl, msg);
	if (rc) /* matched or delayed early message */
		return rc;

	mtable = lnet_mt_of_match(info, msg);
	lnet_res_lock(mtable->mt_cpt);

	if (the_lnet.ln_shutdown) {
		rc = LNET_MATCHMD_DROP;
		goto out1;
	}

	rc = lnet_mt_match_md(mtable, info, msg);
	if ((rc & LNET_MATCHMD_EXHAUSTED) && mtable->mt_enabled) {
		/* this table ran out of space: stop directing messages
		 * to it until a new buffer is posted */
		lnet_ptl_lock(ptl);
		lnet_ptl_disable_mt(ptl, mtable->mt_cpt);
		lnet_ptl_unlock(ptl);
	}

	if (rc & LNET_MATCHMD_FINISH)	/* matched or dropping */
		goto out1;

	if (!msg->msg_rx_ready_delay)
		goto out1;

	LASSERT(lnet_ptl_is_lazy(ptl));
	LASSERT(!msg->msg_rx_delayed);

	/* NB: we don't expect "delay" can happen a lot */
	if (lnet_ptl_is_unique(ptl) || LNET_CPT_NUMBER == 1) {
		/* nothing to steal from other CPTs: queue directly */
		lnet_ptl_lock(ptl);

		msg->msg_rx_delayed = 1;
		list_add_tail(&msg->msg_list, &ptl->ptl_msg_delayed);

		lnet_ptl_unlock(ptl);
		lnet_res_unlock(mtable->mt_cpt);
		rc = LNET_MATCHMD_NONE;
	} else  {
		/* wildcard portal: try stealing a buffer from other CPTs */
		lnet_res_unlock(mtable->mt_cpt);
		rc = lnet_ptl_match_delay(ptl, info, msg);
	}

	/* LNET_MATCHMD_NONE means msg was added to the delay queue */
	if (rc & LNET_MATCHMD_NONE) {
		CDEBUG(D_NET,
		       "Delaying %s from %s ptl %d MB %#llx off %d len %d\n",
		       info->mi_opc == LNET_MD_OP_PUT ? "PUT" : "GET",
		       libcfs_id2str(info->mi_id), info->mi_portal,
		       info->mi_mbits, info->mi_roffset, info->mi_rlength);
	}
	goto out0;
 out1:
	lnet_res_unlock(mtable->mt_cpt);
 out0:
	/* EXHAUSTED bit is only meaningful for internal functions */
	return rc & ~LNET_MATCHMD_EXHAUSTED;
}
642 
643 void
lnet_ptl_detach_md(lnet_me_t * me,lnet_libmd_t * md)644 lnet_ptl_detach_md(lnet_me_t *me, lnet_libmd_t *md)
645 {
646 	LASSERT(me->me_md == md && md->md_me == me);
647 
648 	me->me_md = NULL;
649 	md->md_me = NULL;
650 }
651 
/* called with lnet_res_lock held */
/*
 * Attach @md to @me, then retry any messages blocked on the portal:
 * first the stealing list (messages being matched by another thread in
 * lnet_ptl_match_delay()), then the lazy-portal delay list.  Delayed
 * messages that match are moved to @matches, those that must be dropped
 * to @drops, for the caller to complete.
 */
void
lnet_ptl_attach_md(lnet_me_t *me, lnet_libmd_t *md,
		   struct list_head *matches, struct list_head *drops)
{
	struct lnet_portal *ptl = the_lnet.ln_portals[me->me_portal];
	struct lnet_match_table	*mtable;
	struct list_head *head;
	lnet_msg_t *tmp;
	lnet_msg_t *msg;
	int exhausted = 0;
	int cpt;

	LASSERT(!md->md_refcount); /* a brand new MD */

	me->me_md = md;
	md->md_me = me;

	cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
	mtable = ptl->ptl_mtables[cpt];

	/* fast path: nothing is blocked and this hash list was not marked
	 * exhausted, so there is nothing to retry */
	if (list_empty(&ptl->ptl_msg_stealing) &&
	    list_empty(&ptl->ptl_msg_delayed) &&
	    !lnet_mt_test_exhausted(mtable, me->me_pos))
		return;

	lnet_ptl_lock(ptl);
	head = &ptl->ptl_msg_stealing;
 again:
	list_for_each_entry_safe(msg, tmp, head, msg_list) {
		struct lnet_match_info info;
		lnet_hdr_t *hdr;
		int rc;

		LASSERT(msg->msg_rx_delayed || head == &ptl->ptl_msg_stealing);

		/* rebuild the match descriptor from the queued PUT header */
		hdr = &msg->msg_hdr;
		info.mi_id.nid  = hdr->src_nid;
		info.mi_id.pid  = hdr->src_pid;
		info.mi_opc     = LNET_MD_OP_PUT;
		info.mi_portal  = hdr->msg.put.ptl_index;
		info.mi_rlength = hdr->payload_length;
		info.mi_roffset = hdr->msg.put.offset;
		info.mi_mbits   = hdr->msg.put.match_bits;

		rc = lnet_try_match_md(md, &info, msg);

		exhausted = (rc & LNET_MATCHMD_EXHAUSTED);
		if (rc & LNET_MATCHMD_NONE) {
			if (exhausted)
				break;
			continue;
		}

		/* Hurrah! This _is_ a match */
		LASSERT(rc & LNET_MATCHMD_FINISH);
		list_del_init(&msg->msg_list);

		if (head == &ptl->ptl_msg_stealing) {
			if (exhausted)
				break;
			/* stealing thread will handle the message */
			continue;
		}

		if (rc & LNET_MATCHMD_OK) {
			list_add_tail(&msg->msg_list, matches);

			CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
			       libcfs_id2str(info.mi_id),
			       info.mi_portal, info.mi_mbits,
			       info.mi_roffset, info.mi_rlength);
		} else {
			list_add_tail(&msg->msg_list, drops);
		}

		if (exhausted)
			break;
	}

	/* done with the stealing list and MD still has room: retry the
	 * delayed messages as well */
	if (!exhausted && head == &ptl->ptl_msg_stealing) {
		head = &ptl->ptl_msg_delayed;
		goto again;
	}

	if (lnet_ptl_is_wildcard(ptl) && !exhausted) {
		/* this hash list has room again; re-enable its table */
		lnet_mt_set_exhausted(mtable, me->me_pos, 0);
		if (!mtable->mt_enabled)
			lnet_ptl_enable_mt(ptl, cpt);
	}

	lnet_ptl_unlock(ptl);
}
745 
/*
 * Release all match-table resources of @ptl, freeing (and complaining
 * about) any MEs still attached.  Safe to call on a partially
 * initialized portal, as left behind by a failed lnet_ptl_setup().
 */
static void
lnet_ptl_cleanup(struct lnet_portal *ptl)
{
	struct lnet_match_table	*mtable;
	int i;

	if (!ptl->ptl_mtables) /* uninitialized portal */
		return;

	LASSERT(list_empty(&ptl->ptl_msg_delayed));
	LASSERT(list_empty(&ptl->ptl_msg_stealing));
	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
		struct list_head *mhash;
		lnet_me_t *me;
		int j;

		if (!mtable->mt_mhash) /* uninitialized match-table */
			continue;

		mhash = mtable->mt_mhash;
		/* cleanup ME */
		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++) {
			while (!list_empty(&mhash[j])) {
				me = list_entry(mhash[j].next,
						lnet_me_t, me_list);
				CERROR("Active ME %p on exit\n", me);
				list_del(&me->me_list);
				lnet_me_free(me);
			}
		}
		/* the extra entry is for MEs with ignore bits */
		LIBCFS_FREE(mhash, sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
	}

	cfs_percpt_free(ptl->ptl_mtables);
	ptl->ptl_mtables = NULL;
}
783 
/*
 * Allocate and initialize the per-CPT match-tables of portal @index.
 * Returns 0 on success or -ENOMEM; any partially created state is torn
 * down via lnet_ptl_cleanup() before returning failure.
 */
static int
lnet_ptl_setup(struct lnet_portal *ptl, int index)
{
	struct lnet_match_table	*mtable;
	struct list_head *mhash;
	int i;
	int j;

	ptl->ptl_mtables = cfs_percpt_alloc(lnet_cpt_table(),
					    sizeof(struct lnet_match_table));
	if (!ptl->ptl_mtables) {
		CERROR("Failed to create match table for portal %d\n", index);
		return -ENOMEM;
	}

	ptl->ptl_index = index;
	INIT_LIST_HEAD(&ptl->ptl_msg_delayed);
	INIT_LIST_HEAD(&ptl->ptl_msg_stealing);
	spin_lock_init(&ptl->ptl_lock);
	cfs_percpt_for_each(mtable, i, ptl->ptl_mtables) {
		/* the extra entry is for MEs with ignore bits */
		LIBCFS_CPT_ALLOC(mhash, lnet_cpt_table(), i,
				 sizeof(*mhash) * (LNET_MT_HASH_SIZE + 1));
		if (!mhash) {
			CERROR("Failed to create match hash for portal %d\n",
			       index);
			goto failed;
		}

		/* all bits set: every hash list starts out "exhausted"
		 * until a buffer is posted on it */
		memset(&mtable->mt_exhausted[0], -1,
		       sizeof(mtable->mt_exhausted[0]) *
		       LNET_MT_EXHAUSTED_BMAP);
		mtable->mt_mhash = mhash;
		for (j = 0; j < LNET_MT_HASH_SIZE + 1; j++)
			INIT_LIST_HEAD(&mhash[j]);

		mtable->mt_portal = index;
		mtable->mt_cpt = i;
	}

	return 0;
 failed:
	lnet_ptl_cleanup(ptl);
	return -ENOMEM;
}
829 
830 void
lnet_portals_destroy(void)831 lnet_portals_destroy(void)
832 {
833 	int i;
834 
835 	if (!the_lnet.ln_portals)
836 		return;
837 
838 	for (i = 0; i < the_lnet.ln_nportals; i++)
839 		lnet_ptl_cleanup(the_lnet.ln_portals[i]);
840 
841 	cfs_array_free(the_lnet.ln_portals);
842 	the_lnet.ln_portals = NULL;
843 }
844 
845 int
lnet_portals_create(void)846 lnet_portals_create(void)
847 {
848 	int size;
849 	int i;
850 
851 	size = offsetof(struct lnet_portal, ptl_mt_maps[LNET_CPT_NUMBER]);
852 
853 	the_lnet.ln_nportals = MAX_PORTALS;
854 	the_lnet.ln_portals = cfs_array_alloc(the_lnet.ln_nportals, size);
855 	if (!the_lnet.ln_portals) {
856 		CERROR("Failed to allocate portals table\n");
857 		return -ENOMEM;
858 	}
859 
860 	for (i = 0; i < the_lnet.ln_nportals; i++) {
861 		if (lnet_ptl_setup(the_lnet.ln_portals[i], i)) {
862 			lnet_portals_destroy();
863 			return -ENOMEM;
864 		}
865 	}
866 
867 	return 0;
868 }
869 
870 /**
871  * Turn on the lazy portal attribute. Use with caution!
872  *
873  * This portal attribute only affects incoming PUT requests to the portal,
874  * and is off by default. By default, if there's no matching MD for an
875  * incoming PUT request, it is simply dropped. With the lazy attribute on,
876  * such requests are queued indefinitely until either a matching MD is
877  * posted to the portal or the lazy attribute is turned off.
878  *
879  * It would prevent dropped requests, however it should be regarded as the
880  * last line of defense - i.e. users must keep a close watch on active
881  * buffers on a lazy portal and once it becomes too low post more buffers as
882  * soon as possible. This is because delayed requests usually have detrimental
883  * effects on underlying network connections. A few delayed requests often
884  * suffice to bring an underlying connection to a complete halt, due to flow
885  * control mechanisms.
886  *
887  * There's also a DOS attack risk. If users don't post match-all MDs on a
888  * lazy portal, a malicious peer can easily stop a service by sending some
889  * PUT requests with match bits that won't match any MD. A routed server is
890  * especially vulnerable since the connections to its neighbor routers are
891  * shared among all clients.
892  *
893  * \param portal Index of the portal to enable the lazy attribute on.
894  *
895  * \retval 0       On success.
896  * \retval -EINVAL If \a portal is not a valid index.
897  */
898 int
LNetSetLazyPortal(int portal)899 LNetSetLazyPortal(int portal)
900 {
901 	struct lnet_portal *ptl;
902 
903 	if (portal < 0 || portal >= the_lnet.ln_nportals)
904 		return -EINVAL;
905 
906 	CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
907 	ptl = the_lnet.ln_portals[portal];
908 
909 	lnet_res_lock(LNET_LOCK_EX);
910 	lnet_ptl_lock(ptl);
911 
912 	lnet_ptl_setopt(ptl, LNET_PTL_LAZY);
913 
914 	lnet_ptl_unlock(ptl);
915 	lnet_res_unlock(LNET_LOCK_EX);
916 
917 	return 0;
918 }
919 EXPORT_SYMBOL(LNetSetLazyPortal);
920 
/*
 * Drop delayed messages queued on @portal.  With @ni set, only messages
 * received on that NI are dropped and the lazy attribute is untouched;
 * with @ni NULL, all delayed messages are dropped and the lazy attribute
 * is cleared.  @reason is passed to lnet_drop_delayed_msg_list().
 * Returns 0, or -EINVAL for an out-of-range portal index.
 */
int
lnet_clear_lazy_portal(struct lnet_ni *ni, int portal, char *reason)
{
	struct lnet_portal *ptl;
	LIST_HEAD(zombies);

	if (portal < 0 || portal >= the_lnet.ln_nportals)
		return -EINVAL;

	ptl = the_lnet.ln_portals[portal];

	lnet_res_lock(LNET_LOCK_EX);
	lnet_ptl_lock(ptl);

	if (!lnet_ptl_is_lazy(ptl)) {
		/* not lazy: there can be nothing queued */
		lnet_ptl_unlock(ptl);
		lnet_res_unlock(LNET_LOCK_EX);
		return 0;
	}

	if (ni) {
		struct lnet_msg *msg, *tmp;

		/* grab all messages which are on the NI passed in */
		list_for_each_entry_safe(msg, tmp, &ptl->ptl_msg_delayed,
					 msg_list) {
			if (msg->msg_rxpeer->lp_ni == ni)
				list_move(&msg->msg_list, &zombies);
		}
	} else {
		if (the_lnet.ln_shutdown)
			CWARN("Active lazy portal %d on exit\n", portal);
		else
			CDEBUG(D_NET, "clearing portal %d lazy\n", portal);

		/* grab all the blocked messages atomically */
		list_splice_init(&ptl->ptl_msg_delayed, &zombies);

		lnet_ptl_unsetopt(ptl, LNET_PTL_LAZY);
	}

	lnet_ptl_unlock(ptl);
	lnet_res_unlock(LNET_LOCK_EX);

	/* drop outside the locks */
	lnet_drop_delayed_msg_list(&zombies, reason);

	return 0;
}
969 
970 /**
971  * Turn off the lazy portal attribute. Delayed requests on the portal,
972  * if any, will be all dropped when this function returns.
973  *
974  * \param portal Index of the portal to disable the lazy attribute on.
975  *
976  * \retval 0       On success.
977  * \retval -EINVAL If \a portal is not a valid index.
978  */
int
LNetClearLazyPortal(int portal)
{
	/* NULL ni: clear the lazy attribute and drop all delayed messages */
	return lnet_clear_lazy_portal(NULL, portal,
				      "Clearing lazy portal attr");
}
EXPORT_SYMBOL(LNetClearLazyPortal);
986