Lines Matching refs:con
85 void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
108 struct connection *con; member
156 struct connection *con; in __find_con() local
161 hlist_for_each_entry_rcu(con, &connection_hash[r], list) { in __find_con()
162 if (con->nodeid == nodeid) { in __find_con()
164 return con; in __find_con()
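The __find_con() hits above are the read side of the connection table: an RCU-protected walk of one hash bucket, keyed by nodeid. A minimal sketch of that pattern follows; nodeid_hash() is an assumed name for whatever computes the bucket index r, which the matching lines do not show.

    /* Sketch of the RCU lookup; nodeid_hash() is an assumed helper. */
    static struct connection *__find_con(int nodeid)
    {
        int r = nodeid_hash(nodeid);        /* nodeid -> hash bucket */
        struct connection *con;

        hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
            if (con->nodeid == nodeid)
                return con;                 /* existing connection */
        }
        return NULL;                        /* caller may allocate one */
    }

The _rcu iterator implies callers either hold rcu_read_lock() or the lock that serialises inserts; it is also why free_conn() further down defers the actual kfree() through call_rcu().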
178 struct connection *con, *tmp; in nodeid2con() local
181 con = __find_con(nodeid); in nodeid2con()
182 if (con || !alloc) in nodeid2con()
183 return con; in nodeid2con()
185 con = kzalloc(sizeof(*con), alloc); in nodeid2con()
186 if (!con) in nodeid2con()
189 con->rx_buflen = dlm_config.ci_buffer_size; in nodeid2con()
190 con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS); in nodeid2con()
191 if (!con->rx_buf) { in nodeid2con()
192 kfree(con); in nodeid2con()
196 con->nodeid = nodeid; in nodeid2con()
197 mutex_init(&con->sock_mutex); in nodeid2con()
198 INIT_LIST_HEAD(&con->writequeue); in nodeid2con()
199 spin_lock_init(&con->writequeue_lock); in nodeid2con()
200 INIT_WORK(&con->swork, process_send_sockets); in nodeid2con()
201 INIT_WORK(&con->rwork, process_recv_sockets); in nodeid2con()
202 init_waitqueue_head(&con->shutdown_wait); in nodeid2con()
205 if (con->nodeid) { in nodeid2con()
208 con->connect_action = zerocon->connect_action; in nodeid2con()
209 if (!con->rx_action) in nodeid2con()
210 con->rx_action = zerocon->rx_action; in nodeid2con()
225 kfree(con->rx_buf); in nodeid2con()
226 kfree(con); in nodeid2con()
230 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
233 return con; in nodeid2con()
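nodeid2con() wraps that lookup with an allocate-on-miss path: a new struct connection gets its receive buffer, locks, work items and shutdown wait queue initialised before being published with hlist_add_head_rcu(). A condensed reconstruction from the fragments above; the zero-nodeid special case (inheriting connect/rx actions from the listening "zerocon") and the locking and re-check around the insert are trimmed, and nodeid_hash() is the same assumed helper as before.

    static struct connection *nodeid2con(int nodeid, gfp_t alloc)
    {
        struct connection *con;

        con = __find_con(nodeid);
        if (con || !alloc)                  /* hit, or lookup-only call */
            return con;

        con = kzalloc(sizeof(*con), alloc);
        if (!con)
            return NULL;

        con->rx_buflen = dlm_config.ci_buffer_size;
        con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
        if (!con->rx_buf) {
            kfree(con);
            return NULL;
        }

        con->nodeid = nodeid;
        mutex_init(&con->sock_mutex);
        INIT_LIST_HEAD(&con->writequeue);
        spin_lock_init(&con->writequeue_lock);
        INIT_WORK(&con->swork, process_send_sockets);
        INIT_WORK(&con->rwork, process_recv_sockets);
        init_waitqueue_head(&con->shutdown_wait);

        /* publish; the real code re-checks for a racing insert first */
        hlist_add_head_rcu(&con->list, &connection_hash[nodeid_hash(nodeid)]);
        return con;
    }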
240 struct connection *con; in foreach_conn() local
244 hlist_for_each_entry_rcu(con, &connection_hash[i], list) in foreach_conn()
245 conn_func(con); in foreach_conn()
404 struct connection *con; in lowcomms_data_ready() local
407 con = sock2con(sk); in lowcomms_data_ready()
408 if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags)) in lowcomms_data_ready()
409 queue_work(recv_workqueue, &con->rwork); in lowcomms_data_ready()
415 struct connection *con; in lowcomms_write_space() local
418 con = sock2con(sk); in lowcomms_write_space()
419 if (!con) in lowcomms_write_space()
422 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
424 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
425 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
426 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
429 queue_work(send_workqueue, &con->swork); in lowcomms_write_space()
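lowcomms_data_ready() and lowcomms_write_space() are the sk callbacks wired up by add_sock(); they only flip a flag and push the real work onto a workqueue. A sketch, with the sk_callback_lock handling omitted:

    static void lowcomms_data_ready(struct sock *sk)
    {
        struct connection *con = sock2con(sk);  /* from sk->sk_user_data */

        /* queue only once until the worker clears CF_READ_PENDING */
        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
            queue_work(recv_workqueue, &con->rwork);
    }

    static void lowcomms_write_space(struct sock *sk)
    {
        struct connection *con = sock2con(sk);

        if (!con)
            return;

        clear_bit(SOCK_NOSPACE, &con->sock->flags);

        /* undo the throttling set up by send_to_sock() on -EAGAIN */
        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
            con->sock->sk->sk_write_pending--;
            clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
        }

        queue_work(send_workqueue, &con->swork);
    }

Keeping the callbacks this thin matters because they run in softirq context; everything that can sleep happens in the recv/send workers.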
434 static inline void lowcomms_connect_sock(struct connection *con) in lowcomms_connect_sock() argument
436 if (test_bit(CF_CLOSE, &con->flags)) in lowcomms_connect_sock()
438 queue_work(send_workqueue, &con->swork); in lowcomms_connect_sock()
459 struct connection *con; in dlm_lowcomms_connect_node() local
464 con = nodeid2con(nodeid, GFP_NOFS); in dlm_lowcomms_connect_node()
465 if (!con) in dlm_lowcomms_connect_node()
467 lowcomms_connect_sock(con); in dlm_lowcomms_connect_node()
473 struct connection *con; in lowcomms_error_report() local
478 con = sock2con(sk); in lowcomms_error_report()
479 if (con == NULL) in lowcomms_error_report()
490 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
499 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
542 static void add_sock(struct socket *sock, struct connection *con) in add_sock() argument
547 con->sock = sock; in add_sock()
549 sk->sk_user_data = con; in add_sock()
579 static void close_connection(struct connection *con, bool and_other, in close_connection() argument
582 bool closing = test_and_set_bit(CF_CLOSING, &con->flags); in close_connection()
584 if (tx && !closing && cancel_work_sync(&con->swork)) { in close_connection()
585 log_print("canceled swork for node %d", con->nodeid); in close_connection()
586 clear_bit(CF_WRITE_PENDING, &con->flags); in close_connection()
588 if (rx && !closing && cancel_work_sync(&con->rwork)) { in close_connection()
589 log_print("canceled rwork for node %d", con->nodeid); in close_connection()
590 clear_bit(CF_READ_PENDING, &con->flags); in close_connection()
593 mutex_lock(&con->sock_mutex); in close_connection()
594 if (con->sock) { in close_connection()
595 restore_callbacks(con->sock); in close_connection()
596 sock_release(con->sock); in close_connection()
597 con->sock = NULL; in close_connection()
599 if (con->othercon && and_other) { in close_connection()
601 close_connection(con->othercon, false, tx, rx); in close_connection()
604 con->rx_leftover = 0; in close_connection()
605 con->retries = 0; in close_connection()
606 mutex_unlock(&con->sock_mutex); in close_connection()
607 clear_bit(CF_CLOSING, &con->flags); in close_connection()
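close_connection() serialises teardown with CF_CLOSING, cancels pending send/receive work when asked to, releases the socket under sock_mutex, and optionally recurses into the paired othercon (the second socket of a duplex link). Reassembled from the fragments above, with the log messages trimmed:

    static void close_connection(struct connection *con, bool and_other,
                                 bool tx, bool rx)
    {
        bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

        if (tx && !closing && cancel_work_sync(&con->swork))
            clear_bit(CF_WRITE_PENDING, &con->flags);
        if (rx && !closing && cancel_work_sync(&con->rwork))
            clear_bit(CF_READ_PENDING, &con->flags);

        mutex_lock(&con->sock_mutex);
        if (con->sock) {
            restore_callbacks(con->sock);   /* detach the sk callbacks */
            sock_release(con->sock);
            con->sock = NULL;
        }
        if (con->othercon && and_other)
            close_connection(con->othercon, false, tx, rx);

        con->rx_leftover = 0;
        con->retries = 0;
        mutex_unlock(&con->sock_mutex);
        clear_bit(CF_CLOSING, &con->flags);
    }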
610 static void shutdown_connection(struct connection *con) in shutdown_connection() argument
614 flush_work(&con->swork); in shutdown_connection()
616 mutex_lock(&con->sock_mutex); in shutdown_connection()
618 if (!con->sock) { in shutdown_connection()
619 mutex_unlock(&con->sock_mutex); in shutdown_connection()
623 set_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
624 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
625 mutex_unlock(&con->sock_mutex); in shutdown_connection()
628 con, ret); in shutdown_connection()
631 ret = wait_event_timeout(con->shutdown_wait, in shutdown_connection()
632 !test_bit(CF_SHUTDOWN, &con->flags), in shutdown_connection()
636 con); in shutdown_connection()
644 clear_bit(CF_SHUTDOWN, &con->flags); in shutdown_connection()
645 close_connection(con, false, true, true); in shutdown_connection()
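shutdown_connection() is the graceful half-close used on the TCP path: flush pending sends, issue SHUT_WR, then wait for the receive side to see EOF and clear CF_SHUTDOWN (receive_from_sock() does the wake_up, see lines 756-757 below). Sketch; DLM_SHUTDOWN_WAIT_TIMEOUT stands in for whatever timeout the real code uses, and logging is trimmed:

    static void shutdown_connection(struct connection *con)
    {
        int ret;

        flush_work(&con->swork);            /* push out queued writes */

        mutex_lock(&con->sock_mutex);
        if (!con->sock) {
            mutex_unlock(&con->sock_mutex);
            return;
        }

        set_bit(CF_SHUTDOWN, &con->flags);
        ret = kernel_sock_shutdown(con->sock, SHUT_WR);
        mutex_unlock(&con->sock_mutex);
        if (ret) {                          /* shutdown refused: hard close */
            clear_bit(CF_SHUTDOWN, &con->flags);
            close_connection(con, false, true, true);
            return;
        }

        ret = wait_event_timeout(con->shutdown_wait,
                                 !test_bit(CF_SHUTDOWN, &con->flags),
                                 DLM_SHUTDOWN_WAIT_TIMEOUT);
        if (ret == 0) {                     /* peer never closed: force it */
            clear_bit(CF_SHUTDOWN, &con->flags);
            close_connection(con, false, true, true);
        }
    }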
648 static void dlm_tcp_shutdown(struct connection *con) in dlm_tcp_shutdown() argument
650 if (con->othercon) in dlm_tcp_shutdown()
651 shutdown_connection(con->othercon); in dlm_tcp_shutdown()
652 shutdown_connection(con); in dlm_tcp_shutdown()
655 static int con_realloc_receive_buf(struct connection *con, int newlen) in con_realloc_receive_buf() argument
664 if (con->rx_leftover) in con_realloc_receive_buf()
665 memmove(newbuf, con->rx_buf, con->rx_leftover); in con_realloc_receive_buf()
668 kfree(con->rx_buf); in con_realloc_receive_buf()
669 con->rx_buflen = newlen; in con_realloc_receive_buf()
670 con->rx_buf = newbuf; in con_realloc_receive_buf()
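con_realloc_receive_buf() grows the receive buffer while preserving any partial message carried in rx_leftover, so a DLM message larger than the current buffer can still be reassembled. A sketch, assuming the replacement buffer is allocated with GFP_NOFS like the original:

    static int con_realloc_receive_buf(struct connection *con, int newlen)
    {
        unsigned char *newbuf;

        newbuf = kmalloc(newlen, GFP_NOFS);
        if (!newbuf)
            return -ENOMEM;

        /* carry the unparsed tail of the old buffer over */
        if (con->rx_leftover)
            memmove(newbuf, con->rx_buf, con->rx_leftover);

        kfree(con->rx_buf);
        con->rx_buflen = newlen;
        con->rx_buf = newbuf;

        return 0;
    }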
676 static int receive_from_sock(struct connection *con) in receive_from_sock() argument
683 mutex_lock(&con->sock_mutex); in receive_from_sock()
685 if (con->sock == NULL) { in receive_from_sock()
690 if (con->nodeid == 0) { in receive_from_sock()
697 if (con->rx_buflen != buflen && con->rx_leftover <= buflen) { in receive_from_sock()
698 ret = con_realloc_receive_buf(con, buflen); in receive_from_sock()
706 iov.iov_base = con->rx_buf + con->rx_leftover; in receive_from_sock()
707 iov.iov_len = con->rx_buflen - con->rx_leftover; in receive_from_sock()
711 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
719 buflen = ret + con->rx_leftover; in receive_from_sock()
720 ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen); in receive_from_sock()
728 con->rx_leftover = buflen - ret; in receive_from_sock()
729 if (con->rx_leftover) { in receive_from_sock()
730 memmove(con->rx_buf, con->rx_buf + ret, in receive_from_sock()
731 con->rx_leftover); in receive_from_sock()
738 mutex_unlock(&con->sock_mutex); in receive_from_sock()
742 if (!test_and_set_bit(CF_READ_PENDING, &con->flags)) in receive_from_sock()
743 queue_work(recv_workqueue, &con->rwork); in receive_from_sock()
744 mutex_unlock(&con->sock_mutex); in receive_from_sock()
748 mutex_unlock(&con->sock_mutex); in receive_from_sock()
751 close_connection(con, false, true, false); in receive_from_sock()
754 con, con->nodeid); in receive_from_sock()
756 clear_bit(CF_SHUTDOWN, &con->flags); in receive_from_sock()
757 wake_up(&con->shutdown_wait); in receive_from_sock()
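receive_from_sock() reads into rx_buf behind any leftover bytes, hands the whole thing to dlm_process_incoming_buffer(), and keeps whatever was not consumed for the next pass. A simplified reconstruction; the buffer-resize call, the -EAGAIN requeue path and the shutdown wake_up shown above are left out:

    static int receive_from_sock(struct connection *con)
    {
        struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
        struct kvec iov;
        int ret, buflen;

        mutex_lock(&con->sock_mutex);
        if (!con->sock) {
            mutex_unlock(&con->sock_mutex);
            return -EIO;
        }

        /* append new data after the leftover partial message */
        iov.iov_base = con->rx_buf + con->rx_leftover;
        iov.iov_len  = con->rx_buflen - con->rx_leftover;

        ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
                             msg.msg_flags);
        if (ret <= 0)
            goto out_close;

        /* parse complete messages; the return is how many bytes were used */
        buflen = ret + con->rx_leftover;
        ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
        if (ret < 0)
            goto out_close;

        /* keep the unconsumed tail at the front of rx_buf for next time */
        con->rx_leftover = buflen - ret;
        if (con->rx_leftover)
            memmove(con->rx_buf, con->rx_buf + ret, con->rx_leftover);

        mutex_unlock(&con->sock_mutex);
        return 0;

    out_close:
        mutex_unlock(&con->sock_mutex);
        close_connection(con, false, true, false);
        return ret < 0 ? ret : -EIO;        /* 0 bytes means orderly EOF */
    }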
766 static int accept_from_sock(struct connection *con) in accept_from_sock() argument
781 mutex_lock_nested(&con->sock_mutex, 0); in accept_from_sock()
783 if (!con->sock) { in accept_from_sock()
784 mutex_unlock(&con->sock_mutex); in accept_from_sock()
788 result = kernel_accept(con->sock, &newsock, O_NONBLOCK); in accept_from_sock()
808 mutex_unlock(&con->sock_mutex); in accept_from_sock()
888 mutex_unlock(&con->sock_mutex); in accept_from_sock()
893 mutex_unlock(&con->sock_mutex); in accept_from_sock()
929 static int sctp_bind_addrs(struct connection *con, uint16_t port) in sctp_bind_addrs() argument
940 result = kernel_bind(con->sock, addr, addr_len); in sctp_bind_addrs()
942 result = sock_bind_add(con->sock->sk, addr, addr_len); in sctp_bind_addrs()
958 static void sctp_connect_to_sock(struct connection *con) in sctp_connect_to_sock() argument
966 if (con->nodeid == 0) { in sctp_connect_to_sock()
971 dlm_comm_mark(con->nodeid, &mark); in sctp_connect_to_sock()
973 mutex_lock(&con->sock_mutex); in sctp_connect_to_sock()
976 if (con->retries++ > MAX_CONNECT_RETRIES) in sctp_connect_to_sock()
979 if (con->sock) { in sctp_connect_to_sock()
980 log_print("node %d already connected.", con->nodeid); in sctp_connect_to_sock()
985 result = nodeid_to_addr(con->nodeid, &daddr, NULL, true); in sctp_connect_to_sock()
987 log_print("no address for nodeid %d", con->nodeid); in sctp_connect_to_sock()
999 con->rx_action = receive_from_sock; in sctp_connect_to_sock()
1000 con->connect_action = sctp_connect_to_sock; in sctp_connect_to_sock()
1001 add_sock(sock, con); in sctp_connect_to_sock()
1004 if (sctp_bind_addrs(con, 0)) in sctp_connect_to_sock()
1009 log_print("connecting to %d", con->nodeid); in sctp_connect_to_sock()
1030 con->sock = NULL; in sctp_connect_to_sock()
1043 log_print("connect %d try %d error %d", con->nodeid, in sctp_connect_to_sock()
1044 con->retries, result); in sctp_connect_to_sock()
1045 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1047 lowcomms_connect_sock(con); in sctp_connect_to_sock()
1052 mutex_unlock(&con->sock_mutex); in sctp_connect_to_sock()
1056 static void tcp_connect_to_sock(struct connection *con) in tcp_connect_to_sock() argument
1064 if (con->nodeid == 0) { in tcp_connect_to_sock()
1069 dlm_comm_mark(con->nodeid, &mark); in tcp_connect_to_sock()
1071 mutex_lock(&con->sock_mutex); in tcp_connect_to_sock()
1072 if (con->retries++ > MAX_CONNECT_RETRIES) in tcp_connect_to_sock()
1076 if (con->sock) in tcp_connect_to_sock()
1088 result = nodeid_to_addr(con->nodeid, &saddr, NULL, false); in tcp_connect_to_sock()
1090 log_print("no address for nodeid %d", con->nodeid); in tcp_connect_to_sock()
1094 con->rx_action = receive_from_sock; in tcp_connect_to_sock()
1095 con->connect_action = tcp_connect_to_sock; in tcp_connect_to_sock()
1096 con->shutdown_action = dlm_tcp_shutdown; in tcp_connect_to_sock()
1097 add_sock(sock, con); in tcp_connect_to_sock()
1112 log_print("connecting to %d", con->nodeid); in tcp_connect_to_sock()
1125 if (con->sock) { in tcp_connect_to_sock()
1126 sock_release(con->sock); in tcp_connect_to_sock()
1127 con->sock = NULL; in tcp_connect_to_sock()
1140 log_print("connect %d try %d error %d", con->nodeid, in tcp_connect_to_sock()
1141 con->retries, result); in tcp_connect_to_sock()
1142 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
1144 lowcomms_connect_sock(con); in tcp_connect_to_sock()
1148 mutex_unlock(&con->sock_mutex); in tcp_connect_to_sock()
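Both connectors follow the same shape; the TCP one above sets the rx/connect/shutdown actions, hands the socket to add_sock(), and on a transient failure backs off and requeues itself through lowcomms_connect_sock(). A skeleton of that flow (socket option setup, source-address binding and the filter for non-retryable errors are elided; treat it as an illustration, not the exact source):

    static void tcp_connect_to_sock(struct connection *con)
    {
        struct sockaddr_storage saddr;
        struct socket *sock;
        int result;

        mutex_lock(&con->sock_mutex);
        if (con->retries++ > MAX_CONNECT_RETRIES)
            goto out;
        if (con->sock)                      /* already connected */
            goto out;

        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
                                  SOCK_STREAM, IPPROTO_TCP, &sock);
        if (result < 0)
            goto out;

        result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
        if (result < 0) {
            log_print("no address for nodeid %d", con->nodeid);
            sock_release(sock);
            goto out;
        }

        con->rx_action = receive_from_sock;
        con->connect_action = tcp_connect_to_sock;
        con->shutdown_action = dlm_tcp_shutdown;
        add_sock(sock, con);                /* installs the sk callbacks */

        result = sock->ops->connect(sock, (struct sockaddr *)&saddr,
                                    sizeof(saddr), O_NONBLOCK);
        if (result == 0 || result == -EINPROGRESS)
            goto out;                       /* completion arrives async */

        /* transient failure: drop the socket, back off, requeue via swork */
        sock_release(con->sock);
        con->sock = NULL;
        log_print("connect %d try %d error %d", con->nodeid,
                  con->retries, result);
        mutex_unlock(&con->sock_mutex);
        msleep(1000);
        lowcomms_connect_sock(con);
        return;
    out:
        mutex_unlock(&con->sock_mutex);
    }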
1152 static struct socket *tcp_create_listen_sock(struct connection *con, in tcp_create_listen_sock() argument
1180 sock->sk->sk_user_data = con; in tcp_create_listen_sock()
1182 con->rx_action = accept_from_sock; in tcp_create_listen_sock()
1183 con->connect_action = tcp_connect_to_sock; in tcp_create_listen_sock()
1193 con->sock = NULL; in tcp_create_listen_sock()
1241 struct connection *con = nodeid2con(0, GFP_NOFS); in sctp_listen_for_all() local
1243 if (!con) in sctp_listen_for_all()
1261 sock->sk->sk_user_data = con; in sctp_listen_for_all()
1263 con->sock = sock; in sctp_listen_for_all()
1264 con->sock->sk->sk_data_ready = lowcomms_data_ready; in sctp_listen_for_all()
1265 con->rx_action = accept_from_sock; in sctp_listen_for_all()
1266 con->connect_action = sctp_connect_to_sock; in sctp_listen_for_all()
1271 if (sctp_bind_addrs(con, dlm_config.ci_tcp_port)) in sctp_listen_for_all()
1284 con->sock = NULL; in sctp_listen_for_all()
1292 struct connection *con = nodeid2con(0, GFP_NOFS); in tcp_listen_for_all() local
1295 if (!con) in tcp_listen_for_all()
1307 sock = tcp_create_listen_sock(con, dlm_local_addr[0]); in tcp_listen_for_all()
1309 add_sock(sock, con); in tcp_listen_for_all()
1321 static struct writequeue_entry *new_writequeue_entry(struct connection *con, in new_writequeue_entry() argument
1340 entry->con = con; in new_writequeue_entry()
1347 struct connection *con; in dlm_lowcomms_get_buffer() local
1351 con = nodeid2con(nodeid, allocation); in dlm_lowcomms_get_buffer()
1352 if (!con) in dlm_lowcomms_get_buffer()
1355 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1356 e = list_entry(con->writequeue.prev, struct writequeue_entry, list); in dlm_lowcomms_get_buffer()
1357 if ((&e->list == &con->writequeue) || in dlm_lowcomms_get_buffer()
1365 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1373 e = new_writequeue_entry(con, allocation); in dlm_lowcomms_get_buffer()
1375 spin_lock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
1379 list_add_tail(&e->list, &con->writequeue); in dlm_lowcomms_get_buffer()
1380 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_get_buffer()
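dlm_lowcomms_get_buffer() gives the caller space inside a page-backed writequeue entry: it first tries to append to the entry at the tail of con->writequeue and only allocates a new one when there is no room. The sketch below is a reconstruction; the end/users bookkeeping fields and the exact space check are assumptions inferred from the surrounding fragments, not lines shown above.

    void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation,
                                  char **ppc)
    {
        struct connection *con;
        struct writequeue_entry *e;
        int offset = 0;

        con = nodeid2con(nodeid, allocation);
        if (!con)
            return NULL;

        spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        if (&e->list == &con->writequeue ||
            PAGE_SIZE - e->end < len) {
            e = NULL;                       /* empty queue or tail is full */
        } else {
            offset = e->end;                /* append into the tail entry */
            e->users++;                     /* held until commit */
            e->end += len;
        }
        spin_unlock(&con->writequeue_lock);

        if (!e) {
            e = new_writequeue_entry(con, allocation);
            if (!e)
                return NULL;

            spin_lock(&con->writequeue_lock);
            offset = e->end;
            e->users++;
            e->end += len;
            list_add_tail(&e->list, &con->writequeue);
            spin_unlock(&con->writequeue_lock);
        }

        *ppc = page_address(e->page) + offset;
        return e;
    }

The opaque return value goes back to dlm_lowcomms_commit_buffer(), which drops the users reference and queues swork once the entry actually has data to send.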
1389 struct connection *con = e->con; in dlm_lowcomms_commit_buffer() local
1392 spin_lock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1397 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1399 queue_work(send_workqueue, &con->swork); in dlm_lowcomms_commit_buffer()
1403 spin_unlock(&con->writequeue_lock); in dlm_lowcomms_commit_buffer()
1408 static void send_to_sock(struct connection *con) in send_to_sock() argument
1416 mutex_lock(&con->sock_mutex); in send_to_sock()
1417 if (con->sock == NULL) in send_to_sock()
1420 spin_lock(&con->writequeue_lock); in send_to_sock()
1422 e = list_entry(con->writequeue.next, struct writequeue_entry, in send_to_sock()
1424 if ((struct list_head *) e == &con->writequeue) in send_to_sock()
1430 spin_unlock(&con->writequeue_lock); in send_to_sock()
1434 ret = kernel_sendpage(con->sock, e->page, offset, len, in send_to_sock()
1438 test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1439 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1443 set_bit(SOCK_NOSPACE, &con->sock->flags); in send_to_sock()
1444 con->sock->sk->sk_write_pending++; in send_to_sock()
1458 spin_lock(&con->writequeue_lock); in send_to_sock()
1461 spin_unlock(&con->writequeue_lock); in send_to_sock()
1463 mutex_unlock(&con->sock_mutex); in send_to_sock()
1467 mutex_unlock(&con->sock_mutex); in send_to_sock()
1468 close_connection(con, false, false, true); in send_to_sock()
1471 queue_work(send_workqueue, &con->swork); in send_to_sock()
1475 mutex_unlock(&con->sock_mutex); in send_to_sock()
1476 queue_work(send_workqueue, &con->swork); in send_to_sock()
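send_to_sock() walks con->writequeue and pushes each entry's page with kernel_sendpage(). The interesting case is -EAGAIN with the socket's async-nospace flag set: the connection marks itself CF_APP_LIMITED and bumps sk_write_pending, which is exactly the state lowcomms_write_space() undoes before requeueing swork. A condensed sketch; writequeue_entry_complete() is an assumed helper standing in for the per-entry accounting once bytes have gone out:

    static void send_to_sock(struct connection *con)
    {
        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
        struct writequeue_entry *e;
        int len, offset, ret;

        mutex_lock(&con->sock_mutex);
        if (!con->sock)
            goto out_connect;

        spin_lock(&con->writequeue_lock);
        for (;;) {
            e = list_entry(con->writequeue.next, struct writequeue_entry,
                           list);
            if ((struct list_head *)e == &con->writequeue)
                break;                      /* queue drained */

            len = e->len;
            offset = e->offset;
            spin_unlock(&con->writequeue_lock);

            ret = kernel_sendpage(con->sock, e->page, offset, len,
                                  msg_flags);
            if (ret == -EAGAIN || ret == 0) {
                if (ret == -EAGAIN &&
                    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
                    /* socket buffer full: note that we are blocked so
                     * lowcomms_write_space() requeues us later */
                    set_bit(SOCK_NOSPACE, &con->sock->flags);
                    con->sock->sk->sk_write_pending++;
                }
                goto out;
            } else if (ret < 0) {
                goto send_error;
            }

            spin_lock(&con->writequeue_lock);
            /* assumed helper: advance offset/len and free the entry
             * once everything on its page has been sent */
            writequeue_entry_complete(e, ret);
        }
        spin_unlock(&con->writequeue_lock);
    out:
        mutex_unlock(&con->sock_mutex);
        return;

    send_error:
        mutex_unlock(&con->sock_mutex);
        close_connection(con, false, false, true);
        queue_work(send_workqueue, &con->swork);  /* trigger a reconnect */
        return;

    out_connect:
        mutex_unlock(&con->sock_mutex);
        queue_work(send_workqueue, &con->swork);
    }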
1480 static void clean_one_writequeue(struct connection *con) in clean_one_writequeue() argument
1484 spin_lock(&con->writequeue_lock); in clean_one_writequeue()
1485 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1489 spin_unlock(&con->writequeue_lock); in clean_one_writequeue()
1496 struct connection *con; in dlm_lowcomms_close() local
1500 con = nodeid2con(nodeid, 0); in dlm_lowcomms_close()
1501 if (con) { in dlm_lowcomms_close()
1502 set_bit(CF_CLOSE, &con->flags); in dlm_lowcomms_close()
1503 close_connection(con, true, true, true); in dlm_lowcomms_close()
1504 clean_one_writequeue(con); in dlm_lowcomms_close()
1523 struct connection *con = container_of(work, struct connection, rwork); in process_recv_sockets() local
1526 clear_bit(CF_READ_PENDING, &con->flags); in process_recv_sockets()
1528 err = con->rx_action(con); in process_recv_sockets()
1535 struct connection *con = container_of(work, struct connection, swork); in process_send_sockets() local
1537 clear_bit(CF_WRITE_PENDING, &con->flags); in process_send_sockets()
1538 if (con->sock == NULL) /* not mutex protected so check it inside too */ in process_send_sockets()
1539 con->connect_action(con); in process_send_sockets()
1540 if (!list_empty(&con->writequeue)) in process_send_sockets()
1541 send_to_sock(con); in process_send_sockets()
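The two work handlers tie it together: the receive worker clears CF_READ_PENDING and calls the connection's rx_action (receive_from_sock() for a data socket, accept_from_sock() for the listening one); the send worker connects first if there is no socket yet, then drains the write queue. Sketch, with the receive worker's -EAGAIN re-run loop trimmed:

    static void process_recv_sockets(struct work_struct *work)
    {
        struct connection *con = container_of(work, struct connection, rwork);

        clear_bit(CF_READ_PENDING, &con->flags);
        con->rx_action(con);    /* receive_from_sock or accept_from_sock */
    }

    static void process_send_sockets(struct work_struct *work)
    {
        struct connection *con = container_of(work, struct connection, swork);

        clear_bit(CF_WRITE_PENDING, &con->flags);
        if (con->sock == NULL)  /* re-checked under sock_mutex inside */
            con->connect_action(con);
        if (!list_empty(&con->writequeue))
            send_to_sock(con);
    }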
1572 static void _stop_conn(struct connection *con, bool and_other) in _stop_conn() argument
1574 mutex_lock(&con->sock_mutex); in _stop_conn()
1575 set_bit(CF_CLOSE, &con->flags); in _stop_conn()
1576 set_bit(CF_READ_PENDING, &con->flags); in _stop_conn()
1577 set_bit(CF_WRITE_PENDING, &con->flags); in _stop_conn()
1578 if (con->sock && con->sock->sk) { in _stop_conn()
1579 write_lock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1580 con->sock->sk->sk_user_data = NULL; in _stop_conn()
1581 write_unlock_bh(&con->sock->sk->sk_callback_lock); in _stop_conn()
1583 if (con->othercon && and_other) in _stop_conn()
1584 _stop_conn(con->othercon, false); in _stop_conn()
1585 mutex_unlock(&con->sock_mutex); in _stop_conn()
1588 static void stop_conn(struct connection *con) in stop_conn() argument
1590 _stop_conn(con, true); in stop_conn()
1593 static void shutdown_conn(struct connection *con) in shutdown_conn() argument
1595 if (con->shutdown_action) in shutdown_conn()
1596 con->shutdown_action(con); in shutdown_conn()
1601 struct connection *con = container_of(rcu, struct connection, rcu); in connection_release() local
1603 kfree(con->rx_buf); in connection_release()
1604 kfree(con); in connection_release()
1607 static void free_conn(struct connection *con) in free_conn() argument
1609 close_connection(con, true, true, true); in free_conn()
1611 hlist_del_rcu(&con->list); in free_conn()
1613 if (con->othercon) { in free_conn()
1614 clean_one_writequeue(con->othercon); in free_conn()
1615 call_rcu(&con->othercon->rcu, connection_release); in free_conn()
1617 clean_one_writequeue(con); in free_conn()
1618 call_rcu(&con->rcu, connection_release); in free_conn()
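free_conn(), run for every connection via foreach_conn() at shutdown, closes the socket, unhashes the entry and frees it only after an RCU grace period, so a concurrent __find_con() walker can never touch freed memory. Reassembled from the fragments above (the real code takes the connections lock around the unhash):

    static void connection_release(struct rcu_head *rcu)
    {
        struct connection *con = container_of(rcu, struct connection, rcu);

        kfree(con->rx_buf);
        kfree(con);
    }

    static void free_conn(struct connection *con)
    {
        close_connection(con, true, true, true);

        hlist_del_rcu(&con->list);          /* unpublish from the hash */

        if (con->othercon) {
            clean_one_writequeue(con->othercon);
            call_rcu(&con->othercon->rcu, connection_release);
        }
        clean_one_writequeue(con);
        call_rcu(&con->rcu, connection_release);
    }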
1625 struct connection *con; in work_flush() local
1636 hlist_for_each_entry_rcu(con, &connection_hash[i], in work_flush()
1638 ok &= test_bit(CF_READ_PENDING, &con->flags); in work_flush()
1639 ok &= test_bit(CF_WRITE_PENDING, &con->flags); in work_flush()
1640 if (con->othercon) { in work_flush()
1642 &con->othercon->flags); in work_flush()
1644 &con->othercon->flags); in work_flush()
1674 struct connection *con; in dlm_lowcomms_start() local
1705 con = nodeid2con(0,0); in dlm_lowcomms_start()
1706 if (con) in dlm_lowcomms_start()
1707 free_conn(con); in dlm_lowcomms_start()