Lines Matching +full:multi +full:- +full:socket
1 // SPDX-License-Identifier: GPL-2.0-only
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
15 * This is the "low-level" comms layer.
21 * simply 32 bit numbers to the locking module - if they need to
25 * whatever it needs for inter-node communication.
29 * up to the mid-level comms layer (which understands the
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
69 struct socket *sock; /* NULL if not connected */
98 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
150 return nodeid & (CONN_HASH_SIZE-1); in nodeid_hash()
162 if (con->nodeid == nodeid) { in __find_con()
189 con->rx_buflen = dlm_config.ci_buffer_size; in nodeid2con()
190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); in nodeid2con()
191 if (!con->rx_buf) { in nodeid2con()
196 con->nodeid = nodeid; in nodeid2con()
197 mutex_init(&con->sock_mutex); in nodeid2con()
198 INIT_LIST_HEAD(&con->writequeue); in nodeid2con()
199 spin_lock_init(&con->writequeue_lock); in nodeid2con()
200 INIT_WORK(&con->swork, process_send_sockets); in nodeid2con()
201 INIT_WORK(&con->rwork, process_recv_sockets); in nodeid2con()
202 init_waitqueue_head(&con->shutdown_wait); in nodeid2con()
205 if (con->nodeid) { in nodeid2con()
208 con->connect_action = zerocon->connect_action; in nodeid2con()
209 if (!con->rx_action) in nodeid2con()
210 con->rx_action = zerocon->rx_action; in nodeid2con()
225 kfree(con->rx_buf); in nodeid2con()
230 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
255 if (na->nodeid == nodeid) in find_node_addr()
263 switch (x->ss_family) { in addr_compare()
267 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) in addr_compare()
269 if (sinx->sin_port != siny->sin_port) in addr_compare()
276 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) in addr_compare()
278 if (sinx->sin6_port != siny->sin6_port) in addr_compare()
295 return -1; in nodeid_to_addr()
299 if (na && na->addr_count) { in nodeid_to_addr()
300 memcpy(&sas, na->addr[na->curr_addr_index], in nodeid_to_addr()
304 na->curr_addr_index++; in nodeid_to_addr()
305 if (na->curr_addr_index == na->addr_count) in nodeid_to_addr()
306 na->curr_addr_index = 0; in nodeid_to_addr()
312 return -EEXIST; in nodeid_to_addr()
314 if (!na->addr_count) in nodeid_to_addr()
315 return -ENOENT; in nodeid_to_addr()
323 if (dlm_local_addr[0]->ss_family == AF_INET) { in nodeid_to_addr()
326 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; in nodeid_to_addr()
330 ret6->sin6_addr = in6->sin6_addr; in nodeid_to_addr()
339 int rv = -EEXIST; in addr_to_nodeid()
344 if (!na->addr_count) in addr_to_nodeid()
347 for (addr_i = 0; addr_i < na->addr_count; addr_i++) { in addr_to_nodeid()
348 if (addr_compare(na->addr[addr_i], addr)) { in addr_to_nodeid()
349 *nodeid = na->nodeid; in addr_to_nodeid()
367 return -ENOMEM; in dlm_lowcomms_addr()
372 return -ENOMEM; in dlm_lowcomms_addr()
380 new_node->nodeid = nodeid; in dlm_lowcomms_addr()
381 new_node->addr[0] = new_addr; in dlm_lowcomms_addr()
382 new_node->addr_count = 1; in dlm_lowcomms_addr()
383 list_add(&new_node->list, &dlm_node_addrs); in dlm_lowcomms_addr()
388 if (na->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
392 return -ENOSPC; in dlm_lowcomms_addr()
395 na->addr[na->addr_count++] = new_addr; in dlm_lowcomms_addr()
401 /* Data available on socket or listen socket received a connect */
406 read_lock_bh(&sk->sk_callback_lock); in lowcomms_data_ready()
408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) in lowcomms_data_ready()
409 queue_work(recv_workqueue, &con->rwork); in lowcomms_data_ready()
410 read_unlock_bh(&sk->sk_callback_lock); in lowcomms_data_ready()
417 read_lock_bh(&sk->sk_callback_lock); in lowcomms_write_space()
422 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
425 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
429 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
431 read_unlock_bh(&sk->sk_callback_lock); in lowcomms_write_space()
436 if (test_bit(CF_CLOSE, &con->flags)) in lowcomms_connect_sock()
438 queue_work(send_workqueue, &con->swork); in lowcomms_connect_sock()
446 * doesn't switch socket state when entering shutdown, so we in lowcomms_state_change()
449 if (sk->sk_shutdown) { in lowcomms_state_change()
450 if (sk->sk_shutdown == RCV_SHUTDOWN) in lowcomms_state_change()
452 } else if (sk->sk_state == TCP_ESTABLISHED) { in lowcomms_state_change()
466 return -ENOMEM; in dlm_lowcomms_connect_node()
477 read_lock_bh(&sk->sk_callback_lock); in lowcomms_error_report()
485 switch (sk->sk_family) { in lowcomms_error_report()
487 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
490 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
491 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
492 sk->sk_err_soft); in lowcomms_error_report()
496 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
499 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
500 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
501 sk->sk_err_soft); in lowcomms_error_report()
505 printk_ratelimited(KERN_ERR "dlm: node %d: socket error " in lowcomms_error_report()
506 "invalid socket family %d set, " in lowcomms_error_report()
508 sk->sk_family, sk->sk_err, sk->sk_err_soft); in lowcomms_error_report()
512 read_unlock_bh(&sk->sk_callback_lock); in lowcomms_error_report()
518 static void save_listen_callbacks(struct socket *sock) in save_listen_callbacks()
520 struct sock *sk = sock->sk; in save_listen_callbacks()
522 listen_sock.sk_data_ready = sk->sk_data_ready; in save_listen_callbacks()
523 listen_sock.sk_state_change = sk->sk_state_change; in save_listen_callbacks()
524 listen_sock.sk_write_space = sk->sk_write_space; in save_listen_callbacks()
525 listen_sock.sk_error_report = sk->sk_error_report; in save_listen_callbacks()
528 static void restore_callbacks(struct socket *sock) in restore_callbacks()
530 struct sock *sk = sock->sk; in restore_callbacks()
532 write_lock_bh(&sk->sk_callback_lock); in restore_callbacks()
533 sk->sk_user_data = NULL; in restore_callbacks()
534 sk->sk_data_ready = listen_sock.sk_data_ready; in restore_callbacks()
535 sk->sk_state_change = listen_sock.sk_state_change; in restore_callbacks()
536 sk->sk_write_space = listen_sock.sk_write_space; in restore_callbacks()
537 sk->sk_error_report = listen_sock.sk_error_report; in restore_callbacks()
538 write_unlock_bh(&sk->sk_callback_lock); in restore_callbacks()
541 /* Make a socket active */
542 static void add_sock(struct socket *sock, struct connection *con) in add_sock()
544 struct sock *sk = sock->sk; in add_sock()
546 write_lock_bh(&sk->sk_callback_lock); in add_sock()
547 con->sock = sock; in add_sock()
549 sk->sk_user_data = con; in add_sock()
551 sk->sk_data_ready = lowcomms_data_ready; in add_sock()
552 sk->sk_write_space = lowcomms_write_space; in add_sock()
553 sk->sk_state_change = lowcomms_state_change; in add_sock()
554 sk->sk_allocation = GFP_NOFS; in add_sock()
555 sk->sk_error_report = lowcomms_error_report; in add_sock()
556 write_unlock_bh(&sk->sk_callback_lock); in add_sock()
564 saddr->ss_family = dlm_local_addr[0]->ss_family; in make_sockaddr()
565 if (saddr->ss_family == AF_INET) { in make_sockaddr()
567 in4_addr->sin_port = cpu_to_be16(port); in make_sockaddr()
569 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); in make_sockaddr()
572 in6_addr->sin6_port = cpu_to_be16(port); in make_sockaddr()
575 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); in make_sockaddr()
582 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); in close_connection()
584 if (tx && !closing && cancel_work_sync(&con->swork)) { in close_connection()
585 log_print("canceled swork for node %d", con->nodeid); in close_connection()
586 clear_bit(CF_WRITE_PENDING, &con->flags); in close_connection()
588 if (rx && !closing && cancel_work_sync(&con->rwork)) { in close_connection()
589 log_print("canceled rwork for node %d", con->nodeid); in close_connection()
590 clear_bit(CF_READ_PENDING, &con->flags); in close_connection()
593 mutex_lock(&con->sock_mutex); in close_connection()
594 if (con->sock) { in close_connection()
595 restore_callbacks(con->sock); in close_connection()
596 sock_release(con->sock); in close_connection()
597 con->sock = NULL; in close_connection()
599 if (con->othercon && and_other) { in close_connection()
600 /* Will only re-enter once. */ in close_connection()
601 close_connection(con->othercon, false, tx, rx); in close_connection()
604 con->rx_leftover = 0; in close_connection()
605 con->retries = 0; in close_connection()
606 mutex_unlock(&con->sock_mutex); in close_connection()
607 clear_bit(CF_CLOSING, &con->flags); in close_connection()
614 flush_work(&con->swork); in shutdown_connection()
616 mutex_lock(&con->sock_mutex); in shutdown_connection()
618 if (!con->sock) { in shutdown_connection()
619 mutex_unlock(&con->sock_mutex); in shutdown_connection()
623 set_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
624 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
625 mutex_unlock(&con->sock_mutex); in shutdown_connection()
631 ret = wait_event_timeout(con->shutdown_wait, in shutdown_connection()
632 !test_bit(CF_SHUTDOWN, &con->flags), in shutdown_connection()
644 clear_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
650 if (con->othercon) in dlm_tcp_shutdown()
651 shutdown_connection(con->othercon); in dlm_tcp_shutdown()
661 return -ENOMEM; in con_realloc_receive_buf()
664 if (con->rx_leftover) in con_realloc_receive_buf()
665 memmove(newbuf, con->rx_buf, con->rx_leftover); in con_realloc_receive_buf()
668 kfree(con->rx_buf); in con_realloc_receive_buf()
669 con->rx_buflen = newlen; in con_realloc_receive_buf()
670 con->rx_buf = newbuf; in con_realloc_receive_buf()
683 mutex_lock(&con->sock_mutex); in receive_from_sock()
685 if (con->sock == NULL) { in receive_from_sock()
686 ret = -EAGAIN; in receive_from_sock()
690 if (con->nodeid == 0) { in receive_from_sock()
691 ret = -EINVAL; in receive_from_sock()
697 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { in receive_from_sock()
706 iov.iov_base = con->rx_buf + con->rx_leftover; in receive_from_sock()
707 iov.iov_len = con->rx_buflen - con->rx_leftover; in receive_from_sock()
711 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
719 buflen = ret + con->rx_leftover; in receive_from_sock()
720 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); in receive_from_sock()
728 con->rx_leftover = buflen - ret; in receive_from_sock()
729 if (con->rx_leftover) { in receive_from_sock()
730 memmove(con->rx_buf, con->rx_buf + ret, in receive_from_sock()
731 con->rx_leftover); in receive_from_sock()
738 mutex_unlock(&con->sock_mutex); in receive_from_sock()
742 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) in receive_from_sock()
743 queue_work(recv_workqueue, &con->rwork); in receive_from_sock()
744 mutex_unlock(&con->sock_mutex); in receive_from_sock()
745 return -EAGAIN; in receive_from_sock()
748 mutex_unlock(&con->sock_mutex); in receive_from_sock()
749 if (ret != -EAGAIN) { in receive_from_sock()
754 con, con->nodeid); in receive_from_sock()
756 clear_bit(CF_SHUTDOWN, &con->flags); in receive_from_sock()
757 wake_up(&con->shutdown_wait); in receive_from_sock()
759 ret = -1; in receive_from_sock()
765 /* Listening socket is busy, accept a connection */
770 struct socket *newsock; in accept_from_sock()
778 return -1; in accept_from_sock()
781 mutex_lock_nested(&con->sock_mutex, 0); in accept_from_sock()
783 if (!con->sock) { in accept_from_sock()
784 mutex_unlock(&con->sock_mutex); in accept_from_sock()
785 return -ENOTCONN; in accept_from_sock()
788 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); in accept_from_sock()
792 /* Get the connected socket's peer */ in accept_from_sock()
794 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); in accept_from_sock()
796 result = -ECONNABORTED; in accept_from_sock()
808 mutex_unlock(&con->sock_mutex); in accept_from_sock()
809 return -1; in accept_from_sock()
813 sock_set_mark(newsock->sk, mark); in accept_from_sock()
824 result = -ENOMEM; in accept_from_sock()
827 mutex_lock_nested(&newcon->sock_mutex, 1); in accept_from_sock()
828 if (newcon->sock) { in accept_from_sock()
829 struct connection *othercon = newcon->othercon; in accept_from_sock()
834 log_print("failed to allocate incoming socket"); in accept_from_sock()
835 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
836 result = -ENOMEM; in accept_from_sock()
840 othercon->rx_buflen = dlm_config.ci_buffer_size; in accept_from_sock()
841 othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS); in accept_from_sock()
842 if (!othercon->rx_buf) { in accept_from_sock()
843 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
845 log_print("failed to allocate incoming socket receive buffer"); in accept_from_sock()
846 result = -ENOMEM; in accept_from_sock()
850 othercon->nodeid = nodeid; in accept_from_sock()
851 othercon->rx_action = receive_from_sock; in accept_from_sock()
852 mutex_init(&othercon->sock_mutex); in accept_from_sock()
853 INIT_LIST_HEAD(&othercon->writequeue); in accept_from_sock()
854 spin_lock_init(&othercon->writequeue_lock); in accept_from_sock()
855 INIT_WORK(&othercon->swork, process_send_sockets); in accept_from_sock()
856 INIT_WORK(&othercon->rwork, process_recv_sockets); in accept_from_sock()
857 init_waitqueue_head(&othercon->shutdown_wait); in accept_from_sock()
858 set_bit(CF_IS_OTHERCON, &othercon->flags); in accept_from_sock()
864 mutex_lock_nested(&othercon->sock_mutex, 2); in accept_from_sock()
865 newcon->othercon = othercon; in accept_from_sock()
868 mutex_unlock(&othercon->sock_mutex); in accept_from_sock()
871 newcon->rx_action = receive_from_sock; in accept_from_sock()
879 mutex_unlock(&newcon->sock_mutex); in accept_from_sock()
883 * between processing the accept adding the socket in accept_from_sock()
886 if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags)) in accept_from_sock()
887 queue_work(recv_workqueue, &addcon->rwork); in accept_from_sock()
888 mutex_unlock(&con->sock_mutex); in accept_from_sock()
893 mutex_unlock(&con->sock_mutex); in accept_from_sock()
897 if (result != -EAGAIN) in accept_from_sock()
904 __free_page(e->page); in free_entry()
909 * writequeue_entry_complete - try to delete and free write queue entry
917 e->offset += completed; in writequeue_entry_complete()
918 e->len -= completed; in writequeue_entry_complete()
920 if (e->len == 0 && e->users == 0) { in writequeue_entry_complete()
921 list_del(&e->list); in writequeue_entry_complete()
927 * sctp_bind_addrs - bind a SCTP socket to all our addresses
940 result = kernel_bind(con->sock, addr, addr_len); in sctp_bind_addrs()
942 result = sock_bind_add(con->sock->sk, addr, addr_len); in sctp_bind_addrs()
955 peeled-off socket for this association, so we use the listening socket
963 struct socket *sock; in sctp_connect_to_sock()
966 if (con->nodeid == 0) { in sctp_connect_to_sock()
971 dlm_comm_mark(con->nodeid, &mark); in sctp_connect_to_sock()
973 mutex_lock(&con->sock_mutex); in sctp_connect_to_sock()
975 /* Some odd races can cause double-connects, ignore them */ in sctp_connect_to_sock()
976 if (con->retries++ > MAX_CONNECT_RETRIES) in sctp_connect_to_sock()
979 if (con->sock) { in sctp_connect_to_sock()
980 log_print("node %d already connected.", con->nodeid); in sctp_connect_to_sock()
985 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); in sctp_connect_to_sock()
987 log_print("no address for nodeid %d", con->nodeid); in sctp_connect_to_sock()
991 /* Create a socket to communicate with */ in sctp_connect_to_sock()
992 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in sctp_connect_to_sock()
997 sock_set_mark(sock->sk, mark); in sctp_connect_to_sock()
999 con->rx_action = receive_from_sock; in sctp_connect_to_sock()
1000 con->connect_action = sctp_connect_to_sock; in sctp_connect_to_sock()
1009 log_print("connecting to %d", con->nodeid); in sctp_connect_to_sock()
1012 sctp_sock_set_nodelay(sock->sk); in sctp_connect_to_sock()
1015 * Make sock->ops->connect() function return in specified time, in sctp_connect_to_sock()
1019 sock_set_sndtimeo(sock->sk, 5); in sctp_connect_to_sock()
1020 result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len, in sctp_connect_to_sock()
1022 sock_set_sndtimeo(sock->sk, 0); in sctp_connect_to_sock()
1024 if (result == -EINPROGRESS) in sctp_connect_to_sock()
1030 con->sock = NULL; in sctp_connect_to_sock()
1038 if (result != -EHOSTUNREACH && in sctp_connect_to_sock()
1039 result != -ENETUNREACH && in sctp_connect_to_sock()
1040 result != -ENETDOWN && in sctp_connect_to_sock()
1041 result != -EINVAL && in sctp_connect_to_sock()
1042 result != -EPROTONOSUPPORT) { in sctp_connect_to_sock()
1043 log_print("connect %d try %d error %d", con->nodeid, in sctp_connect_to_sock()
1044 con->retries, result); in sctp_connect_to_sock()
1045 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1052 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1055 /* Connect a new socket to its peer */
1060 struct socket *sock = NULL; in tcp_connect_to_sock()
1064 if (con->nodeid == 0) { in tcp_connect_to_sock()
1069 dlm_comm_mark(con->nodeid, &mark); in tcp_connect_to_sock()
1071 mutex_lock(&con->sock_mutex); in tcp_connect_to_sock()
1072 if (con->retries++ > MAX_CONNECT_RETRIES) in tcp_connect_to_sock()
1075 /* Some odd races can cause double-connects, ignore them */ in tcp_connect_to_sock()
1076 if (con->sock) in tcp_connect_to_sock()
1079 /* Create a socket to communicate with */ in tcp_connect_to_sock()
1080 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in tcp_connect_to_sock()
1085 sock_set_mark(sock->sk, mark); in tcp_connect_to_sock()
1088 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); in tcp_connect_to_sock()
1090 log_print("no address for nodeid %d", con->nodeid); in tcp_connect_to_sock()
1094 con->rx_action = receive_from_sock; in tcp_connect_to_sock()
1095 con->connect_action = tcp_connect_to_sock; in tcp_connect_to_sock()
1096 con->shutdown_action = dlm_tcp_shutdown; in tcp_connect_to_sock()
1099 /* Bind to our cluster-known address connecting to avoid in tcp_connect_to_sock()
1103 result = sock->ops->bind(sock, (struct sockaddr *) &src_addr, in tcp_connect_to_sock()
1112 log_print("connecting to %d", con->nodeid); in tcp_connect_to_sock()
1115 tcp_sock_set_nodelay(sock->sk); in tcp_connect_to_sock()
1117 result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len, in tcp_connect_to_sock()
1119 if (result == -EINPROGRESS) in tcp_connect_to_sock()
1125 if (con->sock) { in tcp_connect_to_sock()
1126 sock_release(con->sock); in tcp_connect_to_sock()
1127 con->sock = NULL; in tcp_connect_to_sock()
1135 if (result != -EHOSTUNREACH && in tcp_connect_to_sock()
1136 result != -ENETUNREACH && in tcp_connect_to_sock()
1137 result != -ENETDOWN && in tcp_connect_to_sock()
1138 result != -EINVAL && in tcp_connect_to_sock()
1139 result != -EPROTONOSUPPORT) { in tcp_connect_to_sock()
1140 log_print("connect %d try %d error %d", con->nodeid, in tcp_connect_to_sock()
1141 con->retries, result); in tcp_connect_to_sock()
1142 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
1148 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
1152 static struct socket *tcp_create_listen_sock(struct connection *con, in tcp_create_listen_sock()
1155 struct socket *sock = NULL; in tcp_create_listen_sock()
1159 if (dlm_local_addr[0]->ss_family == AF_INET) in tcp_create_listen_sock()
1164 /* Create a socket to communicate with */ in tcp_create_listen_sock()
1165 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in tcp_create_listen_sock()
1168 log_print("Can't create listening comms socket"); in tcp_create_listen_sock()
1172 sock_set_mark(sock->sk, dlm_config.ci_mark); in tcp_create_listen_sock()
1175 tcp_sock_set_nodelay(sock->sk); in tcp_create_listen_sock()
1177 sock_set_reuseaddr(sock->sk); in tcp_create_listen_sock()
1179 write_lock_bh(&sock->sk->sk_callback_lock); in tcp_create_listen_sock()
1180 sock->sk->sk_user_data = con; in tcp_create_listen_sock()
1182 con->rx_action = accept_from_sock; in tcp_create_listen_sock()
1183 con->connect_action = tcp_connect_to_sock; in tcp_create_listen_sock()
1184 write_unlock_bh(&sock->sk->sk_callback_lock); in tcp_create_listen_sock()
1188 result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len); in tcp_create_listen_sock()
1193 con->sock = NULL; in tcp_create_listen_sock()
1196 sock_set_keepalive(sock->sk); in tcp_create_listen_sock()
1198 result = sock->ops->listen(sock, 5); in tcp_create_listen_sock()
1236 /* Initialise SCTP socket and bind to all interfaces */
1239 struct socket *sock = NULL; in sctp_listen_for_all()
1240 int result = -EINVAL; in sctp_listen_for_all()
1244 return -ENOMEM; in sctp_listen_for_all()
1248 result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family, in sctp_listen_for_all()
1251 log_print("Can't create comms socket, check SCTP is loaded"); in sctp_listen_for_all()
1255 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); in sctp_listen_for_all()
1256 sock_set_mark(sock->sk, dlm_config.ci_mark); in sctp_listen_for_all()
1257 sctp_sock_set_nodelay(sock->sk); in sctp_listen_for_all()
1259 write_lock_bh(&sock->sk->sk_callback_lock); in sctp_listen_for_all()
1261 sock->sk->sk_user_data = con; in sctp_listen_for_all()
1263 con->sock = sock; in sctp_listen_for_all()
1264 con->sock->sk->sk_data_ready = lowcomms_data_ready; in sctp_listen_for_all()
1265 con->rx_action = accept_from_sock; in sctp_listen_for_all()
1266 con->connect_action = sctp_connect_to_sock; in sctp_listen_for_all()
1268 write_unlock_bh(&sock->sk->sk_callback_lock); in sctp_listen_for_all()
1274 result = sock->ops->listen(sock, 5); in sctp_listen_for_all()
1276 log_print("Can't set socket listening"); in sctp_listen_for_all()
1284 con->sock = NULL; in sctp_listen_for_all()
1291 struct socket *sock = NULL; in tcp_listen_for_all()
1293 int result = -EINVAL; in tcp_listen_for_all()
1296 return -ENOMEM; in tcp_listen_for_all()
1298 /* We don't support multi-homed hosts */ in tcp_listen_for_all()
1300 log_print("TCP protocol can't handle multi-homed hosts, " in tcp_listen_for_all()
1302 return -EINVAL; in tcp_listen_for_all()
1313 result = -EADDRINUSE; in tcp_listen_for_all()
1330 entry->page = alloc_page(allocation); in new_writequeue_entry()
1331 if (!entry->page) { in new_writequeue_entry()
1336 entry->offset = 0; in new_writequeue_entry()
1337 entry->len = 0; in new_writequeue_entry()
1338 entry->end = 0; in new_writequeue_entry()
1339 entry->users = 0; in new_writequeue_entry()
1340 entry->con = con; in new_writequeue_entry()
1355 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1356 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); in dlm_lowcomms_get_buffer()
1357 if ((&e->list == &con->writequeue) || in dlm_lowcomms_get_buffer()
1358 (PAGE_SIZE - e->end < len)) { in dlm_lowcomms_get_buffer()
1361 offset = e->end; in dlm_lowcomms_get_buffer()
1362 e->end += len; in dlm_lowcomms_get_buffer()
1363 e->users++; in dlm_lowcomms_get_buffer()
1365 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1369 *ppc = page_address(e->page) + offset; in dlm_lowcomms_get_buffer()
1375 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1376 offset = e->end; in dlm_lowcomms_get_buffer()
1377 e->end += len; in dlm_lowcomms_get_buffer()
1378 e->users++; in dlm_lowcomms_get_buffer()
1379 list_add_tail(&e->list, &con->writequeue); in dlm_lowcomms_get_buffer()
1380 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1389 struct connection *con = e->con; in dlm_lowcomms_commit_buffer()
1392 spin_lock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1393 users = --e->users; in dlm_lowcomms_commit_buffer()
1396 e->len = e->end - e->offset; in dlm_lowcomms_commit_buffer()
1397 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1399 queue_work(send_workqueue, &con->swork); in dlm_lowcomms_commit_buffer()
1403 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1416 mutex_lock(&con->sock_mutex); in send_to_sock()
1417 if (con->sock == NULL) in send_to_sock()
1420 spin_lock(&con->writequeue_lock); in send_to_sock()
1422 e = list_entry(con->writequeue.next, struct writequeue_entry, in send_to_sock()
1424 if ((struct list_head *) e == &con->writequeue) in send_to_sock()
1427 len = e->len; in send_to_sock()
1428 offset = e->offset; in send_to_sock()
1429 BUG_ON(len == 0 && e->users == 0); in send_to_sock()
1430 spin_unlock(&con->writequeue_lock); in send_to_sock()
1434 ret = kernel_sendpage(con->sock, e->page, offset, len, in send_to_sock()
1436 if (ret == -EAGAIN || ret == 0) { in send_to_sock()
1437 if (ret == -EAGAIN && in send_to_sock()
1438 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1439 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1443 set_bit(SOCK_NOSPACE, &con->sock->flags); in send_to_sock()
1444 con->sock->sk->sk_write_pending++; in send_to_sock()
1458 spin_lock(&con->writequeue_lock); in send_to_sock()
1461 spin_unlock(&con->writequeue_lock); in send_to_sock()
1463 mutex_unlock(&con->sock_mutex); in send_to_sock()
1467 mutex_unlock(&con->sock_mutex); in send_to_sock()
1471 queue_work(send_workqueue, &con->swork); in send_to_sock()
1475 mutex_unlock(&con->sock_mutex); in send_to_sock()
1476 queue_work(send_workqueue, &con->swork); in send_to_sock()
1484 spin_lock(&con->writequeue_lock); in clean_one_writequeue()
1485 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1486 list_del(&e->list); in clean_one_writequeue()
1489 spin_unlock(&con->writequeue_lock); in clean_one_writequeue()
1502 set_bit(CF_CLOSE, &con->flags); in dlm_lowcomms_close()
1510 list_del(&na->list); in dlm_lowcomms_close()
1511 while (na->addr_count--) in dlm_lowcomms_close()
1512 kfree(na->addr[na->addr_count]); in dlm_lowcomms_close()
1526 clear_bit(CF_READ_PENDING, &con->flags); in process_recv_sockets()
1528 err = con->rx_action(con); in process_recv_sockets()
1537 clear_bit(CF_WRITE_PENDING, &con->flags); in process_send_sockets()
1538 if (con->sock == NULL) /* not mutex protected so check it inside too */ in process_send_sockets()
1539 con->connect_action(con); in process_send_sockets()
1540 if (!list_empty(&con->writequeue)) in process_send_sockets()
1558 return -ENOMEM; in work_start()
1566 return -ENOMEM; in work_start()
1574 mutex_lock(&con->sock_mutex); in _stop_conn()
1575 set_bit(CF_CLOSE, &con->flags); in _stop_conn()
1576 set_bit(CF_READ_PENDING, &con->flags); in _stop_conn()
1577 set_bit(CF_WRITE_PENDING, &con->flags); in _stop_conn()
1578 if (con->sock && con->sock->sk) { in _stop_conn()
1579 write_lock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1580 con->sock->sk->sk_user_data = NULL; in _stop_conn()
1581 write_unlock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1583 if (con->othercon && and_other) in _stop_conn()
1584 _stop_conn(con->othercon, false); in _stop_conn()
1585 mutex_unlock(&con->sock_mutex); in _stop_conn()
1595 if (con->shutdown_action) in shutdown_conn()
1596 con->shutdown_action(con); in shutdown_conn()
1603 kfree(con->rx_buf); in connection_release()
1611 hlist_del_rcu(&con->list); in free_conn()
1613 if (con->othercon) { in free_conn()
1614 clean_one_writequeue(con->othercon); in free_conn()
1615 call_rcu(&con->othercon->rcu, connection_release); in free_conn()
1618 call_rcu(&con->rcu, connection_release); in free_conn()
1638 ok &= test_bit(CF_READ_PENDING, &con->flags); in work_flush()
1639 ok &= test_bit(CF_WRITE_PENDING, &con->flags); in work_flush()
1640 if (con->othercon) { in work_flush()
1642 &con->othercon->flags); in work_flush()
1644 &con->othercon->flags); in work_flush()
1655 socket activity. in dlm_lowcomms_stop()
1673 int error = -EINVAL; in dlm_lowcomms_start()
1682 error = -ENOTCONN; in dlm_lowcomms_start()
1718 list_del(&na->list); in dlm_lowcomms_exit()
1719 while (na->addr_count--) in dlm_lowcomms_exit()
1720 kfree(na->addr[na->addr_count]); in dlm_lowcomms_exit()