• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-msg.c
37  *
38  * Message decoding, parsing and finalizing routines
39  */
40 
41 #define DEBUG_SUBSYSTEM S_LNET
42 
43 #include "../../include/linux/lnet/lib-lnet.h"
44 
45 void
lnet_build_unlink_event(lnet_libmd_t * md,lnet_event_t * ev)46 lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
47 {
48 	memset(ev, 0, sizeof(*ev));
49 
50 	ev->status   = 0;
51 	ev->unlinked = 1;
52 	ev->type     = LNET_EVENT_UNLINK;
53 	lnet_md_deconstruct(md, &ev->md);
54 	lnet_md2handle(&ev->md_handle, md);
55 }
56 
57 /*
58  * Don't need any lock, must be called after lnet_commit_md
59  */
60 void
lnet_build_msg_event(lnet_msg_t * msg,lnet_event_kind_t ev_type)61 lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
62 {
63 	lnet_hdr_t	*hdr = &msg->msg_hdr;
64 	lnet_event_t	*ev  = &msg->msg_ev;
65 
66 	LASSERT(!msg->msg_routing);
67 
68 	ev->type = ev_type;
69 
70 	if (ev_type == LNET_EVENT_SEND) {
71 		/* event for active message */
72 		ev->target.nid    = le64_to_cpu(hdr->dest_nid);
73 		ev->target.pid    = le32_to_cpu(hdr->dest_pid);
74 		ev->initiator.nid = LNET_NID_ANY;
75 		ev->initiator.pid = the_lnet.ln_pid;
76 		ev->sender	  = LNET_NID_ANY;
77 
78 	} else {
79 		/* event for passive message */
80 		ev->target.pid    = hdr->dest_pid;
81 		ev->target.nid    = hdr->dest_nid;
82 		ev->initiator.pid = hdr->src_pid;
83 		ev->initiator.nid = hdr->src_nid;
84 		ev->rlength       = hdr->payload_length;
85 		ev->sender	  = msg->msg_from;
86 		ev->mlength	  = msg->msg_wanted;
87 		ev->offset	  = msg->msg_offset;
88 	}
89 
90 	switch (ev_type) {
91 	default:
92 		LBUG();
93 
94 	case LNET_EVENT_PUT: /* passive PUT */
95 		ev->pt_index   = hdr->msg.put.ptl_index;
96 		ev->match_bits = hdr->msg.put.match_bits;
97 		ev->hdr_data   = hdr->msg.put.hdr_data;
98 		return;
99 
100 	case LNET_EVENT_GET: /* passive GET */
101 		ev->pt_index   = hdr->msg.get.ptl_index;
102 		ev->match_bits = hdr->msg.get.match_bits;
103 		ev->hdr_data   = 0;
104 		return;
105 
106 	case LNET_EVENT_ACK: /* ACK */
107 		ev->match_bits = hdr->msg.ack.match_bits;
108 		ev->mlength    = hdr->msg.ack.mlength;
109 		return;
110 
111 	case LNET_EVENT_REPLY: /* REPLY */
112 		return;
113 
114 	case LNET_EVENT_SEND: /* active message */
115 		if (msg->msg_type == LNET_MSG_PUT) {
116 			ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
117 			ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
118 			ev->offset     = le32_to_cpu(hdr->msg.put.offset);
119 			ev->mlength    =
120 			ev->rlength    = le32_to_cpu(hdr->payload_length);
121 			ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
122 
123 		} else {
124 			LASSERT(msg->msg_type == LNET_MSG_GET);
125 			ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
126 			ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
127 			ev->mlength    =
128 			ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
129 			ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
130 			ev->hdr_data   = 0;
131 		}
132 		return;
133 	}
134 }
135 
136 void
lnet_msg_commit(lnet_msg_t * msg,int cpt)137 lnet_msg_commit(lnet_msg_t *msg, int cpt)
138 {
139 	struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
140 	lnet_counters_t		  *counters  = the_lnet.ln_counters[cpt];
141 
142 	/* routed message can be committed for both receiving and sending */
143 	LASSERT(!msg->msg_tx_committed);
144 
145 	if (msg->msg_sending) {
146 		LASSERT(!msg->msg_receiving);
147 
148 		msg->msg_tx_cpt = cpt;
149 		msg->msg_tx_committed = 1;
150 		if (msg->msg_rx_committed) { /* routed message REPLY */
151 			LASSERT(msg->msg_onactivelist);
152 			return;
153 		}
154 	} else {
155 		LASSERT(!msg->msg_sending);
156 		msg->msg_rx_cpt = cpt;
157 		msg->msg_rx_committed = 1;
158 	}
159 
160 	LASSERT(!msg->msg_onactivelist);
161 	msg->msg_onactivelist = 1;
162 	list_add(&msg->msg_activelist, &container->msc_active);
163 
164 	counters->msgs_alloc++;
165 	if (counters->msgs_alloc > counters->msgs_max)
166 		counters->msgs_max = counters->msgs_alloc;
167 }
168 
169 static void
lnet_msg_decommit_tx(lnet_msg_t * msg,int status)170 lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
171 {
172 	lnet_counters_t	*counters;
173 	lnet_event_t	*ev = &msg->msg_ev;
174 
175 	LASSERT(msg->msg_tx_committed);
176 	if (status != 0)
177 		goto out;
178 
179 	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
180 	switch (ev->type) {
181 	default: /* routed message */
182 		LASSERT(msg->msg_routing);
183 		LASSERT(msg->msg_rx_committed);
184 		LASSERT(ev->type == 0);
185 
186 		counters->route_length += msg->msg_len;
187 		counters->route_count++;
188 		goto out;
189 
190 	case LNET_EVENT_PUT:
191 		/* should have been decommitted */
192 		LASSERT(!msg->msg_rx_committed);
193 		/* overwritten while sending ACK */
194 		LASSERT(msg->msg_type == LNET_MSG_ACK);
195 		msg->msg_type = LNET_MSG_PUT; /* fix type */
196 		break;
197 
198 	case LNET_EVENT_SEND:
199 		LASSERT(!msg->msg_rx_committed);
200 		if (msg->msg_type == LNET_MSG_PUT)
201 			counters->send_length += msg->msg_len;
202 		break;
203 
204 	case LNET_EVENT_GET:
205 		LASSERT(msg->msg_rx_committed);
206 		/* overwritten while sending reply, we should never be
207 		 * here for optimized GET */
208 		LASSERT(msg->msg_type == LNET_MSG_REPLY);
209 		msg->msg_type = LNET_MSG_GET; /* fix type */
210 		break;
211 	}
212 
213 	counters->send_count++;
214  out:
215 	lnet_return_tx_credits_locked(msg);
216 	msg->msg_tx_committed = 0;
217 }
218 
219 static void
lnet_msg_decommit_rx(lnet_msg_t * msg,int status)220 lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
221 {
222 	lnet_counters_t	*counters;
223 	lnet_event_t	*ev = &msg->msg_ev;
224 
225 	LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
226 	LASSERT(msg->msg_rx_committed);
227 
228 	if (status != 0)
229 		goto out;
230 
231 	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
232 	switch (ev->type) {
233 	default:
234 		LASSERT(ev->type == 0);
235 		LASSERT(msg->msg_routing);
236 		goto out;
237 
238 	case LNET_EVENT_ACK:
239 		LASSERT(msg->msg_type == LNET_MSG_ACK);
240 		break;
241 
242 	case LNET_EVENT_GET:
243 		/* type is "REPLY" if it's an optimized GET on passive side,
244 		 * because optimized GET will never be committed for sending,
245 		 * so message type wouldn't be changed back to "GET" by
246 		 * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
247 		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
248 			msg->msg_type == LNET_MSG_GET);
249 		counters->send_length += msg->msg_wanted;
250 		break;
251 
252 	case LNET_EVENT_PUT:
253 		LASSERT(msg->msg_type == LNET_MSG_PUT);
254 		break;
255 
256 	case LNET_EVENT_REPLY:
257 		/* type is "GET" if it's an optimized GET on active side,
258 		 * see details in lnet_create_reply_msg() */
259 		LASSERT(msg->msg_type == LNET_MSG_GET ||
260 			msg->msg_type == LNET_MSG_REPLY);
261 		break;
262 	}
263 
264 	counters->recv_count++;
265 	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
266 		counters->recv_length += msg->msg_wanted;
267 
268  out:
269 	lnet_return_rx_credits_locked(msg);
270 	msg->msg_rx_committed = 0;
271 }
272 
273 void
lnet_msg_decommit(lnet_msg_t * msg,int cpt,int status)274 lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
275 {
276 	int	cpt2 = cpt;
277 
278 	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
279 	LASSERT(msg->msg_onactivelist);
280 
281 	if (msg->msg_tx_committed) { /* always decommit for sending first */
282 		LASSERT(cpt == msg->msg_tx_cpt);
283 		lnet_msg_decommit_tx(msg, status);
284 	}
285 
286 	if (msg->msg_rx_committed) {
287 		/* forwarding msg committed for both receiving and sending */
288 		if (cpt != msg->msg_rx_cpt) {
289 			lnet_net_unlock(cpt);
290 			cpt2 = msg->msg_rx_cpt;
291 			lnet_net_lock(cpt2);
292 		}
293 		lnet_msg_decommit_rx(msg, status);
294 	}
295 
296 	list_del(&msg->msg_activelist);
297 	msg->msg_onactivelist = 0;
298 
299 	the_lnet.ln_counters[cpt2]->msgs_alloc--;
300 
301 	if (cpt2 != cpt) {
302 		lnet_net_unlock(cpt2);
303 		lnet_net_lock(cpt);
304 	}
305 }
306 
307 void
lnet_msg_attach_md(lnet_msg_t * msg,lnet_libmd_t * md,unsigned int offset,unsigned int mlen)308 lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
309 		   unsigned int offset, unsigned int mlen)
310 {
311 	/* NB: @offset and @len are only useful for receiving */
312 	/* Here, we attach the MD on lnet_msg and mark it busy and
313 	 * decrementing its threshold. Come what may, the lnet_msg "owns"
314 	 * the MD until a call to lnet_msg_detach_md or lnet_finalize()
315 	 * signals completion. */
316 	LASSERT(!msg->msg_routing);
317 
318 	msg->msg_md = md;
319 	if (msg->msg_receiving) { /* committed for receiving */
320 		msg->msg_offset = offset;
321 		msg->msg_wanted = mlen;
322 	}
323 
324 	md->md_refcount++;
325 	if (md->md_threshold != LNET_MD_THRESH_INF) {
326 		LASSERT(md->md_threshold > 0);
327 		md->md_threshold--;
328 	}
329 
330 	/* build umd in event */
331 	lnet_md2handle(&msg->msg_ev.md_handle, md);
332 	lnet_md_deconstruct(md, &msg->msg_ev.md);
333 }
334 
335 void
lnet_msg_detach_md(lnet_msg_t * msg,int status)336 lnet_msg_detach_md(lnet_msg_t *msg, int status)
337 {
338 	lnet_libmd_t	*md = msg->msg_md;
339 	int		unlink;
340 
341 	/* Now it's safe to drop my caller's ref */
342 	md->md_refcount--;
343 	LASSERT(md->md_refcount >= 0);
344 
345 	unlink = lnet_md_unlinkable(md);
346 	if (md->md_eq != NULL) {
347 		msg->msg_ev.status   = status;
348 		msg->msg_ev.unlinked = unlink;
349 		lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
350 	}
351 
352 	if (unlink)
353 		lnet_md_unlink(md);
354 
355 	msg->msg_md = NULL;
356 }
357 
358 static int
lnet_complete_msg_locked(lnet_msg_t * msg,int cpt)359 lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
360 {
361 	lnet_handle_wire_t ack_wmd;
362 	int		rc;
363 	int		status = msg->msg_ev.status;
364 
365 	LASSERT(msg->msg_onactivelist);
366 
367 	if (status == 0 && msg->msg_ack) {
368 		/* Only send an ACK if the PUT completed successfully */
369 
370 		lnet_msg_decommit(msg, cpt, 0);
371 
372 		msg->msg_ack = 0;
373 		lnet_net_unlock(cpt);
374 
375 		LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
376 		LASSERT(!msg->msg_routing);
377 
378 		ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
379 
380 		lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
381 
382 		msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
383 		msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
384 		msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
385 
386 		/* NB: we probably want to use NID of msg::msg_from as 3rd
387 		 * parameter (router NID) if it's routed message */
388 		rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
389 
390 		lnet_net_lock(cpt);
391 		/*
392 		 * NB: message is committed for sending, we should return
393 		 * on success because LND will finalize this message later.
394 		 *
395 		 * Also, there is possibility that message is committed for
396 		 * sending and also failed before delivering to LND,
397 		 * i.e: ENOMEM, in that case we can't fall through either
398 		 * because CPT for sending can be different with CPT for
399 		 * receiving, so we should return back to lnet_finalize()
400 		 * to make sure we are locking the correct partition.
401 		 */
402 		return rc;
403 
404 	} else if (status == 0 &&	/* OK so far */
405 		   (msg->msg_routing && !msg->msg_sending)) {
406 		/* not forwarded */
407 		LASSERT(!msg->msg_receiving);	/* called back recv already */
408 		lnet_net_unlock(cpt);
409 
410 		rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
411 
412 		lnet_net_lock(cpt);
413 		/*
414 		 * NB: message is committed for sending, we should return
415 		 * on success because LND will finalize this message later.
416 		 *
417 		 * Also, there is possibility that message is committed for
418 		 * sending and also failed before delivering to LND,
419 		 * i.e: ENOMEM, in that case we can't fall through either:
420 		 * - The rule is message must decommit for sending first if
421 		 *   the it's committed for both sending and receiving
422 		 * - CPT for sending can be different with CPT for receiving,
423 		 *   so we should return back to lnet_finalize() to make
424 		 *   sure we are locking the correct partition.
425 		 */
426 		return rc;
427 	}
428 
429 	lnet_msg_decommit(msg, cpt, status);
430 	lnet_msg_free_locked(msg);
431 	return 0;
432 }
433 
434 void
lnet_finalize(lnet_ni_t * ni,lnet_msg_t * msg,int status)435 lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
436 {
437 	struct lnet_msg_container	*container;
438 	int				my_slot;
439 	int				cpt;
440 	int				rc;
441 	int				i;
442 
443 	LASSERT(!in_interrupt());
444 
445 	if (msg == NULL)
446 		return;
447 #if 0
448 	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
449 	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
450 	       msg->msg_target_is_router ? "t" : "",
451 	       msg->msg_routing ? "X" : "",
452 	       msg->msg_ack ? "A" : "",
453 	       msg->msg_sending ? "S" : "",
454 	       msg->msg_receiving ? "R" : "",
455 	       msg->msg_delayed ? "d" : "",
456 	       msg->msg_txcredit ? "C" : "",
457 	       msg->msg_peertxcredit ? "c" : "",
458 	       msg->msg_rtrcredit ? "F" : "",
459 	       msg->msg_peerrtrcredit ? "f" : "",
460 	       msg->msg_onactivelist ? "!" : "",
461 	       msg->msg_txpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
462 	       msg->msg_rxpeer == NULL ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
463 #endif
464 	msg->msg_ev.status = status;
465 
466 	if (msg->msg_md != NULL) {
467 		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);
468 
469 		lnet_res_lock(cpt);
470 		lnet_msg_detach_md(msg, status);
471 		lnet_res_unlock(cpt);
472 	}
473 
474  again:
475 	rc = 0;
476 	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
477 		/* not committed to network yet */
478 		LASSERT(!msg->msg_onactivelist);
479 		lnet_msg_free(msg);
480 		return;
481 	}
482 
483 	/*
484 	 * NB: routed message can be committed for both receiving and sending,
485 	 * we should finalize in LIFO order and keep counters correct.
486 	 * (finalize sending first then finalize receiving)
487 	 */
488 	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
489 	lnet_net_lock(cpt);
490 
491 	container = the_lnet.ln_msg_containers[cpt];
492 	list_add_tail(&msg->msg_list, &container->msc_finalizing);
493 
494 	/* Recursion breaker.  Don't complete the message here if I am (or
495 	 * enough other threads are) already completing messages */
496 
497 	my_slot = -1;
498 	for (i = 0; i < container->msc_nfinalizers; i++) {
499 		if (container->msc_finalizers[i] == current)
500 			break;
501 
502 		if (my_slot < 0 && container->msc_finalizers[i] == NULL)
503 			my_slot = i;
504 	}
505 
506 	if (i < container->msc_nfinalizers || my_slot < 0) {
507 		lnet_net_unlock(cpt);
508 		return;
509 	}
510 
511 	container->msc_finalizers[my_slot] = current;
512 
513 	while (!list_empty(&container->msc_finalizing)) {
514 		msg = list_entry(container->msc_finalizing.next,
515 				     lnet_msg_t, msg_list);
516 
517 		list_del(&msg->msg_list);
518 
519 		/* NB drops and regains the lnet lock if it actually does
520 		 * anything, so my finalizing friends can chomp along too */
521 		rc = lnet_complete_msg_locked(msg, cpt);
522 		if (rc != 0)
523 			break;
524 	}
525 
526 	container->msc_finalizers[my_slot] = NULL;
527 	lnet_net_unlock(cpt);
528 
529 	if (rc != 0)
530 		goto again;
531 }
532 EXPORT_SYMBOL(lnet_finalize);
533 
534 void
lnet_msg_container_cleanup(struct lnet_msg_container * container)535 lnet_msg_container_cleanup(struct lnet_msg_container *container)
536 {
537 	int     count = 0;
538 
539 	if (container->msc_init == 0)
540 		return;
541 
542 	while (!list_empty(&container->msc_active)) {
543 		lnet_msg_t *msg = list_entry(container->msc_active.next,
544 						 lnet_msg_t, msg_activelist);
545 
546 		LASSERT(msg->msg_onactivelist);
547 		msg->msg_onactivelist = 0;
548 		list_del(&msg->msg_activelist);
549 		lnet_msg_free(msg);
550 		count++;
551 	}
552 
553 	if (count > 0)
554 		CERROR("%d active msg on exit\n", count);
555 
556 	if (container->msc_finalizers != NULL) {
557 		LIBCFS_FREE(container->msc_finalizers,
558 			    container->msc_nfinalizers *
559 			    sizeof(*container->msc_finalizers));
560 		container->msc_finalizers = NULL;
561 	}
562 #ifdef LNET_USE_LIB_FREELIST
563 	lnet_freelist_fini(&container->msc_freelist);
564 #endif
565 	container->msc_init = 0;
566 }
567 
568 int
lnet_msg_container_setup(struct lnet_msg_container * container,int cpt)569 lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
570 {
571 	int	rc;
572 
573 	container->msc_init = 1;
574 
575 	INIT_LIST_HEAD(&container->msc_active);
576 	INIT_LIST_HEAD(&container->msc_finalizing);
577 
578 #ifdef LNET_USE_LIB_FREELIST
579 	memset(&container->msc_freelist, 0, sizeof(lnet_freelist_t));
580 
581 	rc = lnet_freelist_init(&container->msc_freelist,
582 				LNET_FL_MAX_MSGS, sizeof(lnet_msg_t));
583 	if (rc != 0) {
584 		CERROR("Failed to init freelist for message container\n");
585 		lnet_msg_container_cleanup(container);
586 		return rc;
587 	}
588 #else
589 	rc = 0;
590 #endif
591 	/* number of CPUs */
592 	container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
593 
594 	LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
595 			 container->msc_nfinalizers *
596 			 sizeof(*container->msc_finalizers));
597 
598 	if (container->msc_finalizers == NULL) {
599 		CERROR("Failed to allocate message finalizers\n");
600 		lnet_msg_container_cleanup(container);
601 		return -ENOMEM;
602 	}
603 
604 	return rc;
605 }
606 
607 void
lnet_msg_containers_destroy(void)608 lnet_msg_containers_destroy(void)
609 {
610 	struct lnet_msg_container *container;
611 	int     i;
612 
613 	if (the_lnet.ln_msg_containers == NULL)
614 		return;
615 
616 	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
617 		lnet_msg_container_cleanup(container);
618 
619 	cfs_percpt_free(the_lnet.ln_msg_containers);
620 	the_lnet.ln_msg_containers = NULL;
621 }
622 
623 int
lnet_msg_containers_create(void)624 lnet_msg_containers_create(void)
625 {
626 	struct lnet_msg_container *container;
627 	int	rc;
628 	int	i;
629 
630 	the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
631 						      sizeof(*container));
632 
633 	if (the_lnet.ln_msg_containers == NULL) {
634 		CERROR("Failed to allocate cpu-partition data for network\n");
635 		return -ENOMEM;
636 	}
637 
638 	cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
639 		rc = lnet_msg_container_setup(container, i);
640 		if (rc != 0) {
641 			lnet_msg_containers_destroy();
642 			return rc;
643 		}
644 	}
645 
646 	return 0;
647 }
648