• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, Ericsson AB
5  * Copyright (c) 2004-2008, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36 
37 #include "core.h"
38 #include "config.h"
39 #include "dbg.h"
40 #include "port.h"
41 #include "addr.h"
42 #include "link.h"
43 #include "node.h"
44 #include "name_table.h"
45 #include "user_reg.h"
46 #include "msg.h"
47 #include "bcast.h"
48 
49 /* Connection management: */
50 #define PROBING_INTERVAL 3600000	/* [ms] => 1 h */
51 #define CONFIRMED 0
52 #define PROBING 1
53 
54 #define MAX_REJECT_SIZE 1024
55 
56 static struct sk_buff *msg_queue_head = NULL;
57 static struct sk_buff *msg_queue_tail = NULL;
58 
59 DEFINE_SPINLOCK(tipc_port_list_lock);
60 static DEFINE_SPINLOCK(queue_lock);
61 
62 static LIST_HEAD(ports);
63 static void port_handle_node_down(unsigned long ref);
64 static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
65 static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
66 static void port_timeout(unsigned long ref);
67 
68 
port_peernode(struct port * p_ptr)69 static u32 port_peernode(struct port *p_ptr)
70 {
71 	return msg_destnode(&p_ptr->publ.phdr);
72 }
73 
port_peerport(struct port * p_ptr)74 static u32 port_peerport(struct port *p_ptr)
75 {
76 	return msg_destport(&p_ptr->publ.phdr);
77 }
78 
port_out_seqno(struct port * p_ptr)79 static u32 port_out_seqno(struct port *p_ptr)
80 {
81 	return msg_transp_seqno(&p_ptr->publ.phdr);
82 }
83 
port_incr_out_seqno(struct port * p_ptr)84 static void port_incr_out_seqno(struct port *p_ptr)
85 {
86 	struct tipc_msg *m = &p_ptr->publ.phdr;
87 
88 	if (likely(!msg_routed(m)))
89 		return;
90 	msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
91 }
92 
93 /**
94  * tipc_multicast - send a multicast message to local and remote destinations
95  */
96 
tipc_multicast(u32 ref,struct tipc_name_seq const * seq,u32 domain,u32 num_sect,struct iovec const * msg_sect)97 int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
98 		   u32 num_sect, struct iovec const *msg_sect)
99 {
100 	struct tipc_msg *hdr;
101 	struct sk_buff *buf;
102 	struct sk_buff *ibuf = NULL;
103 	struct port_list dports = {0, NULL, };
104 	struct port *oport = tipc_port_deref(ref);
105 	int ext_targets;
106 	int res;
107 
108 	if (unlikely(!oport))
109 		return -EINVAL;
110 
111 	/* Create multicast message */
112 
113 	hdr = &oport->publ.phdr;
114 	msg_set_type(hdr, TIPC_MCAST_MSG);
115 	msg_set_nametype(hdr, seq->type);
116 	msg_set_namelower(hdr, seq->lower);
117 	msg_set_nameupper(hdr, seq->upper);
118 	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
119 	res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
120 			!oport->user_port, &buf);
121 	if (unlikely(!buf))
122 		return res;
123 
124 	/* Figure out where to send multicast message */
125 
126 	ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
127 						TIPC_NODE_SCOPE, &dports);
128 
129 	/* Send message to destinations (duplicate it only if necessary) */
130 
131 	if (ext_targets) {
132 		if (dports.count != 0) {
133 			ibuf = skb_copy(buf, GFP_ATOMIC);
134 			if (ibuf == NULL) {
135 				tipc_port_list_free(&dports);
136 				buf_discard(buf);
137 				return -ENOMEM;
138 			}
139 		}
140 		res = tipc_bclink_send_msg(buf);
141 		if ((res < 0) && (dports.count != 0)) {
142 			buf_discard(ibuf);
143 		}
144 	} else {
145 		ibuf = buf;
146 	}
147 
148 	if (res >= 0) {
149 		if (ibuf)
150 			tipc_port_recv_mcast(ibuf, &dports);
151 	} else {
152 		tipc_port_list_free(&dports);
153 	}
154 	return res;
155 }
156 
157 /**
158  * tipc_port_recv_mcast - deliver multicast message to all destination ports
159  *
160  * If there is no port list, perform a lookup to create one
161  */
162 
tipc_port_recv_mcast(struct sk_buff * buf,struct port_list * dp)163 void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
164 {
165 	struct tipc_msg* msg;
166 	struct port_list dports = {0, NULL, };
167 	struct port_list *item = dp;
168 	int cnt = 0;
169 
170 	msg = buf_msg(buf);
171 
172 	/* Create destination port list, if one wasn't supplied */
173 
174 	if (dp == NULL) {
175 		tipc_nametbl_mc_translate(msg_nametype(msg),
176 				     msg_namelower(msg),
177 				     msg_nameupper(msg),
178 				     TIPC_CLUSTER_SCOPE,
179 				     &dports);
180 		item = dp = &dports;
181 	}
182 
183 	/* Deliver a copy of message to each destination port */
184 
185 	if (dp->count != 0) {
186 		if (dp->count == 1) {
187 			msg_set_destport(msg, dp->ports[0]);
188 			tipc_port_recv_msg(buf);
189 			tipc_port_list_free(dp);
190 			return;
191 		}
192 		for (; cnt < dp->count; cnt++) {
193 			int index = cnt % PLSIZE;
194 			struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
195 
196 			if (b == NULL) {
197 				warn("Unable to deliver multicast message(s)\n");
198 				msg_dbg(msg, "LOST:");
199 				goto exit;
200 			}
201 			if ((index == 0) && (cnt != 0)) {
202 				item = item->next;
203 			}
204 			msg_set_destport(buf_msg(b),item->ports[index]);
205 			tipc_port_recv_msg(b);
206 		}
207 	}
208 exit:
209 	buf_discard(buf);
210 	tipc_port_list_free(dp);
211 }
212 
213 /**
214  * tipc_createport_raw - create a generic TIPC port
215  *
216  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
217  */
218 
tipc_createport_raw(void * usr_handle,u32 (* dispatcher)(struct tipc_port *,struct sk_buff *),void (* wakeup)(struct tipc_port *),const u32 importance)219 struct tipc_port *tipc_createport_raw(void *usr_handle,
220 			u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
221 			void (*wakeup)(struct tipc_port *),
222 			const u32 importance)
223 {
224 	struct port *p_ptr;
225 	struct tipc_msg *msg;
226 	u32 ref;
227 
228 	p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
229 	if (!p_ptr) {
230 		warn("Port creation failed, no memory\n");
231 		return NULL;
232 	}
233 	ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
234 	if (!ref) {
235 		warn("Port creation failed, reference table exhausted\n");
236 		kfree(p_ptr);
237 		return NULL;
238 	}
239 
240 	p_ptr->publ.usr_handle = usr_handle;
241 	p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
242 	p_ptr->publ.ref = ref;
243 	msg = &p_ptr->publ.phdr;
244 	msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
245 	msg_set_origport(msg, ref);
246 	p_ptr->last_in_seqno = 41;
247 	p_ptr->sent = 1;
248 	INIT_LIST_HEAD(&p_ptr->wait_list);
249 	INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
250 	p_ptr->congested_link = NULL;
251 	p_ptr->dispatcher = dispatcher;
252 	p_ptr->wakeup = wakeup;
253 	p_ptr->user_port = NULL;
254 	k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
255 	spin_lock_bh(&tipc_port_list_lock);
256 	INIT_LIST_HEAD(&p_ptr->publications);
257 	INIT_LIST_HEAD(&p_ptr->port_list);
258 	list_add_tail(&p_ptr->port_list, &ports);
259 	spin_unlock_bh(&tipc_port_list_lock);
260 	return &(p_ptr->publ);
261 }
262 
/**
 * tipc_deleteport - withdraw, unlink and free the port identified by 'ref'
 *
 * If the port is connected, an abort message for the peer is built and
 * routed once the port has been freed.
 *
 * Returns 0 on success, or -EINVAL if 'ref' cannot be locked.
 */

int tipc_deleteport(u32 ref)
{
	struct sk_buff *abort_msg = NULL;
	struct port *port;

	tipc_withdraw(ref, 0, NULL);

	port = tipc_port_lock(ref);
	if (port == NULL)
		return -EINVAL;

	/* Drop the reference while locked; 'port' stays valid until kfree() */
	tipc_ref_discard(ref);
	tipc_port_unlock(port);

	k_cancel_timer(&port->timer);
	if (port->publ.connected) {
		abort_msg = port_build_peer_abort_msg(port, TIPC_ERR_NO_PORT);
		tipc_nodesub_unsubscribe(&port->subscription);
	}
	if (port->user_port) {
		tipc_reg_remove_port(port->user_port);
		kfree(port->user_port);
	}

	spin_lock_bh(&tipc_port_list_lock);
	list_del(&port->port_list);
	list_del(&port->wait_list);
	spin_unlock_bh(&tipc_port_list_lock);

	k_term_timer(&port->timer);
	kfree(port);
	dbg("Deleted port %u\n", ref);

	/* abort_msg is NULL when the port was not connected */
	tipc_net_route_msg(abort_msg);
	return 0;
}
296 
297 /**
298  * tipc_get_port() - return port associated with 'ref'
299  *
300  * Note: Port is not locked.
301  */
302 
tipc_get_port(const u32 ref)303 struct tipc_port *tipc_get_port(const u32 ref)
304 {
305 	return (struct tipc_port *)tipc_ref_deref(ref);
306 }
307 
308 /**
309  * tipc_get_handle - return user handle associated to port 'ref'
310  */
311 
tipc_get_handle(const u32 ref)312 void *tipc_get_handle(const u32 ref)
313 {
314 	struct port *p_ptr;
315 	void * handle;
316 
317 	p_ptr = tipc_port_lock(ref);
318 	if (!p_ptr)
319 		return NULL;
320 	handle = p_ptr->publ.usr_handle;
321 	tipc_port_unlock(p_ptr);
322 	return handle;
323 }
324 
port_unreliable(struct port * p_ptr)325 static int port_unreliable(struct port *p_ptr)
326 {
327 	return msg_src_droppable(&p_ptr->publ.phdr);
328 }
329 
/**
 * tipc_portunreliable - read the "source droppable" setting of port 'ref'
 *
 * Stores the setting in *isunreliable and returns 0, or returns -EINVAL
 * (leaving *isunreliable untouched) if 'ref' cannot be locked.
 */

int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	*isunreliable = port_unreliable(port);
	tipc_port_unlock(port);
	return 0;
}
341 
/**
 * tipc_set_portunreliable - set the "source droppable" setting of port 'ref'
 *
 * Any non-zero 'isunreliable' is stored as 1. Returns 0, or -EINVAL if
 * 'ref' cannot be locked.
 */

int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	msg_set_src_droppable(&port->publ.phdr, (isunreliable != 0));
	tipc_port_unlock(port);
	return 0;
}
353 
port_unreturnable(struct port * p_ptr)354 static int port_unreturnable(struct port *p_ptr)
355 {
356 	return msg_dest_droppable(&p_ptr->publ.phdr);
357 }
358 
/**
 * tipc_portunreturnable - read the "destination droppable" setting of 'ref'
 *
 * Stores the setting in *isunrejectable and returns 0, or returns -EINVAL
 * (leaving *isunrejectable untouched) if 'ref' cannot be locked.
 */

int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	*isunrejectable = port_unreturnable(port);
	tipc_port_unlock(port);
	return 0;
}
370 
/**
 * tipc_set_portunreturnable - set the "destination droppable" setting
 *
 * Any non-zero 'isunrejectable' is stored as 1. Returns 0, or -EINVAL if
 * 'ref' cannot be locked.
 */

int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	msg_set_dest_droppable(&port->publ.phdr, (isunrejectable != 0));
	tipc_port_unlock(port);
	return 0;
}
382 
383 /*
384  * port_build_proto_msg(): build a port level protocol
385  * or a connection abortion message. Called with
386  * tipc_port lock on.
387  */
port_build_proto_msg(u32 destport,u32 destnode,u32 origport,u32 orignode,u32 usr,u32 type,u32 err,u32 seqno,u32 ack)388 static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
389 					    u32 origport, u32 orignode,
390 					    u32 usr, u32 type, u32 err,
391 					    u32 seqno, u32 ack)
392 {
393 	struct sk_buff *buf;
394 	struct tipc_msg *msg;
395 
396 	buf = buf_acquire(LONG_H_SIZE);
397 	if (buf) {
398 		msg = buf_msg(buf);
399 		msg_init(msg, usr, type, LONG_H_SIZE, destnode);
400 		msg_set_errcode(msg, err);
401 		msg_set_destport(msg, destport);
402 		msg_set_origport(msg, origport);
403 		msg_set_orignode(msg, orignode);
404 		msg_set_transp_seqno(msg, seqno);
405 		msg_set_msgcnt(msg, ack);
406 		msg_dbg(msg, "PORT>SEND>:");
407 	}
408 	return buf;
409 }
410 
/**
 * tipc_reject_msg - return a message to its originator with an error code
 * @buf: message being rejected; always consumed
 * @err: error code placed in the returned message
 *
 * The message is silently discarded if it already carries an error code,
 * is marked "destination droppable", or no return buffer can be acquired.
 * At most MAX_REJECT_SIZE bytes of payload are returned. When the rejected
 * message is a connection message and the destination port is connected,
 * a self-abort message is routed as well.
 *
 * Returns the (possibly truncated) data size of the rejected message.
 */

int tipc_reject_msg(struct sk_buff *buf, u32 err)
{
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_msg *rmsg;
	struct sk_buff *rbuf;
	u32 imp = msg_importance(msg);
	u32 data_sz = msg_data_sz(msg);
	int hdr_sz;

	if (data_sz > MAX_REJECT_SIZE)
		data_sz = MAX_REJECT_SIZE;
	/* Connection messages go back one importance level higher */
	if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
		imp++;
	msg_dbg(msg, "port->rej: ");

	/* discard rejected message if it shouldn't be returned to sender */
	if (msg_errcode(msg) || msg_dest_droppable(msg))
		goto drop;

	/* construct rejected message */
	hdr_sz = msg_mcast(msg) ? MCAST_H_SIZE : LONG_H_SIZE;
	rbuf = buf_acquire(data_sz + hdr_sz);
	if (rbuf == NULL)
		goto drop;

	rmsg = buf_msg(rbuf);
	msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg));
	msg_set_errcode(rmsg, err);
	/* Swap origin and destination port */
	msg_set_destport(rmsg, msg_origport(msg));
	msg_set_origport(rmsg, msg_destport(msg));
	if (msg_short(msg)) {
		msg_set_orignode(rmsg, tipc_own_addr);
		/* leave name type & instance as zeroes */
	} else {
		msg_set_orignode(rmsg, msg_destnode(msg));
		msg_set_nametype(rmsg, msg_nametype(msg));
		msg_set_nameinst(rmsg, msg_nameinst(msg));
	}
	msg_set_size(rmsg, data_sz + hdr_sz);
	skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);

	/* send self-abort message when rejecting on a connected port */
	if (msg_connected(msg)) {
		struct port *port = tipc_port_lock(msg_destport(msg));
		struct sk_buff *abuf = NULL;

		if (port) {
			if (port->publ.connected)
				abuf = port_build_self_abort_msg(port, err);
			tipc_port_unlock(port);
		}
		tipc_net_route_msg(abuf);
	}

	/* send rejected message */
	buf_discard(buf);
	tipc_net_route_msg(rbuf);
	return data_sz;

drop:
	buf_discard(buf);
	return data_sz;
}
476 
/**
 * tipc_port_reject_sections - build a message from sections and reject it
 *
 * Builds a message from 'hdr' and 'msg_sect' with msg_build() and passes
 * it to tipc_reject_msg() with error code 'err'.
 *
 * Returns the msg_build() result if no buffer was produced, otherwise
 * the tipc_reject_msg() result.
 */

int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
			      struct iovec const *msg_sect, u32 num_sect,
			      int err)
{
	struct sk_buff *built;
	int rc;

	rc = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
		       !p_ptr->user_port, &built);
	if (built == NULL)
		return rc;

	return tipc_reject_msg(built, err);
}
491 
port_timeout(unsigned long ref)492 static void port_timeout(unsigned long ref)
493 {
494 	struct port *p_ptr = tipc_port_lock(ref);
495 	struct sk_buff *buf = NULL;
496 
497 	if (!p_ptr)
498 		return;
499 
500 	if (!p_ptr->publ.connected) {
501 		tipc_port_unlock(p_ptr);
502 		return;
503 	}
504 
505 	/* Last probe answered ? */
506 	if (p_ptr->probing_state == PROBING) {
507 		buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
508 	} else {
509 		buf = port_build_proto_msg(port_peerport(p_ptr),
510 					   port_peernode(p_ptr),
511 					   p_ptr->publ.ref,
512 					   tipc_own_addr,
513 					   CONN_MANAGER,
514 					   CONN_PROBE,
515 					   TIPC_OK,
516 					   port_out_seqno(p_ptr),
517 					   0);
518 		port_incr_out_seqno(p_ptr);
519 		p_ptr->probing_state = PROBING;
520 		k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
521 	}
522 	tipc_port_unlock(p_ptr);
523 	tipc_net_route_msg(buf);
524 }
525 
526 
port_handle_node_down(unsigned long ref)527 static void port_handle_node_down(unsigned long ref)
528 {
529 	struct port *p_ptr = tipc_port_lock(ref);
530 	struct sk_buff* buf = NULL;
531 
532 	if (!p_ptr)
533 		return;
534 	buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
535 	tipc_port_unlock(p_ptr);
536 	tipc_net_route_msg(buf);
537 }
538 
539 
/*
 * port_build_self_abort_msg(): build a TIPC_CONN_MSG carrying 'err' that
 * is addressed to this port and appears to originate from its peer.
 *
 * Importance is one level above the port's own, capped at
 * TIPC_CRITICAL_IMPORTANCE. Returns NULL if the port is not connected or
 * no buffer could be acquired.
 */
static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
{
	u32 imp;

	if (!p_ptr->publ.connected)
		return NULL;

	imp = msg_importance(&p_ptr->publ.phdr);
	if (imp < TIPC_CRITICAL_IMPORTANCE)
		imp++;

	/* Destination is this port; origin is filled in with the peer */
	return port_build_proto_msg(p_ptr->publ.ref,
				    tipc_own_addr,
				    port_peerport(p_ptr),
				    port_peernode(p_ptr),
				    imp,
				    TIPC_CONN_MSG,
				    err,
				    p_ptr->last_in_seqno + 1,
				    0);
}
558 
559 
/*
 * port_build_peer_abort_msg(): build a TIPC_CONN_MSG carrying 'err' that
 * is addressed to the peer and originates from this port.
 *
 * Importance is one level above the port's own, capped at
 * TIPC_CRITICAL_IMPORTANCE. Returns NULL if the port is not connected or
 * no buffer could be acquired.
 */
static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
{
	u32 imp;

	if (!p_ptr->publ.connected)
		return NULL;

	imp = msg_importance(&p_ptr->publ.phdr);
	if (imp < TIPC_CRITICAL_IMPORTANCE)
		imp++;

	return port_build_proto_msg(port_peerport(p_ptr),
				    port_peernode(p_ptr),
				    p_ptr->publ.ref,
				    tipc_own_addr,
				    imp,
				    TIPC_CONN_MSG,
				    err,
				    port_out_seqno(p_ptr),
				    0);
}
578 
tipc_port_recv_proto_msg(struct sk_buff * buf)579 void tipc_port_recv_proto_msg(struct sk_buff *buf)
580 {
581 	struct tipc_msg *msg = buf_msg(buf);
582 	struct port *p_ptr = tipc_port_lock(msg_destport(msg));
583 	u32 err = TIPC_OK;
584 	struct sk_buff *r_buf = NULL;
585 	struct sk_buff *abort_buf = NULL;
586 
587 	msg_dbg(msg, "PORT<RECV<:");
588 
589 	if (!p_ptr) {
590 		err = TIPC_ERR_NO_PORT;
591 	} else if (p_ptr->publ.connected) {
592 		if (port_peernode(p_ptr) != msg_orignode(msg))
593 			err = TIPC_ERR_NO_PORT;
594 		if (port_peerport(p_ptr) != msg_origport(msg))
595 			err = TIPC_ERR_NO_PORT;
596 		if (!err && msg_routed(msg)) {
597 			u32 seqno = msg_transp_seqno(msg);
598 			u32 myno =  ++p_ptr->last_in_seqno;
599 			if (seqno != myno) {
600 				err = TIPC_ERR_NO_PORT;
601 				abort_buf = port_build_self_abort_msg(p_ptr, err);
602 			}
603 		}
604 		if (msg_type(msg) == CONN_ACK) {
605 			int wakeup = tipc_port_congested(p_ptr) &&
606 				     p_ptr->publ.congested &&
607 				     p_ptr->wakeup;
608 			p_ptr->acked += msg_msgcnt(msg);
609 			if (tipc_port_congested(p_ptr))
610 				goto exit;
611 			p_ptr->publ.congested = 0;
612 			if (!wakeup)
613 				goto exit;
614 			p_ptr->wakeup(&p_ptr->publ);
615 			goto exit;
616 		}
617 	} else if (p_ptr->publ.published) {
618 		err = TIPC_ERR_NO_PORT;
619 	}
620 	if (err) {
621 		r_buf = port_build_proto_msg(msg_origport(msg),
622 					     msg_orignode(msg),
623 					     msg_destport(msg),
624 					     tipc_own_addr,
625 					     TIPC_HIGH_IMPORTANCE,
626 					     TIPC_CONN_MSG,
627 					     err,
628 					     0,
629 					     0);
630 		goto exit;
631 	}
632 
633 	/* All is fine */
634 	if (msg_type(msg) == CONN_PROBE) {
635 		r_buf = port_build_proto_msg(msg_origport(msg),
636 					     msg_orignode(msg),
637 					     msg_destport(msg),
638 					     tipc_own_addr,
639 					     CONN_MANAGER,
640 					     CONN_PROBE_REPLY,
641 					     TIPC_OK,
642 					     port_out_seqno(p_ptr),
643 					     0);
644 	}
645 	p_ptr->probing_state = CONFIRMED;
646 	port_incr_out_seqno(p_ptr);
647 exit:
648 	if (p_ptr)
649 		tipc_port_unlock(p_ptr);
650 	tipc_net_route_msg(r_buf);
651 	tipc_net_route_msg(abort_buf);
652 	buf_discard(buf);
653 }
654 
port_print(struct port * p_ptr,struct print_buf * buf,int full_id)655 static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
656 {
657 	struct publication *publ;
658 
659 	if (full_id)
660 		tipc_printf(buf, "<%u.%u.%u:%u>:",
661 			    tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
662 			    tipc_node(tipc_own_addr), p_ptr->publ.ref);
663 	else
664 		tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
665 
666 	if (p_ptr->publ.connected) {
667 		u32 dport = port_peerport(p_ptr);
668 		u32 destnode = port_peernode(p_ptr);
669 
670 		tipc_printf(buf, " connected to <%u.%u.%u:%u>",
671 			    tipc_zone(destnode), tipc_cluster(destnode),
672 			    tipc_node(destnode), dport);
673 		if (p_ptr->publ.conn_type != 0)
674 			tipc_printf(buf, " via {%u,%u}",
675 				    p_ptr->publ.conn_type,
676 				    p_ptr->publ.conn_instance);
677 	}
678 	else if (p_ptr->publ.published) {
679 		tipc_printf(buf, " bound to");
680 		list_for_each_entry(publ, &p_ptr->publications, pport_list) {
681 			if (publ->lower == publ->upper)
682 				tipc_printf(buf, " {%u,%u}", publ->type,
683 					    publ->lower);
684 			else
685 				tipc_printf(buf, " {%u,%u,%u}", publ->type,
686 					    publ->lower, publ->upper);
687 		}
688 	}
689 	tipc_printf(buf, "\n");
690 }
691 
692 #define MAX_PORT_QUERY 32768
693 
tipc_port_get_ports(void)694 struct sk_buff *tipc_port_get_ports(void)
695 {
696 	struct sk_buff *buf;
697 	struct tlv_desc *rep_tlv;
698 	struct print_buf pb;
699 	struct port *p_ptr;
700 	int str_len;
701 
702 	buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
703 	if (!buf)
704 		return NULL;
705 	rep_tlv = (struct tlv_desc *)buf->data;
706 
707 	tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
708 	spin_lock_bh(&tipc_port_list_lock);
709 	list_for_each_entry(p_ptr, &ports, port_list) {
710 		spin_lock_bh(p_ptr->publ.lock);
711 		port_print(p_ptr, &pb, 0);
712 		spin_unlock_bh(p_ptr->publ.lock);
713 	}
714 	spin_unlock_bh(&tipc_port_list_lock);
715 	str_len = tipc_printbuf_validate(&pb);
716 
717 	skb_put(buf, TLV_SPACE(str_len));
718 	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
719 
720 	return buf;
721 }
722 
#if 0

/* Size of the text area reserved for a single port's statistics */
#define MAX_PORT_STATS 2000

/*
 * port_show_stats(): build a configuration reply describing one port.
 * (Compiled out.)
 *
 * The request must hold a TIPC_TLV_PORT_REF TLV with the port reference
 * in network byte order. The port is printed (full id form) while locked
 * and the text is returned as a TIPC_TLV_ULTRA_STRING TLV.
 */
struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
{
	struct print_buf pb;
	struct tlv_desc *rep_tlv;
	struct sk_buff *reply;
	struct port *port;
	int str_len;
	u32 ref;

	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
		return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);

	ref = ntohl(*(u32 *)TLV_DATA(req_tlv_area));

	port = tipc_port_lock(ref);
	if (port == NULL)
		return cfg_reply_error_string("port not found");

	reply = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
	if (reply == NULL) {
		tipc_port_unlock(port);
		return NULL;
	}
	rep_tlv = (struct tlv_desc *)reply->data;

	tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
	port_print(port, &pb, 1);
	/* TODO: additional port statistics still need to be filled in here */
	tipc_port_unlock(port);

	/* Size the TLV to the text actually produced */
	str_len = tipc_printbuf_validate(&pb);
	skb_put(reply, TLV_SPACE(str_len));
	TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);

	return reply;
}

#endif
766 
tipc_port_reinit(void)767 void tipc_port_reinit(void)
768 {
769 	struct port *p_ptr;
770 	struct tipc_msg *msg;
771 
772 	spin_lock_bh(&tipc_port_list_lock);
773 	list_for_each_entry(p_ptr, &ports, port_list) {
774 		msg = &p_ptr->publ.phdr;
775 		if (msg_orignode(msg) == tipc_own_addr)
776 			break;
777 		msg_set_prevnode(msg, tipc_own_addr);
778 		msg_set_orignode(msg, tipc_own_addr);
779 	}
780 	spin_unlock_bh(&tipc_port_list_lock);
781 }
782 
783 
784 /*
785  *  port_dispatcher_sigh(): Signal handler for messages destinated
786  *                          to the tipc_port interface.
787  */
788 
port_dispatcher_sigh(void * dummy)789 static void port_dispatcher_sigh(void *dummy)
790 {
791 	struct sk_buff *buf;
792 
793 	spin_lock_bh(&queue_lock);
794 	buf = msg_queue_head;
795 	msg_queue_head = NULL;
796 	spin_unlock_bh(&queue_lock);
797 
798 	while (buf) {
799 		struct port *p_ptr;
800 		struct user_port *up_ptr;
801 		struct tipc_portid orig;
802 		struct tipc_name_seq dseq;
803 		void *usr_handle;
804 		int connected;
805 		int published;
806 		u32 message_type;
807 
808 		struct sk_buff *next = buf->next;
809 		struct tipc_msg *msg = buf_msg(buf);
810 		u32 dref = msg_destport(msg);
811 
812 		message_type = msg_type(msg);
813 		if (message_type > TIPC_DIRECT_MSG)
814 			goto reject;	/* Unsupported message type */
815 
816 		p_ptr = tipc_port_lock(dref);
817 		if (!p_ptr)
818 			goto reject;	/* Port deleted while msg in queue */
819 
820 		orig.ref = msg_origport(msg);
821 		orig.node = msg_orignode(msg);
822 		up_ptr = p_ptr->user_port;
823 		usr_handle = up_ptr->usr_handle;
824 		connected = p_ptr->publ.connected;
825 		published = p_ptr->publ.published;
826 
827 		if (unlikely(msg_errcode(msg)))
828 			goto err;
829 
830 		switch (message_type) {
831 
832 		case TIPC_CONN_MSG:{
833 				tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
834 				u32 peer_port = port_peerport(p_ptr);
835 				u32 peer_node = port_peernode(p_ptr);
836 
837 				tipc_port_unlock(p_ptr);
838 				if (unlikely(!cb))
839 					goto reject;
840 				if (unlikely(!connected)) {
841 					if (tipc_connect2port(dref, &orig))
842 						goto reject;
843 				} else if ((msg_origport(msg) != peer_port) ||
844 					   (msg_orignode(msg) != peer_node))
845 					goto reject;
846 				if (unlikely(++p_ptr->publ.conn_unacked >=
847 					     TIPC_FLOW_CONTROL_WIN))
848 					tipc_acknowledge(dref,
849 							 p_ptr->publ.conn_unacked);
850 				skb_pull(buf, msg_hdr_sz(msg));
851 				cb(usr_handle, dref, &buf, msg_data(msg),
852 				   msg_data_sz(msg));
853 				break;
854 			}
855 		case TIPC_DIRECT_MSG:{
856 				tipc_msg_event cb = up_ptr->msg_cb;
857 
858 				tipc_port_unlock(p_ptr);
859 				if (unlikely(!cb || connected))
860 					goto reject;
861 				skb_pull(buf, msg_hdr_sz(msg));
862 				cb(usr_handle, dref, &buf, msg_data(msg),
863 				   msg_data_sz(msg), msg_importance(msg),
864 				   &orig);
865 				break;
866 			}
867 		case TIPC_MCAST_MSG:
868 		case TIPC_NAMED_MSG:{
869 				tipc_named_msg_event cb = up_ptr->named_msg_cb;
870 
871 				tipc_port_unlock(p_ptr);
872 				if (unlikely(!cb || connected || !published))
873 					goto reject;
874 				dseq.type =  msg_nametype(msg);
875 				dseq.lower = msg_nameinst(msg);
876 				dseq.upper = (message_type == TIPC_NAMED_MSG)
877 					? dseq.lower : msg_nameupper(msg);
878 				skb_pull(buf, msg_hdr_sz(msg));
879 				cb(usr_handle, dref, &buf, msg_data(msg),
880 				   msg_data_sz(msg), msg_importance(msg),
881 				   &orig, &dseq);
882 				break;
883 			}
884 		}
885 		if (buf)
886 			buf_discard(buf);
887 		buf = next;
888 		continue;
889 err:
890 		switch (message_type) {
891 
892 		case TIPC_CONN_MSG:{
893 				tipc_conn_shutdown_event cb =
894 					up_ptr->conn_err_cb;
895 				u32 peer_port = port_peerport(p_ptr);
896 				u32 peer_node = port_peernode(p_ptr);
897 
898 				tipc_port_unlock(p_ptr);
899 				if (!cb || !connected)
900 					break;
901 				if ((msg_origport(msg) != peer_port) ||
902 				    (msg_orignode(msg) != peer_node))
903 					break;
904 				tipc_disconnect(dref);
905 				skb_pull(buf, msg_hdr_sz(msg));
906 				cb(usr_handle, dref, &buf, msg_data(msg),
907 				   msg_data_sz(msg), msg_errcode(msg));
908 				break;
909 			}
910 		case TIPC_DIRECT_MSG:{
911 				tipc_msg_err_event cb = up_ptr->err_cb;
912 
913 				tipc_port_unlock(p_ptr);
914 				if (!cb || connected)
915 					break;
916 				skb_pull(buf, msg_hdr_sz(msg));
917 				cb(usr_handle, dref, &buf, msg_data(msg),
918 				   msg_data_sz(msg), msg_errcode(msg), &orig);
919 				break;
920 			}
921 		case TIPC_MCAST_MSG:
922 		case TIPC_NAMED_MSG:{
923 				tipc_named_msg_err_event cb =
924 					up_ptr->named_err_cb;
925 
926 				tipc_port_unlock(p_ptr);
927 				if (!cb || connected)
928 					break;
929 				dseq.type =  msg_nametype(msg);
930 				dseq.lower = msg_nameinst(msg);
931 				dseq.upper = (message_type == TIPC_NAMED_MSG)
932 					? dseq.lower : msg_nameupper(msg);
933 				skb_pull(buf, msg_hdr_sz(msg));
934 				cb(usr_handle, dref, &buf, msg_data(msg),
935 				   msg_data_sz(msg), msg_errcode(msg), &dseq);
936 				break;
937 			}
938 		}
939 		if (buf)
940 			buf_discard(buf);
941 		buf = next;
942 		continue;
943 reject:
944 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
945 		buf = next;
946 	}
947 }
948 
949 /*
950  *  port_dispatcher(): Dispatcher for messages destinated
951  *  to the tipc_port interface. Called with port locked.
952  */
953 
port_dispatcher(struct tipc_port * dummy,struct sk_buff * buf)954 static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
955 {
956 	buf->next = NULL;
957 	spin_lock_bh(&queue_lock);
958 	if (msg_queue_head) {
959 		msg_queue_tail->next = buf;
960 		msg_queue_tail = buf;
961 	} else {
962 		msg_queue_tail = msg_queue_head = buf;
963 		tipc_k_signal((Handler)port_dispatcher_sigh, 0);
964 	}
965 	spin_unlock_bh(&queue_lock);
966 	return 0;
967 }
968 
969 /*
970  * Wake up port after congestion: Called with port locked,
971  *
972  */
973 
port_wakeup_sh(unsigned long ref)974 static void port_wakeup_sh(unsigned long ref)
975 {
976 	struct port *p_ptr;
977 	struct user_port *up_ptr;
978 	tipc_continue_event cb = NULL;
979 	void *uh = NULL;
980 
981 	p_ptr = tipc_port_lock(ref);
982 	if (p_ptr) {
983 		up_ptr = p_ptr->user_port;
984 		if (up_ptr) {
985 			cb = up_ptr->continue_event_cb;
986 			uh = up_ptr->usr_handle;
987 		}
988 		tipc_port_unlock(p_ptr);
989 	}
990 	if (cb)
991 		cb(uh, ref);
992 }
993 
994 
port_wakeup(struct tipc_port * p_ptr)995 static void port_wakeup(struct tipc_port *p_ptr)
996 {
997 	tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
998 }
999 
/**
 * tipc_acknowledge - acknowledge 'ack' received messages to the peer
 *
 * If port 'ref' is connected, its count of unacknowledged messages is
 * reduced by 'ack' and a CONN_ACK protocol message carrying 'ack' is
 * routed to the peer after the port is unlocked. Does nothing if 'ref'
 * cannot be locked.
 */

void tipc_acknowledge(u32 ref, u32 ack)
{
	struct sk_buff *ack_msg = NULL;
	struct port *port;

	port = tipc_port_lock(ref);
	if (port == NULL)
		return;

	if (port->publ.connected) {
		port->publ.conn_unacked -= ack;
		ack_msg = port_build_proto_msg(port_peerport(port),
					       port_peernode(port),
					       ref,
					       tipc_own_addr,
					       CONN_MANAGER,
					       CONN_ACK,
					       TIPC_OK,
					       port_out_seqno(port),
					       ack);
	}
	tipc_port_unlock(port);

	/* ack_msg stays NULL when the port is not connected */
	tipc_net_route_msg(ack_msg);
}
1023 
1024 /*
1025  * tipc_createport(): user level call. Will add port to
1026  *                    registry if non-zero user_ref.
1027  */
1028 
tipc_createport(u32 user_ref,void * usr_handle,unsigned int importance,tipc_msg_err_event error_cb,tipc_named_msg_err_event named_error_cb,tipc_conn_shutdown_event conn_error_cb,tipc_msg_event msg_cb,tipc_named_msg_event named_msg_cb,tipc_conn_msg_event conn_msg_cb,tipc_continue_event continue_event_cb,u32 * portref)1029 int tipc_createport(u32 user_ref,
1030 		    void *usr_handle,
1031 		    unsigned int importance,
1032 		    tipc_msg_err_event error_cb,
1033 		    tipc_named_msg_err_event named_error_cb,
1034 		    tipc_conn_shutdown_event conn_error_cb,
1035 		    tipc_msg_event msg_cb,
1036 		    tipc_named_msg_event named_msg_cb,
1037 		    tipc_conn_msg_event conn_msg_cb,
1038 		    tipc_continue_event continue_event_cb,/* May be zero */
1039 		    u32 *portref)
1040 {
1041 	struct user_port *up_ptr;
1042 	struct port *p_ptr;
1043 
1044 	up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1045 	if (!up_ptr) {
1046 		warn("Port creation failed, no memory\n");
1047 		return -ENOMEM;
1048 	}
1049 	p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher,
1050 						   port_wakeup, importance);
1051 	if (!p_ptr) {
1052 		kfree(up_ptr);
1053 		return -ENOMEM;
1054 	}
1055 
1056 	p_ptr->user_port = up_ptr;
1057 	up_ptr->user_ref = user_ref;
1058 	up_ptr->usr_handle = usr_handle;
1059 	up_ptr->ref = p_ptr->publ.ref;
1060 	up_ptr->err_cb = error_cb;
1061 	up_ptr->named_err_cb = named_error_cb;
1062 	up_ptr->conn_err_cb = conn_error_cb;
1063 	up_ptr->msg_cb = msg_cb;
1064 	up_ptr->named_msg_cb = named_msg_cb;
1065 	up_ptr->conn_msg_cb = conn_msg_cb;
1066 	up_ptr->continue_event_cb = continue_event_cb;
1067 	INIT_LIST_HEAD(&up_ptr->uport_list);
1068 	tipc_reg_add_port(up_ptr);
1069 	*portref = p_ptr->publ.ref;
1070 	tipc_port_unlock(p_ptr);
1071 	return 0;
1072 }
1073 
/*
 * tipc_ownidentity(): fill '*id' with 'ref' and this node's address
 *                     (tipc_own_addr). 'ref' is not validated.
 *                     Always returns 0.
 */
int tipc_ownidentity(u32 ref, struct tipc_portid *id)
{
	id->node = tipc_own_addr;
	id->ref = ref;
	return 0;
}
1080 
/*
 * tipc_portimportance(): read the importance stored in the port's message
 *                        header template into '*importance'.
 *
 * Returns 0 on success, -EINVAL if tipc_port_lock() finds no port for 'ref'.
 */
int tipc_portimportance(u32 ref, unsigned int *importance)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	*importance = (unsigned int)msg_importance(&port->publ.phdr);
	tipc_port_unlock(port);
	return 0;
}
1092 
/*
 * tipc_set_portimportance(): store 'imp' as the importance in the port's
 *                            message header template.
 *
 * Returns 0 on success, -EINVAL if 'imp' exceeds TIPC_CRITICAL_IMPORTANCE
 * or tipc_port_lock() finds no port for 'ref'. The range check is made
 * before the port is looked up.
 */
int tipc_set_portimportance(u32 ref, unsigned int imp)
{
	struct port *port;

	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;

	port = tipc_port_lock(ref);
	if (port == NULL)
		return -EINVAL;

	msg_set_importance(&port->publ.phdr, (u32)imp);
	tipc_port_unlock(port);
	return 0;
}
1107 
1108 
/*
 * tipc_publish(): publish the name sequence '*seq' with the given 'scope'
 *                 for port 'ref'.
 *
 * Returns 0 on success. Returns -EINVAL if the port cannot be locked, the
 * port is connected, seq->lower > seq->upper, 'scope' is outside
 * [TIPC_ZONE_SCOPE, TIPC_NODE_SCOPE], or tipc_nametbl_publish() returns
 * NULL. Returns -EADDRINUSE if the computed publication key equals 'ref'.
 */
int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
{
	struct port *p_ptr;
	struct publication *publ;
	u32 key;
	int res = -EINVAL;

	p_ptr = tipc_port_lock(ref);
	if (!p_ptr)
		return -EINVAL;

	/* p_ptr is a pointer, so it is printed with %p rather than %x */
	dbg("tipc_publ %u, p_ptr = %p, conn = %x, scope = %x, "
	    "lower = %u, upper = %u\n",
	    ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
	if (p_ptr->publ.connected)
		goto exit;
	if (seq->lower > seq->upper)
		goto exit;
	if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
		goto exit;

	/*
	 * The key is derived from the port reference and the number of
	 * publications made so far; a key equal to 'ref' (pub_count + 1
	 * contributing nothing) is refused.
	 */
	key = ref + p_ptr->pub_count + 1;
	if (key == ref) {
		res = -EADDRINUSE;
		goto exit;
	}
	publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
				    scope, p_ptr->publ.ref, key);
	if (publ) {
		/* remember the publication on the port */
		list_add(&publ->pport_list, &p_ptr->publications);
		p_ptr->pub_count++;
		p_ptr->publ.published = 1;
		res = 0;
	}
exit:
	tipc_port_unlock(p_ptr);
	return res;
}
1146 
/*
 * tipc_withdraw(): withdraw publications made by port 'ref'.
 *
 * With a NULL 'seq' every entry on the port's publication list is passed to
 * tipc_nametbl_withdraw() and 0 is returned. Otherwise the list is searched
 * for the first entry whose scope, type and lower bound match; if its upper
 * bound matches as well it is withdrawn and 0 is returned, and in either
 * case the search stops there.
 *
 * Returns -EINVAL if the port cannot be locked or nothing was withdrawn for
 * a non-NULL 'seq'. The port's 'published' flag is cleared once its
 * publication list is empty.
 */
int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
{
	struct port *port;
	struct publication *cur;
	struct publication *next;
	int rc = -EINVAL;

	port = tipc_port_lock(ref);
	if (port == NULL)
		return -EINVAL;

	if (seq == NULL) {
		list_for_each_entry_safe(cur, next,
					 &port->publications, pport_list) {
			tipc_nametbl_withdraw(cur->type, cur->lower,
					      cur->ref, cur->key);
		}
		rc = 0;
	} else {
		list_for_each_entry_safe(cur, next,
					 &port->publications, pport_list) {
			if (cur->scope != scope ||
			    cur->type != seq->type ||
			    cur->lower != seq->lower)
				continue;

			/* first scope/type/lower match ends the search */
			if (cur->upper == seq->upper) {
				tipc_nametbl_withdraw(cur->type, cur->lower,
						      cur->ref, cur->key);
				rc = 0;
			}
			break;
		}
	}

	if (list_empty(&port->publications))
		port->publ.published = 0;
	tipc_port_unlock(port);
	return rc;
}
1186 
/*
 * tipc_connect2port(): connect port 'ref' to the port identified by '*peer'.
 *
 * The port's header template is rewritten to address the peer, the port is
 * marked connected, its timer is started with PROBING_INTERVAL and a node
 * subscription for peer->node is registered with port_handle_node_down as
 * the handler.
 *
 * Returns 0 on success, -EINVAL if the port cannot be locked, is already
 * published or connected, or peer->ref is zero.
 */
int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
{
	struct port *port;
	struct tipc_msg *hdr;
	int rc = -EINVAL;

	port = tipc_port_lock(ref);
	if (port == NULL)
		return -EINVAL;

	if (!port->publ.published && !port->publ.connected && peer->ref) {
		hdr = &port->publ.phdr;
		msg_set_destnode(hdr, peer->node);
		msg_set_destport(hdr, peer->ref);
		msg_set_orignode(hdr, tipc_own_addr);
		msg_set_origport(hdr, port->publ.ref);
		msg_set_transp_seqno(hdr, 42);
		msg_set_type(hdr, TIPC_CONN_MSG);

		/* header size depends on whether the peer node is routable */
		if (may_route(peer->node))
			msg_set_hdr_sz(hdr, LONG_H_SIZE);
		else
			msg_set_hdr_sz(hdr, SHORT_H_SIZE);

		port->probing_interval = PROBING_INTERVAL;
		port->probing_state = CONFIRMED;
		port->publ.connected = 1;
		k_start_timer(&port->timer, port->probing_interval);

		tipc_nodesub_subscribe(&port->subscription, peer->node,
				       (void *)(unsigned long)ref,
				       (net_ev_handler)port_handle_node_down);
		rc = 0;
	}

	tipc_port_unlock(port);

	/*
	 * NOTE(review): max_pkt is written after the port has been released,
	 * and on the failure path too. Presumably this is deliberate to avoid
	 * calling tipc_link_get_max_pkt() with the port held - confirm before
	 * moving it.
	 */
	port->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref);
	return rc;
}
1227 
1228 /**
1229  * tipc_disconnect_port - disconnect port from peer
1230  *
1231  * Port must be locked.
1232  */
1233 
tipc_disconnect_port(struct tipc_port * tp_ptr)1234 int tipc_disconnect_port(struct tipc_port *tp_ptr)
1235 {
1236 	int res;
1237 
1238 	if (tp_ptr->connected) {
1239 		tp_ptr->connected = 0;
1240 		/* let timer expire on it's own to avoid deadlock! */
1241 		tipc_nodesub_unsubscribe(
1242 			&((struct port *)tp_ptr)->subscription);
1243 		res = 0;
1244 	} else {
1245 		res = -ENOTCONN;
1246 	}
1247 	return res;
1248 }
1249 
1250 /*
1251  * tipc_disconnect(): Disconnect port form peer.
1252  *                    This is a node local operation.
1253  */
1254 
tipc_disconnect(u32 ref)1255 int tipc_disconnect(u32 ref)
1256 {
1257 	struct port *p_ptr;
1258 	int res;
1259 
1260 	p_ptr = tipc_port_lock(ref);
1261 	if (!p_ptr)
1262 		return -EINVAL;
1263 	res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1264 	tipc_port_unlock(p_ptr);
1265 	return res;
1266 }
1267 
1268 /*
1269  * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1270  */
tipc_shutdown(u32 ref)1271 int tipc_shutdown(u32 ref)
1272 {
1273 	struct port *p_ptr;
1274 	struct sk_buff *buf = NULL;
1275 
1276 	p_ptr = tipc_port_lock(ref);
1277 	if (!p_ptr)
1278 		return -EINVAL;
1279 
1280 	if (p_ptr->publ.connected) {
1281 		u32 imp = msg_importance(&p_ptr->publ.phdr);
1282 		if (imp < TIPC_CRITICAL_IMPORTANCE)
1283 			imp++;
1284 		buf = port_build_proto_msg(port_peerport(p_ptr),
1285 					   port_peernode(p_ptr),
1286 					   ref,
1287 					   tipc_own_addr,
1288 					   imp,
1289 					   TIPC_CONN_MSG,
1290 					   TIPC_CONN_SHUTDOWN,
1291 					   port_out_seqno(p_ptr),
1292 					   0);
1293 	}
1294 	tipc_port_unlock(p_ptr);
1295 	tipc_net_route_msg(buf);
1296 	return tipc_disconnect(ref);
1297 }
1298 
/*
 * tipc_isconnected(): copy the port's connected flag into '*isconnected'.
 *
 * Returns 0 on success, -EINVAL if tipc_port_lock() finds no port for 'ref'.
 */
int tipc_isconnected(u32 ref, int *isconnected)
{
	struct port *port = tipc_port_lock(ref);

	if (port == NULL)
		return -EINVAL;

	*isconnected = port->publ.connected;
	tipc_port_unlock(port);
	return 0;
}
1310 
/*
 * tipc_peer(): report the peer port and peer node of a connected port
 *              in '*peer'.
 *
 * Returns 0 on success, -EINVAL if the port cannot be locked, -ENOTCONN if
 * the port is not connected ('*peer' is left untouched in that case).
 */
int tipc_peer(u32 ref, struct tipc_portid *peer)
{
	struct port *port = tipc_port_lock(ref);
	int rc = -ENOTCONN;

	if (port == NULL)
		return -EINVAL;

	if (port->publ.connected) {
		peer->ref = port_peerport(port);
		peer->node = port_peernode(port);
		rc = 0;
	}
	tipc_port_unlock(port);
	return rc;
}
1328 
/*
 * tipc_ref_valid(): returns 1 if tipc_ref_deref() resolves 'ref' to a
 *                   non-NULL object, 0 otherwise.
 */
int tipc_ref_valid(u32 ref)
{
	/* Works irrespective of type */
	return tipc_ref_deref(ref) != NULL;
}
1334 
1335 
1336 /*
1337  *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1338  *                        message for this node.
1339  */
1340 
tipc_port_recv_sections(struct port * sender,unsigned int num_sect,struct iovec const * msg_sect)1341 int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
1342 		       struct iovec const *msg_sect)
1343 {
1344 	struct sk_buff *buf;
1345 	int res;
1346 
1347 	res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1348 			MAX_MSG_SIZE, !sender->user_port, &buf);
1349 	if (likely(buf))
1350 		tipc_port_recv_msg(buf);
1351 	return res;
1352 }
1353 
1354 /**
1355  * tipc_send - send message sections on connection
1356  */
1357 
tipc_send(u32 ref,unsigned int num_sect,struct iovec const * msg_sect)1358 int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1359 {
1360 	struct port *p_ptr;
1361 	u32 destnode;
1362 	int res;
1363 
1364 	p_ptr = tipc_port_deref(ref);
1365 	if (!p_ptr || !p_ptr->publ.connected)
1366 		return -EINVAL;
1367 
1368 	p_ptr->publ.congested = 1;
1369 	if (!tipc_port_congested(p_ptr)) {
1370 		destnode = port_peernode(p_ptr);
1371 		if (likely(destnode != tipc_own_addr))
1372 			res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1373 							   destnode);
1374 		else
1375 			res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1376 
1377 		if (likely(res != -ELINKCONG)) {
1378 			port_incr_out_seqno(p_ptr);
1379 			p_ptr->publ.congested = 0;
1380 			p_ptr->sent++;
1381 			return res;
1382 		}
1383 	}
1384 	if (port_unreliable(p_ptr)) {
1385 		p_ptr->publ.congested = 0;
1386 		/* Just calculate msg length and return */
1387 		return msg_calc_data_size(msg_sect, num_sect);
1388 	}
1389 	return -ELINKCONG;
1390 }
1391 
1392 /**
1393  * tipc_send_buf - send message buffer on connection
1394  */
1395 
tipc_send_buf(u32 ref,struct sk_buff * buf,unsigned int dsz)1396 int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1397 {
1398 	struct port *p_ptr;
1399 	struct tipc_msg *msg;
1400 	u32 destnode;
1401 	u32 hsz;
1402 	u32 sz;
1403 	u32 res;
1404 
1405 	p_ptr = tipc_port_deref(ref);
1406 	if (!p_ptr || !p_ptr->publ.connected)
1407 		return -EINVAL;
1408 
1409 	msg = &p_ptr->publ.phdr;
1410 	hsz = msg_hdr_sz(msg);
1411 	sz = hsz + dsz;
1412 	msg_set_size(msg, sz);
1413 	if (skb_cow(buf, hsz))
1414 		return -ENOMEM;
1415 
1416 	skb_push(buf, hsz);
1417 	skb_copy_to_linear_data(buf, msg, hsz);
1418 	destnode = msg_destnode(msg);
1419 	p_ptr->publ.congested = 1;
1420 	if (!tipc_port_congested(p_ptr)) {
1421 		if (likely(destnode != tipc_own_addr))
1422 			res = tipc_send_buf_fast(buf, destnode);
1423 		else {
1424 			tipc_port_recv_msg(buf);
1425 			res = sz;
1426 		}
1427 		if (likely(res != -ELINKCONG)) {
1428 			port_incr_out_seqno(p_ptr);
1429 			p_ptr->sent++;
1430 			p_ptr->publ.congested = 0;
1431 			return res;
1432 		}
1433 	}
1434 	if (port_unreliable(p_ptr)) {
1435 		p_ptr->publ.congested = 0;
1436 		return dsz;
1437 	}
1438 	return -ELINKCONG;
1439 }
1440 
1441 /**
1442  * tipc_forward2name - forward message sections to port name
1443  */
1444 
tipc_forward2name(u32 ref,struct tipc_name const * name,u32 domain,u32 num_sect,struct iovec const * msg_sect,struct tipc_portid const * orig,unsigned int importance)1445 int tipc_forward2name(u32 ref,
1446 		      struct tipc_name const *name,
1447 		      u32 domain,
1448 		      u32 num_sect,
1449 		      struct iovec const *msg_sect,
1450 		      struct tipc_portid const *orig,
1451 		      unsigned int importance)
1452 {
1453 	struct port *p_ptr;
1454 	struct tipc_msg *msg;
1455 	u32 destnode = domain;
1456 	u32 destport = 0;
1457 	int res;
1458 
1459 	p_ptr = tipc_port_deref(ref);
1460 	if (!p_ptr || p_ptr->publ.connected)
1461 		return -EINVAL;
1462 
1463 	msg = &p_ptr->publ.phdr;
1464 	msg_set_type(msg, TIPC_NAMED_MSG);
1465 	msg_set_orignode(msg, orig->node);
1466 	msg_set_origport(msg, orig->ref);
1467 	msg_set_hdr_sz(msg, LONG_H_SIZE);
1468 	msg_set_nametype(msg, name->type);
1469 	msg_set_nameinst(msg, name->instance);
1470 	msg_set_lookup_scope(msg, addr_scope(domain));
1471 	if (importance <= TIPC_CRITICAL_IMPORTANCE)
1472 		msg_set_importance(msg,importance);
1473 	destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1474 	msg_set_destnode(msg, destnode);
1475 	msg_set_destport(msg, destport);
1476 
1477 	if (likely(destport || destnode)) {
1478 		p_ptr->sent++;
1479 		if (likely(destnode == tipc_own_addr))
1480 			return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1481 		res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1482 						   destnode);
1483 		if (likely(res != -ELINKCONG))
1484 			return res;
1485 		if (port_unreliable(p_ptr)) {
1486 			/* Just calculate msg length and return */
1487 			return msg_calc_data_size(msg_sect, num_sect);
1488 		}
1489 		return -ELINKCONG;
1490 	}
1491 	return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1492 					 TIPC_ERR_NO_NAME);
1493 }
1494 
1495 /**
1496  * tipc_send2name - send message sections to port name
1497  */
1498 
tipc_send2name(u32 ref,struct tipc_name const * name,unsigned int domain,unsigned int num_sect,struct iovec const * msg_sect)1499 int tipc_send2name(u32 ref,
1500 		   struct tipc_name const *name,
1501 		   unsigned int domain,
1502 		   unsigned int num_sect,
1503 		   struct iovec const *msg_sect)
1504 {
1505 	struct tipc_portid orig;
1506 
1507 	orig.ref = ref;
1508 	orig.node = tipc_own_addr;
1509 	return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1510 				 TIPC_PORT_IMPORTANCE);
1511 }
1512 
1513 /**
1514  * tipc_forward_buf2name - forward message buffer to port name
1515  */
1516 
tipc_forward_buf2name(u32 ref,struct tipc_name const * name,u32 domain,struct sk_buff * buf,unsigned int dsz,struct tipc_portid const * orig,unsigned int importance)1517 int tipc_forward_buf2name(u32 ref,
1518 			  struct tipc_name const *name,
1519 			  u32 domain,
1520 			  struct sk_buff *buf,
1521 			  unsigned int dsz,
1522 			  struct tipc_portid const *orig,
1523 			  unsigned int importance)
1524 {
1525 	struct port *p_ptr;
1526 	struct tipc_msg *msg;
1527 	u32 destnode = domain;
1528 	u32 destport = 0;
1529 	int res;
1530 
1531 	p_ptr = (struct port *)tipc_ref_deref(ref);
1532 	if (!p_ptr || p_ptr->publ.connected)
1533 		return -EINVAL;
1534 
1535 	msg = &p_ptr->publ.phdr;
1536 	if (importance <= TIPC_CRITICAL_IMPORTANCE)
1537 		msg_set_importance(msg, importance);
1538 	msg_set_type(msg, TIPC_NAMED_MSG);
1539 	msg_set_orignode(msg, orig->node);
1540 	msg_set_origport(msg, orig->ref);
1541 	msg_set_nametype(msg, name->type);
1542 	msg_set_nameinst(msg, name->instance);
1543 	msg_set_lookup_scope(msg, addr_scope(domain));
1544 	msg_set_hdr_sz(msg, LONG_H_SIZE);
1545 	msg_set_size(msg, LONG_H_SIZE + dsz);
1546 	destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1547 	msg_set_destnode(msg, destnode);
1548 	msg_set_destport(msg, destport);
1549 	msg_dbg(msg, "forw2name ==> ");
1550 	if (skb_cow(buf, LONG_H_SIZE))
1551 		return -ENOMEM;
1552 	skb_push(buf, LONG_H_SIZE);
1553 	skb_copy_to_linear_data(buf, msg, LONG_H_SIZE);
1554 	msg_dbg(buf_msg(buf),"PREP:");
1555 	if (likely(destport || destnode)) {
1556 		p_ptr->sent++;
1557 		if (destnode == tipc_own_addr)
1558 			return tipc_port_recv_msg(buf);
1559 		res = tipc_send_buf_fast(buf, destnode);
1560 		if (likely(res != -ELINKCONG))
1561 			return res;
1562 		if (port_unreliable(p_ptr))
1563 			return dsz;
1564 		return -ELINKCONG;
1565 	}
1566 	return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1567 }
1568 
1569 /**
1570  * tipc_send_buf2name - send message buffer to port name
1571  */
1572 
tipc_send_buf2name(u32 ref,struct tipc_name const * dest,u32 domain,struct sk_buff * buf,unsigned int dsz)1573 int tipc_send_buf2name(u32 ref,
1574 		       struct tipc_name const *dest,
1575 		       u32 domain,
1576 		       struct sk_buff *buf,
1577 		       unsigned int dsz)
1578 {
1579 	struct tipc_portid orig;
1580 
1581 	orig.ref = ref;
1582 	orig.node = tipc_own_addr;
1583 	return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1584 				     TIPC_PORT_IMPORTANCE);
1585 }
1586 
1587 /**
1588  * tipc_forward2port - forward message sections to port identity
1589  */
1590 
tipc_forward2port(u32 ref,struct tipc_portid const * dest,unsigned int num_sect,struct iovec const * msg_sect,struct tipc_portid const * orig,unsigned int importance)1591 int tipc_forward2port(u32 ref,
1592 		      struct tipc_portid const *dest,
1593 		      unsigned int num_sect,
1594 		      struct iovec const *msg_sect,
1595 		      struct tipc_portid const *orig,
1596 		      unsigned int importance)
1597 {
1598 	struct port *p_ptr;
1599 	struct tipc_msg *msg;
1600 	int res;
1601 
1602 	p_ptr = tipc_port_deref(ref);
1603 	if (!p_ptr || p_ptr->publ.connected)
1604 		return -EINVAL;
1605 
1606 	msg = &p_ptr->publ.phdr;
1607 	msg_set_type(msg, TIPC_DIRECT_MSG);
1608 	msg_set_orignode(msg, orig->node);
1609 	msg_set_origport(msg, orig->ref);
1610 	msg_set_destnode(msg, dest->node);
1611 	msg_set_destport(msg, dest->ref);
1612 	msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1613 	if (importance <= TIPC_CRITICAL_IMPORTANCE)
1614 		msg_set_importance(msg, importance);
1615 	p_ptr->sent++;
1616 	if (dest->node == tipc_own_addr)
1617 		return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1618 	res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1619 	if (likely(res != -ELINKCONG))
1620 		return res;
1621 	if (port_unreliable(p_ptr)) {
1622 		/* Just calculate msg length and return */
1623 		return msg_calc_data_size(msg_sect, num_sect);
1624 	}
1625 	return -ELINKCONG;
1626 }
1627 
1628 /**
1629  * tipc_send2port - send message sections to port identity
1630  */
1631 
tipc_send2port(u32 ref,struct tipc_portid const * dest,unsigned int num_sect,struct iovec const * msg_sect)1632 int tipc_send2port(u32 ref,
1633 		   struct tipc_portid const *dest,
1634 		   unsigned int num_sect,
1635 		   struct iovec const *msg_sect)
1636 {
1637 	struct tipc_portid orig;
1638 
1639 	orig.ref = ref;
1640 	orig.node = tipc_own_addr;
1641 	return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig,
1642 				 TIPC_PORT_IMPORTANCE);
1643 }
1644 
1645 /**
1646  * tipc_forward_buf2port - forward message buffer to port identity
1647  */
tipc_forward_buf2port(u32 ref,struct tipc_portid const * dest,struct sk_buff * buf,unsigned int dsz,struct tipc_portid const * orig,unsigned int importance)1648 int tipc_forward_buf2port(u32 ref,
1649 			  struct tipc_portid const *dest,
1650 			  struct sk_buff *buf,
1651 			  unsigned int dsz,
1652 			  struct tipc_portid const *orig,
1653 			  unsigned int importance)
1654 {
1655 	struct port *p_ptr;
1656 	struct tipc_msg *msg;
1657 	int res;
1658 
1659 	p_ptr = (struct port *)tipc_ref_deref(ref);
1660 	if (!p_ptr || p_ptr->publ.connected)
1661 		return -EINVAL;
1662 
1663 	msg = &p_ptr->publ.phdr;
1664 	msg_set_type(msg, TIPC_DIRECT_MSG);
1665 	msg_set_orignode(msg, orig->node);
1666 	msg_set_origport(msg, orig->ref);
1667 	msg_set_destnode(msg, dest->node);
1668 	msg_set_destport(msg, dest->ref);
1669 	msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1670 	if (importance <= TIPC_CRITICAL_IMPORTANCE)
1671 		msg_set_importance(msg, importance);
1672 	msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1673 	if (skb_cow(buf, DIR_MSG_H_SIZE))
1674 		return -ENOMEM;
1675 
1676 	skb_push(buf, DIR_MSG_H_SIZE);
1677 	skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1678 	msg_dbg(msg, "buf2port: ");
1679 	p_ptr->sent++;
1680 	if (dest->node == tipc_own_addr)
1681 		return tipc_port_recv_msg(buf);
1682 	res = tipc_send_buf_fast(buf, dest->node);
1683 	if (likely(res != -ELINKCONG))
1684 		return res;
1685 	if (port_unreliable(p_ptr))
1686 		return dsz;
1687 	return -ELINKCONG;
1688 }
1689 
1690 /**
1691  * tipc_send_buf2port - send message buffer to port identity
1692  */
1693 
tipc_send_buf2port(u32 ref,struct tipc_portid const * dest,struct sk_buff * buf,unsigned int dsz)1694 int tipc_send_buf2port(u32 ref,
1695 		       struct tipc_portid const *dest,
1696 		       struct sk_buff *buf,
1697 		       unsigned int dsz)
1698 {
1699 	struct tipc_portid orig;
1700 
1701 	orig.ref = ref;
1702 	orig.node = tipc_own_addr;
1703 	return tipc_forward_buf2port(ref, dest, buf, dsz, &orig,
1704 				     TIPC_PORT_IMPORTANCE);
1705 }
1706 
1707