• Home
  • Raw
  • Download

Lines Matching refs:con

99 void ceph_con_flag_clear(struct ceph_connection *con, unsigned long con_flag)  in ceph_con_flag_clear()  argument
103 clear_bit(con_flag, &con->flags); in ceph_con_flag_clear()
106 void ceph_con_flag_set(struct ceph_connection *con, unsigned long con_flag) in ceph_con_flag_set() argument
110 set_bit(con_flag, &con->flags); in ceph_con_flag_set()
113 bool ceph_con_flag_test(struct ceph_connection *con, unsigned long con_flag) in ceph_con_flag_test() argument
117 return test_bit(con_flag, &con->flags); in ceph_con_flag_test()
120 bool ceph_con_flag_test_and_clear(struct ceph_connection *con, in ceph_con_flag_test_and_clear() argument
125 return test_and_clear_bit(con_flag, &con->flags); in ceph_con_flag_test_and_clear()
128 bool ceph_con_flag_test_and_set(struct ceph_connection *con, in ceph_con_flag_test_and_set() argument
133 return test_and_set_bit(con_flag, &con->flags); in ceph_con_flag_test_and_set()
144 static void queue_con(struct ceph_connection *con);
145 static void cancel_con(struct ceph_connection *con);
147 static void con_fault(struct ceph_connection *con);
279 static void con_sock_state_init(struct ceph_connection *con) in con_sock_state_init() argument
283 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_init()
286 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_init()
290 static void con_sock_state_connecting(struct ceph_connection *con) in con_sock_state_connecting() argument
294 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING); in con_sock_state_connecting()
297 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connecting()
301 static void con_sock_state_connected(struct ceph_connection *con) in con_sock_state_connected() argument
305 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED); in con_sock_state_connected()
308 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_connected()
312 static void con_sock_state_closing(struct ceph_connection *con) in con_sock_state_closing() argument
316 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING); in con_sock_state_closing()
321 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closing()
325 static void con_sock_state_closed(struct ceph_connection *con) in con_sock_state_closed() argument
329 old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED); in con_sock_state_closed()
335 dout("%s con %p sock %d -> %d\n", __func__, con, old_state, in con_sock_state_closed()
346 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_data_ready() local
347 if (atomic_read(&con->msgr->stopping)) { in ceph_sock_data_ready()
353 con, con->state); in ceph_sock_data_ready()
354 queue_con(con); in ceph_sock_data_ready()
361 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_write_space() local
370 if (ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)) { in ceph_sock_write_space()
372 dout("%s %p queueing write work\n", __func__, con); in ceph_sock_write_space()
374 queue_con(con); in ceph_sock_write_space()
377 dout("%s %p nothing to write\n", __func__, con); in ceph_sock_write_space()
384 struct ceph_connection *con = sk->sk_user_data; in ceph_sock_state_change() local
387 con, con->state, sk->sk_state); in ceph_sock_state_change()
395 con_sock_state_closing(con); in ceph_sock_state_change()
396 ceph_con_flag_set(con, CEPH_CON_F_SOCK_CLOSED); in ceph_sock_state_change()
397 queue_con(con); in ceph_sock_state_change()
401 con_sock_state_connected(con); in ceph_sock_state_change()
402 queue_con(con); in ceph_sock_state_change()
413 struct ceph_connection *con) in set_sock_callbacks() argument
416 sk->sk_user_data = con; in set_sock_callbacks()
430 int ceph_tcp_connect(struct ceph_connection *con) in ceph_tcp_connect() argument
432 struct sockaddr_storage ss = con->peer_addr.in_addr; /* align */ in ceph_tcp_connect()
437 dout("%s con %p peer_addr %s\n", __func__, con, in ceph_tcp_connect()
438 ceph_pr_addr(&con->peer_addr)); in ceph_tcp_connect()
439 BUG_ON(con->sock); in ceph_tcp_connect()
443 ret = sock_create_kern(read_pnet(&con->msgr->net), ss.ss_family, in ceph_tcp_connect()
454 set_sock_callbacks(sock, con); in ceph_tcp_connect()
456 con_sock_state_connecting(con); in ceph_tcp_connect()
461 ceph_pr_addr(&con->peer_addr), in ceph_tcp_connect()
465 ceph_pr_addr(&con->peer_addr), ret); in ceph_tcp_connect()
470 if (ceph_test_opt(from_msgr(con->msgr), TCP_NODELAY)) in ceph_tcp_connect()
473 con->sock = sock; in ceph_tcp_connect()
480 int ceph_con_close_socket(struct ceph_connection *con) in ceph_con_close_socket() argument
484 dout("%s con %p sock %p\n", __func__, con, con->sock); in ceph_con_close_socket()
485 if (con->sock) { in ceph_con_close_socket()
486 rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR); in ceph_con_close_socket()
487 sock_release(con->sock); in ceph_con_close_socket()
488 con->sock = NULL; in ceph_con_close_socket()
497 ceph_con_flag_clear(con, CEPH_CON_F_SOCK_CLOSED); in ceph_con_close_socket()
499 con_sock_state_closed(con); in ceph_con_close_socket()
503 static void ceph_con_reset_protocol(struct ceph_connection *con) in ceph_con_reset_protocol() argument
505 dout("%s con %p\n", __func__, con); in ceph_con_reset_protocol()
507 ceph_con_close_socket(con); in ceph_con_reset_protocol()
508 if (con->in_msg) { in ceph_con_reset_protocol()
509 WARN_ON(con->in_msg->con != con); in ceph_con_reset_protocol()
510 ceph_msg_put(con->in_msg); in ceph_con_reset_protocol()
511 con->in_msg = NULL; in ceph_con_reset_protocol()
513 if (con->out_msg) { in ceph_con_reset_protocol()
514 WARN_ON(con->out_msg->con != con); in ceph_con_reset_protocol()
515 ceph_msg_put(con->out_msg); in ceph_con_reset_protocol()
516 con->out_msg = NULL; in ceph_con_reset_protocol()
519 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_reset_protocol()
520 ceph_con_v2_reset_protocol(con); in ceph_con_reset_protocol()
522 ceph_con_v1_reset_protocol(con); in ceph_con_reset_protocol()
545 void ceph_con_reset_session(struct ceph_connection *con) in ceph_con_reset_session() argument
547 dout("%s con %p\n", __func__, con); in ceph_con_reset_session()
549 WARN_ON(con->in_msg); in ceph_con_reset_session()
550 WARN_ON(con->out_msg); in ceph_con_reset_session()
551 ceph_msg_remove_list(&con->out_queue); in ceph_con_reset_session()
552 ceph_msg_remove_list(&con->out_sent); in ceph_con_reset_session()
553 con->out_seq = 0; in ceph_con_reset_session()
554 con->in_seq = 0; in ceph_con_reset_session()
555 con->in_seq_acked = 0; in ceph_con_reset_session()
557 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_reset_session()
558 ceph_con_v2_reset_session(con); in ceph_con_reset_session()
560 ceph_con_v1_reset_session(con); in ceph_con_reset_session()
566 void ceph_con_close(struct ceph_connection *con) in ceph_con_close() argument
568 mutex_lock(&con->mutex); in ceph_con_close()
569 dout("con_close %p peer %s\n", con, ceph_pr_addr(&con->peer_addr)); in ceph_con_close()
570 con->state = CEPH_CON_S_CLOSED; in ceph_con_close()
572 ceph_con_flag_clear(con, CEPH_CON_F_LOSSYTX); /* so we retry next in ceph_con_close()
574 ceph_con_flag_clear(con, CEPH_CON_F_KEEPALIVE_PENDING); in ceph_con_close()
575 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in ceph_con_close()
576 ceph_con_flag_clear(con, CEPH_CON_F_BACKOFF); in ceph_con_close()
578 ceph_con_reset_protocol(con); in ceph_con_close()
579 ceph_con_reset_session(con); in ceph_con_close()
580 cancel_con(con); in ceph_con_close()
581 mutex_unlock(&con->mutex); in ceph_con_close()
588 void ceph_con_open(struct ceph_connection *con, in ceph_con_open() argument
592 mutex_lock(&con->mutex); in ceph_con_open()
593 dout("con_open %p %s\n", con, ceph_pr_addr(addr)); in ceph_con_open()
595 WARN_ON(con->state != CEPH_CON_S_CLOSED); in ceph_con_open()
596 con->state = CEPH_CON_S_PREOPEN; in ceph_con_open()
598 con->peer_name.type = (__u8) entity_type; in ceph_con_open()
599 con->peer_name.num = cpu_to_le64(entity_num); in ceph_con_open()
601 memcpy(&con->peer_addr, addr, sizeof(*addr)); in ceph_con_open()
602 con->delay = 0; /* reset backoff memory */ in ceph_con_open()
603 mutex_unlock(&con->mutex); in ceph_con_open()
604 queue_con(con); in ceph_con_open()
611 bool ceph_con_opened(struct ceph_connection *con) in ceph_con_opened() argument
613 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_opened()
614 return ceph_con_v2_opened(con); in ceph_con_opened()
616 return ceph_con_v1_opened(con); in ceph_con_opened()
622 void ceph_con_init(struct ceph_connection *con, void *private, in ceph_con_init() argument
626 dout("con_init %p\n", con); in ceph_con_init()
627 memset(con, 0, sizeof(*con)); in ceph_con_init()
628 con->private = private; in ceph_con_init()
629 con->ops = ops; in ceph_con_init()
630 con->msgr = msgr; in ceph_con_init()
632 con_sock_state_init(con); in ceph_con_init()
634 mutex_init(&con->mutex); in ceph_con_init()
635 INIT_LIST_HEAD(&con->out_queue); in ceph_con_init()
636 INIT_LIST_HEAD(&con->out_sent); in ceph_con_init()
637 INIT_DELAYED_WORK(&con->work, ceph_con_workfn); in ceph_con_init()
639 con->state = CEPH_CON_S_CLOSED; in ceph_con_init()
662 void ceph_con_discard_sent(struct ceph_connection *con, u64 ack_seq) in ceph_con_discard_sent() argument
667 dout("%s con %p ack_seq %llu\n", __func__, con, ack_seq); in ceph_con_discard_sent()
668 while (!list_empty(&con->out_sent)) { in ceph_con_discard_sent()
669 msg = list_first_entry(&con->out_sent, struct ceph_msg, in ceph_con_discard_sent()
676 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, in ceph_con_discard_sent()
687 void ceph_con_discard_requeued(struct ceph_connection *con, u64 reconnect_seq) in ceph_con_discard_requeued() argument
692 dout("%s con %p reconnect_seq %llu\n", __func__, con, reconnect_seq); in ceph_con_discard_requeued()
693 while (!list_empty(&con->out_queue)) { in ceph_con_discard_requeued()
694 msg = list_first_entry(&con->out_queue, struct ceph_msg, in ceph_con_discard_requeued()
702 dout("%s con %p discarding msg %p seq %llu\n", __func__, con, in ceph_con_discard_requeued()
1355 void ceph_con_process_message(struct ceph_connection *con) in ceph_con_process_message() argument
1357 struct ceph_msg *msg = con->in_msg; in ceph_con_process_message()
1359 BUG_ON(con->in_msg->con != con); in ceph_con_process_message()
1360 con->in_msg = NULL; in ceph_con_process_message()
1363 if (con->peer_name.type == 0) in ceph_con_process_message()
1364 con->peer_name = msg->hdr.src; in ceph_con_process_message()
1366 con->in_seq++; in ceph_con_process_message()
1367 mutex_unlock(&con->mutex); in ceph_con_process_message()
1377 con->in_front_crc, con->in_middle_crc, con->in_data_crc); in ceph_con_process_message()
1378 con->ops->dispatch(con, msg); in ceph_con_process_message()
1380 mutex_lock(&con->mutex); in ceph_con_process_message()
1388 static int queue_con_delay(struct ceph_connection *con, unsigned long delay) in queue_con_delay() argument
1390 if (!con->ops->get(con)) { in queue_con_delay()
1391 dout("%s %p ref count 0\n", __func__, con); in queue_con_delay()
1398 dout("%s %p %lu\n", __func__, con, delay); in queue_con_delay()
1399 if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) { in queue_con_delay()
1400 dout("%s %p - already queued\n", __func__, con); in queue_con_delay()
1401 con->ops->put(con); in queue_con_delay()
1408 static void queue_con(struct ceph_connection *con) in queue_con() argument
1410 (void) queue_con_delay(con, 0); in queue_con()
1413 static void cancel_con(struct ceph_connection *con) in cancel_con() argument
1415 if (cancel_delayed_work(&con->work)) { in cancel_con()
1416 dout("%s %p\n", __func__, con); in cancel_con()
1417 con->ops->put(con); in cancel_con()
1421 static bool con_sock_closed(struct ceph_connection *con) in con_sock_closed() argument
1423 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_SOCK_CLOSED)) in con_sock_closed()
1428 con->error_msg = "socket closed (con state " #x ")"; \ in con_sock_closed()
1431 switch (con->state) { in con_sock_closed()
1453 static bool con_backoff(struct ceph_connection *con) in con_backoff() argument
1457 if (!ceph_con_flag_test_and_clear(con, CEPH_CON_F_BACKOFF)) in con_backoff()
1460 ret = queue_con_delay(con, con->delay); in con_backoff()
1463 con, con->delay); in con_backoff()
1465 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); in con_backoff()
1473 static void con_fault_finish(struct ceph_connection *con) in con_fault_finish() argument
1475 dout("%s %p\n", __func__, con); in con_fault_finish()
1481 if (con->v1.auth_retry) { in con_fault_finish()
1482 dout("auth_retry %d, invalidating\n", con->v1.auth_retry); in con_fault_finish()
1483 if (con->ops->invalidate_authorizer) in con_fault_finish()
1484 con->ops->invalidate_authorizer(con); in con_fault_finish()
1485 con->v1.auth_retry = 0; in con_fault_finish()
1488 if (con->ops->fault) in con_fault_finish()
1489 con->ops->fault(con); in con_fault_finish()
1497 struct ceph_connection *con = container_of(work, struct ceph_connection, in ceph_con_workfn() local
1501 mutex_lock(&con->mutex); in ceph_con_workfn()
1505 if ((fault = con_sock_closed(con))) { in ceph_con_workfn()
1506 dout("%s: con %p SOCK_CLOSED\n", __func__, con); in ceph_con_workfn()
1509 if (con_backoff(con)) { in ceph_con_workfn()
1510 dout("%s: con %p BACKOFF\n", __func__, con); in ceph_con_workfn()
1513 if (con->state == CEPH_CON_S_STANDBY) { in ceph_con_workfn()
1514 dout("%s: con %p STANDBY\n", __func__, con); in ceph_con_workfn()
1517 if (con->state == CEPH_CON_S_CLOSED) { in ceph_con_workfn()
1518 dout("%s: con %p CLOSED\n", __func__, con); in ceph_con_workfn()
1519 BUG_ON(con->sock); in ceph_con_workfn()
1522 if (con->state == CEPH_CON_S_PREOPEN) { in ceph_con_workfn()
1523 dout("%s: con %p PREOPEN\n", __func__, con); in ceph_con_workfn()
1524 BUG_ON(con->sock); in ceph_con_workfn()
1527 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_workfn()
1528 ret = ceph_con_v2_try_read(con); in ceph_con_workfn()
1530 ret = ceph_con_v1_try_read(con); in ceph_con_workfn()
1534 if (!con->error_msg) in ceph_con_workfn()
1535 con->error_msg = "socket error on read"; in ceph_con_workfn()
1540 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_con_workfn()
1541 ret = ceph_con_v2_try_write(con); in ceph_con_workfn()
1543 ret = ceph_con_v1_try_write(con); in ceph_con_workfn()
1547 if (!con->error_msg) in ceph_con_workfn()
1548 con->error_msg = "socket error on write"; in ceph_con_workfn()
1555 con_fault(con); in ceph_con_workfn()
1556 mutex_unlock(&con->mutex); in ceph_con_workfn()
1559 con_fault_finish(con); in ceph_con_workfn()
1561 con->ops->put(con); in ceph_con_workfn()
1568 static void con_fault(struct ceph_connection *con) in con_fault() argument
1571 con, con->state, ceph_pr_addr(&con->peer_addr)); in con_fault()
1573 pr_warn("%s%lld %s %s\n", ENTITY_NAME(con->peer_name), in con_fault()
1574 ceph_pr_addr(&con->peer_addr), con->error_msg); in con_fault()
1575 con->error_msg = NULL; in con_fault()
1577 WARN_ON(con->state == CEPH_CON_S_STANDBY || in con_fault()
1578 con->state == CEPH_CON_S_CLOSED); in con_fault()
1580 ceph_con_reset_protocol(con); in con_fault()
1582 if (ceph_con_flag_test(con, CEPH_CON_F_LOSSYTX)) { in con_fault()
1584 con->state = CEPH_CON_S_CLOSED; in con_fault()
1589 list_splice_init(&con->out_sent, &con->out_queue); in con_fault()
1593 if (list_empty(&con->out_queue) && in con_fault()
1594 !ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)) { in con_fault()
1595 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); in con_fault()
1596 ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING); in con_fault()
1597 con->state = CEPH_CON_S_STANDBY; in con_fault()
1600 con->state = CEPH_CON_S_PREOPEN; in con_fault()
1601 if (!con->delay) { in con_fault()
1602 con->delay = BASE_DELAY_INTERVAL; in con_fault()
1603 } else if (con->delay < MAX_DELAY_INTERVAL) { in con_fault()
1604 con->delay *= 2; in con_fault()
1605 if (con->delay > MAX_DELAY_INTERVAL) in con_fault()
1606 con->delay = MAX_DELAY_INTERVAL; in con_fault()
1608 ceph_con_flag_set(con, CEPH_CON_F_BACKOFF); in con_fault()
1609 queue_con(con); in con_fault()
1658 static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con) in msg_con_set() argument
1660 if (msg->con) in msg_con_set()
1661 msg->con->ops->put(msg->con); in msg_con_set()
1663 msg->con = con ? con->ops->get(con) : NULL; in msg_con_set()
1664 BUG_ON(msg->con != con); in msg_con_set()
1667 static void clear_standby(struct ceph_connection *con) in clear_standby() argument
1670 if (con->state == CEPH_CON_S_STANDBY) { in clear_standby()
1671 dout("clear_standby %p and ++connect_seq\n", con); in clear_standby()
1672 con->state = CEPH_CON_S_PREOPEN; in clear_standby()
1673 con->v1.connect_seq++; in clear_standby()
1674 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_WRITE_PENDING)); in clear_standby()
1675 WARN_ON(ceph_con_flag_test(con, CEPH_CON_F_KEEPALIVE_PENDING)); in clear_standby()
1684 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) in ceph_con_send() argument
1687 msg->hdr.src = con->msgr->inst.name; in ceph_con_send()
1691 mutex_lock(&con->mutex); in ceph_con_send()
1693 if (con->state == CEPH_CON_S_CLOSED) { in ceph_con_send()
1694 dout("con_send %p closed, dropping %p\n", con, msg); in ceph_con_send()
1696 mutex_unlock(&con->mutex); in ceph_con_send()
1700 msg_con_set(msg, con); in ceph_con_send()
1703 list_add_tail(&msg->list_head, &con->out_queue); in ceph_con_send()
1705 ENTITY_NAME(con->peer_name), le16_to_cpu(msg->hdr.type), in ceph_con_send()
1711 clear_standby(con); in ceph_con_send()
1712 mutex_unlock(&con->mutex); in ceph_con_send()
1716 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) in ceph_con_send()
1717 queue_con(con); in ceph_con_send()
1726 struct ceph_connection *con = msg->con; in ceph_msg_revoke() local
1728 if (!con) { in ceph_msg_revoke()
1733 mutex_lock(&con->mutex); in ceph_msg_revoke()
1735 WARN_ON(con->out_msg == msg); in ceph_msg_revoke()
1736 dout("%s con %p msg %p not linked\n", __func__, con, msg); in ceph_msg_revoke()
1737 mutex_unlock(&con->mutex); in ceph_msg_revoke()
1741 dout("%s con %p msg %p was linked\n", __func__, con, msg); in ceph_msg_revoke()
1745 if (con->out_msg == msg) { in ceph_msg_revoke()
1746 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_msg_revoke()
1747 dout("%s con %p msg %p was sending\n", __func__, con, msg); in ceph_msg_revoke()
1748 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_msg_revoke()
1749 ceph_con_v2_revoke(con); in ceph_msg_revoke()
1751 ceph_con_v1_revoke(con); in ceph_msg_revoke()
1752 ceph_msg_put(con->out_msg); in ceph_msg_revoke()
1753 con->out_msg = NULL; in ceph_msg_revoke()
1756 con, msg, con->out_msg); in ceph_msg_revoke()
1758 mutex_unlock(&con->mutex); in ceph_msg_revoke()
1766 struct ceph_connection *con = msg->con; in ceph_msg_revoke_incoming() local
1768 if (!con) { in ceph_msg_revoke_incoming()
1773 mutex_lock(&con->mutex); in ceph_msg_revoke_incoming()
1774 if (con->in_msg == msg) { in ceph_msg_revoke_incoming()
1775 WARN_ON(con->state != CEPH_CON_S_OPEN); in ceph_msg_revoke_incoming()
1776 dout("%s con %p msg %p was recving\n", __func__, con, msg); in ceph_msg_revoke_incoming()
1777 if (ceph_msgr2(from_msgr(con->msgr))) in ceph_msg_revoke_incoming()
1778 ceph_con_v2_revoke_incoming(con); in ceph_msg_revoke_incoming()
1780 ceph_con_v1_revoke_incoming(con); in ceph_msg_revoke_incoming()
1781 ceph_msg_put(con->in_msg); in ceph_msg_revoke_incoming()
1782 con->in_msg = NULL; in ceph_msg_revoke_incoming()
1785 con, msg, con->in_msg); in ceph_msg_revoke_incoming()
1787 mutex_unlock(&con->mutex); in ceph_msg_revoke_incoming()
1793 void ceph_con_keepalive(struct ceph_connection *con) in ceph_con_keepalive() argument
1795 dout("con_keepalive %p\n", con); in ceph_con_keepalive()
1796 mutex_lock(&con->mutex); in ceph_con_keepalive()
1797 clear_standby(con); in ceph_con_keepalive()
1798 ceph_con_flag_set(con, CEPH_CON_F_KEEPALIVE_PENDING); in ceph_con_keepalive()
1799 mutex_unlock(&con->mutex); in ceph_con_keepalive()
1801 if (!ceph_con_flag_test_and_set(con, CEPH_CON_F_WRITE_PENDING)) in ceph_con_keepalive()
1802 queue_con(con); in ceph_con_keepalive()
1806 bool ceph_con_keepalive_expired(struct ceph_connection *con, in ceph_con_keepalive_expired() argument
1810 (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) { in ceph_con_keepalive_expired()
1815 ts = timespec64_add(con->last_keepalive_ack, ts); in ceph_con_keepalive_expired()
1976 static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg) in ceph_alloc_middle() argument
2007 int ceph_con_in_msg_alloc(struct ceph_connection *con, in ceph_con_in_msg_alloc() argument
2014 BUG_ON(con->in_msg != NULL); in ceph_con_in_msg_alloc()
2015 BUG_ON(!con->ops->alloc_msg); in ceph_con_in_msg_alloc()
2017 mutex_unlock(&con->mutex); in ceph_con_in_msg_alloc()
2018 msg = con->ops->alloc_msg(con, hdr, skip); in ceph_con_in_msg_alloc()
2019 mutex_lock(&con->mutex); in ceph_con_in_msg_alloc()
2020 if (con->state != CEPH_CON_S_OPEN) { in ceph_con_in_msg_alloc()
2027 msg_con_set(msg, con); in ceph_con_in_msg_alloc()
2028 con->in_msg = msg; in ceph_con_in_msg_alloc()
2038 con->error_msg = "error allocating memory for incoming message"; in ceph_con_in_msg_alloc()
2041 memcpy(&con->in_msg->hdr, hdr, sizeof(*hdr)); in ceph_con_in_msg_alloc()
2043 if (middle_len && !con->in_msg->middle) { in ceph_con_in_msg_alloc()
2044 ret = ceph_alloc_middle(con, con->in_msg); in ceph_con_in_msg_alloc()
2046 ceph_msg_put(con->in_msg); in ceph_con_in_msg_alloc()
2047 con->in_msg = NULL; in ceph_con_in_msg_alloc()
2054 void ceph_con_get_out_msg(struct ceph_connection *con) in ceph_con_get_out_msg() argument
2058 BUG_ON(list_empty(&con->out_queue)); in ceph_con_get_out_msg()
2059 msg = list_first_entry(&con->out_queue, struct ceph_msg, list_head); in ceph_con_get_out_msg()
2060 WARN_ON(msg->con != con); in ceph_con_get_out_msg()
2066 list_move_tail(&msg->list_head, &con->out_sent); in ceph_con_get_out_msg()
2073 msg->hdr.seq = cpu_to_le64(++con->out_seq); in ceph_con_get_out_msg()
2076 if (con->ops->reencode_message) in ceph_con_get_out_msg()
2077 con->ops->reencode_message(msg); in ceph_con_get_out_msg()
2084 WARN_ON(con->out_msg); in ceph_con_get_out_msg()
2085 con->out_msg = ceph_msg_get(msg); in ceph_con_get_out_msg()