// Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include #include "../include/endpoint_definition.hpp" #include "../include/endpoint_host.hpp" #include "../../routing/include/routing_host.hpp" #include "../include/tcp_server_endpoint_impl.hpp" #include "../../utility/include/utility.hpp" #include "../../utility/include/byteorder.hpp" namespace ip = boost::asio::ip; namespace vsomeip_v3 { tcp_server_endpoint_impl::tcp_server_endpoint_impl( const std::shared_ptr& _endpoint_host, const std::shared_ptr& _routing_host, const endpoint_type& _local, boost::asio::io_service &_io, const std::shared_ptr& _configuration) : tcp_server_endpoint_base_impl(_endpoint_host, _routing_host, _local, _io, _configuration->get_max_message_size_reliable(_local.address().to_string(), _local.port()), _configuration->get_endpoint_queue_limit(_local.address().to_string(), _local.port()), _configuration), acceptor_(_io), buffer_shrink_threshold_(configuration_->get_buffer_shrink_threshold()), local_port_(_local.port()), // send timeout after 2/3 of configured ttl, warning after 1/3 send_timeout_(configuration_->get_sd_ttl() * 666) { is_supporting_magic_cookies_ = true; boost::system::error_code ec; acceptor_.open(_local.protocol(), ec); boost::asio::detail::throw_error(ec, "acceptor open"); acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); boost::asio::detail::throw_error(ec, "acceptor set_option"); #ifndef _WIN32 // If specified, bind to device std::string its_device(configuration_->get_device()); if (its_device != "") { if (setsockopt(acceptor_.native_handle(), SOL_SOCKET, SO_BINDTODEVICE, its_device.c_str(), (socklen_t)its_device.size()) == -1) { VSOMEIP_WARNING << "TCP Server: Could not bind to device \"" << its_device << "\""; } } #endif acceptor_.bind(_local, ec); boost::asio::detail::throw_error(ec, "acceptor bind"); acceptor_.listen(boost::asio::socket_base::max_connections, ec); boost::asio::detail::throw_error(ec, "acceptor listen"); } tcp_server_endpoint_impl::~tcp_server_endpoint_impl() { } bool tcp_server_endpoint_impl::is_local() const { return false; } void tcp_server_endpoint_impl::start() { std::lock_guard its_lock(acceptor_mutex_); if (acceptor_.is_open()) { connection::ptr new_connection = connection::create( std::dynamic_pointer_cast( shared_from_this()), max_message_size_, buffer_shrink_threshold_, has_enabled_magic_cookies_, service_, send_timeout_); { std::unique_lock its_socket_lock(new_connection->get_socket_lock()); acceptor_.async_accept(new_connection->get_socket(), std::bind(&tcp_server_endpoint_impl::accept_cbk, std::dynamic_pointer_cast( shared_from_this()), new_connection, std::placeholders::_1)); } } } void tcp_server_endpoint_impl::stop() { server_endpoint_impl::stop(); { std::lock_guard its_lock(acceptor_mutex_); if(acceptor_.is_open()) { boost::system::error_code its_error; acceptor_.close(its_error); } } { std::lock_guard its_lock(connections_mutex_); for (const auto &c : connections_) { c.second->stop(); } connections_.clear(); } } bool tcp_server_endpoint_impl::send_to( const std::shared_ptr _target, const byte_t *_data, uint32_t _size) { std::lock_guard its_lock(mutex_); endpoint_type its_target(_target->get_address(), _target->get_port()); return send_intern(its_target, _data, _size); } bool tcp_server_endpoint_impl::send_error( const std::shared_ptr _target, const byte_t *_data, uint32_t _size) { bool ret(false); std::lock_guard its_lock(mutex_); const endpoint_type its_target(_target->get_address(), _target->get_port()); const queue_iterator_type target_queue_iterator(find_or_create_queue_unlocked(its_target)); auto& its_qpair = target_queue_iterator->second; const bool queue_size_zero_on_entry(its_qpair.second.empty()); if (check_message_size(nullptr, _size, its_target) == endpoint_impl::cms_ret_e::MSG_OK && check_queue_limit(_data, _size, its_qpair.first)) { its_qpair.second.emplace_back( std::make_shared(_data, _data + _size)); its_qpair.first += _size; if (queue_size_zero_on_entry) { // no writing in progress send_queued(target_queue_iterator); } ret = true; } return ret; } void tcp_server_endpoint_impl::send_queued(const queue_iterator_type _queue_iterator) { connection::ptr its_connection; { std::lock_guard its_lock(connections_mutex_); auto connection_iterator = connections_.find(_queue_iterator->first); if (connection_iterator != connections_.end()) { its_connection = connection_iterator->second; } else { VSOMEIP_INFO << "Didn't find connection: " << _queue_iterator->first.address().to_string() << ":" << std::dec << static_cast(_queue_iterator->first.port()) << " dropping outstanding messages (" << std::dec << _queue_iterator->second.second.size() << ")."; if (_queue_iterator->second.second.size()) { std::set its_services; // check all outstanding messages of this connection // whether stop handlers need to be called for (const auto &its_buffer : _queue_iterator->second.second) { if (its_buffer && its_buffer->size() > VSOMEIP_SESSION_POS_MAX) { service_t its_service = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); its_services.insert(its_service); } } for (auto its_service : its_services) { auto found_cbk = prepare_stop_handlers_.find(its_service); if (found_cbk != prepare_stop_handlers_.end()) { VSOMEIP_INFO << "Calling prepare stop handler " << "for service: 0x" << std::hex << std::setw(4) << std::setfill('0') << its_service; auto handler = found_cbk->second; auto ptr = this->shared_from_this(); service_.post([ptr, handler, its_service](){ handler(ptr, its_service); }); prepare_stop_handlers_.erase(found_cbk); } } } queues_.erase(_queue_iterator->first); } } if (its_connection) { its_connection->send_queued(_queue_iterator); } } void tcp_server_endpoint_impl::get_configured_times_from_endpoint( service_t _service, method_t _method, std::chrono::nanoseconds *_debouncing, std::chrono::nanoseconds *_maximum_retention) const { configuration_->get_configured_timing_responses(_service, tcp_server_endpoint_base_impl::local_.address().to_string(), tcp_server_endpoint_base_impl::local_.port(), _method, _debouncing, _maximum_retention); } bool tcp_server_endpoint_impl::is_established(const std::shared_ptr& _endpoint) { bool is_connected = false; endpoint_type endpoint(_endpoint->get_address(), _endpoint->get_port()); { std::lock_guard its_lock(connections_mutex_); auto connection_iterator = connections_.find(endpoint); if (connection_iterator != connections_.end()) { is_connected = true; } else { VSOMEIP_INFO << "Didn't find TCP connection: Subscription " << "rejected for: " << endpoint.address().to_string() << ":" << std::dec << static_cast(endpoint.port()); } } return is_connected; } bool tcp_server_endpoint_impl::get_default_target(service_t, tcp_server_endpoint_impl::endpoint_type &) const { return false; } void tcp_server_endpoint_impl::remove_connection( tcp_server_endpoint_impl::connection *_connection) { std::lock_guard its_lock(connections_mutex_); for (auto it = connections_.begin(); it != connections_.end();) { if (it->second.get() == _connection) { it = connections_.erase(it); break; } else { ++it; } } } void tcp_server_endpoint_impl::accept_cbk(const connection::ptr& _connection, boost::system::error_code const &_error) { if (!_error) { boost::system::error_code its_error; endpoint_type remote; { std::unique_lock its_socket_lock(_connection->get_socket_lock()); socket_type &new_connection_socket = _connection->get_socket(); remote = new_connection_socket.remote_endpoint(its_error); _connection->set_remote_info(remote); // Nagle algorithm off new_connection_socket.set_option(ip::tcp::no_delay(true), its_error); new_connection_socket.set_option(boost::asio::socket_base::keep_alive(true), its_error); if (its_error) { VSOMEIP_WARNING << "tcp_server_endpoint::connect: couldn't enable " << "keep_alive: " << its_error.message(); } } if (!its_error) { { std::lock_guard its_lock(connections_mutex_); connections_[remote] = _connection; } _connection->start(); } } if (_error != boost::asio::error::bad_descriptor && _error != boost::asio::error::operation_aborted && _error != boost::asio::error::no_descriptors) { start(); } else if (_error == boost::asio::error::no_descriptors) { VSOMEIP_ERROR<< "tcp_server_endpoint_impl::accept_cbk: " << _error.message() << " (" << std::dec << _error.value() << ") Will try to accept again in 1000ms"; std::shared_ptr its_timer = std::make_shared(service_, std::chrono::milliseconds(1000)); auto its_ep = std::dynamic_pointer_cast( shared_from_this()); its_timer->async_wait([its_timer, its_ep] (const boost::system::error_code& _error) { if (!_error) { its_ep->start(); } }); } } std::uint16_t tcp_server_endpoint_impl::get_local_port() const { return local_port_; } void tcp_server_endpoint_impl::set_local_port(std::uint16_t _port) { (void)_port; } bool tcp_server_endpoint_impl::is_reliable() const { return true; } /////////////////////////////////////////////////////////////////////////////// // class tcp_service_impl::connection /////////////////////////////////////////////////////////////////////////////// tcp_server_endpoint_impl::connection::connection( const std::weak_ptr& _server, std::uint32_t _max_message_size, std::uint32_t _recv_buffer_size_initial, std::uint32_t _buffer_shrink_threshold, bool _magic_cookies_enabled, boost::asio::io_service &_io_service, std::chrono::milliseconds _send_timeout) : socket_(_io_service), server_(_server), max_message_size_(_max_message_size), recv_buffer_size_initial_(_recv_buffer_size_initial), recv_buffer_(_recv_buffer_size_initial, 0), recv_buffer_size_(0), missing_capacity_(0), shrink_count_(0), buffer_shrink_threshold_(_buffer_shrink_threshold), remote_port_(0), magic_cookies_enabled_(_magic_cookies_enabled), last_cookie_sent_(std::chrono::steady_clock::now() - std::chrono::seconds(11)), send_timeout_(_send_timeout), send_timeout_warning_(_send_timeout / 2) { } tcp_server_endpoint_impl::connection::ptr tcp_server_endpoint_impl::connection::create( const std::weak_ptr& _server, std::uint32_t _max_message_size, std::uint32_t _buffer_shrink_threshold, bool _magic_cookies_enabled, boost::asio::io_service & _io_service, std::chrono::milliseconds _send_timeout) { const std::uint32_t its_initial_receveive_buffer_size = VSOMEIP_SOMEIP_HEADER_SIZE + 8 + MAGIC_COOKIE_SIZE + 8; return ptr(new connection(_server, _max_message_size, its_initial_receveive_buffer_size, _buffer_shrink_threshold, _magic_cookies_enabled, _io_service, _send_timeout)); } tcp_server_endpoint_impl::socket_type & tcp_server_endpoint_impl::connection::get_socket() { return socket_; } std::unique_lock tcp_server_endpoint_impl::connection::get_socket_lock() { return std::unique_lock(socket_mutex_); } void tcp_server_endpoint_impl::connection::start() { receive(); } void tcp_server_endpoint_impl::connection::receive() { std::lock_guard its_lock(socket_mutex_); if(socket_.is_open()) { const std::size_t its_capacity(recv_buffer_.capacity()); size_t buffer_size = its_capacity - recv_buffer_size_; try { if (missing_capacity_) { if (missing_capacity_ > MESSAGE_SIZE_UNLIMITED) { VSOMEIP_ERROR << "Missing receive buffer capacity exceeds allowed maximum!"; return; } const std::size_t its_required_capacity(recv_buffer_size_ + missing_capacity_); if (its_capacity < its_required_capacity) { recv_buffer_.reserve(its_required_capacity); recv_buffer_.resize(its_required_capacity, 0x0); if (recv_buffer_.size() > 1048576) { VSOMEIP_INFO << "tse: recv_buffer size is: " << recv_buffer_.size() << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } } buffer_size = missing_capacity_; missing_capacity_ = 0; } else if (buffer_shrink_threshold_ && shrink_count_ > buffer_shrink_threshold_ && recv_buffer_size_ == 0) { recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); buffer_size = recv_buffer_size_initial_; shrink_count_ = 0; } } catch (const std::exception &e) { handle_recv_buffer_exception(e); // don't start receiving again return; } socket_.async_receive(boost::asio::buffer(&recv_buffer_[recv_buffer_size_], buffer_size), std::bind(&tcp_server_endpoint_impl::connection::receive_cbk, shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } } void tcp_server_endpoint_impl::connection::stop() { std::lock_guard its_lock(socket_mutex_); if (socket_.is_open()) { boost::system::error_code its_error; socket_.shutdown(socket_.shutdown_both, its_error); socket_.close(its_error); } } void tcp_server_endpoint_impl::connection::send_queued( const queue_iterator_type _queue_iterator) { std::shared_ptr its_server(server_.lock()); if (!its_server) { VSOMEIP_TRACE << "tcp_server_endpoint_impl::connection::send_queued " " couldn't lock server_"; return; } message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); const service_t its_service = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SERVICE_POS_MIN], (*its_buffer)[VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_METHOD_POS_MIN], (*its_buffer)[VSOMEIP_METHOD_POS_MAX]); const client_t its_client = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_CLIENT_POS_MIN], (*its_buffer)[VSOMEIP_CLIENT_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( (*its_buffer)[VSOMEIP_SESSION_POS_MIN], (*its_buffer)[VSOMEIP_SESSION_POS_MAX]); if (magic_cookies_enabled_) { const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); if (std::chrono::duration_cast( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { if (send_magic_cookie(its_buffer)) { last_cookie_sent_ = now; _queue_iterator->second.first += sizeof(SERVICE_COOKIE); } } } { std::lock_guard its_lock(socket_mutex_); { std::lock_guard its_sent_lock(its_server->sent_mutex_); its_server->is_sending_ = true; } boost::asio::async_write(socket_, boost::asio::buffer(*its_buffer), std::bind(&tcp_server_endpoint_impl::connection::write_completion_condition, shared_from_this(), std::placeholders::_1, std::placeholders::_2, its_buffer->size(), its_service, its_method, its_client, its_session, std::chrono::steady_clock::now()), std::bind(&tcp_server_endpoint_base_impl::send_cbk, its_server, _queue_iterator, std::placeholders::_1, std::placeholders::_2)); } } void tcp_server_endpoint_impl::connection::send_queued_sync( const queue_iterator_type _queue_iterator) { message_buffer_ptr_t its_buffer = _queue_iterator->second.second.front(); if (magic_cookies_enabled_) { const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); if (std::chrono::duration_cast( now - last_cookie_sent_) > std::chrono::milliseconds(10000)) { if (send_magic_cookie(its_buffer)) { last_cookie_sent_ = now; _queue_iterator->second.first += sizeof(SERVICE_COOKIE); } } } try { std::lock_guard its_lock(socket_mutex_); boost::asio::write(socket_, boost::asio::buffer(*its_buffer)); } catch (const boost::system::system_error &e) { if (e.code() != boost::asio::error::broken_pipe) { VSOMEIP_ERROR << "tcp_server_endpoint_impl::connection::" << __func__ << " " << e.what(); } } } bool tcp_server_endpoint_impl::connection::send_magic_cookie( message_buffer_ptr_t &_buffer) { if (max_message_size_ == MESSAGE_SIZE_UNLIMITED || max_message_size_ - _buffer->size() >= VSOMEIP_SOMEIP_HEADER_SIZE + VSOMEIP_SOMEIP_MAGIC_COOKIE_SIZE) { _buffer->insert(_buffer->begin(), SERVICE_COOKIE, SERVICE_COOKIE + sizeof(SERVICE_COOKIE)); return true; } return false; } bool tcp_server_endpoint_impl::connection::is_magic_cookie(size_t _offset) const { return (0 == std::memcmp(CLIENT_COOKIE, &recv_buffer_[_offset], sizeof(CLIENT_COOKIE))); } void tcp_server_endpoint_impl::connection::receive_cbk( boost::system::error_code const &_error, std::size_t _bytes) { if (_error == boost::asio::error::operation_aborted) { // endpoint was stopped return; } std::shared_ptr its_server(server_.lock()); if (!its_server) { VSOMEIP_ERROR << "tcp_server_endpoint_impl::connection::receive_cbk " " couldn't lock server_"; return; } #if 0 std::stringstream msg; for (std::size_t i = 0; i < _bytes + recv_buffer_size_; ++i) msg << std::hex << std::setw(2) << std::setfill('0') << (int) recv_buffer_[i] << " "; VSOMEIP_INFO << msg.str(); #endif std::shared_ptr its_host = its_server->routing_host_.lock(); if (its_host) { if (!_error && 0 < _bytes) { if (recv_buffer_size_ + _bytes < recv_buffer_size_) { VSOMEIP_ERROR << "receive buffer overflow in tcp client endpoint ~> abort!"; return; } recv_buffer_size_ += _bytes; size_t its_iteration_gap = 0; bool has_full_message; do { uint64_t read_message_size = utility::get_message_size(&recv_buffer_[its_iteration_gap], recv_buffer_size_); if (read_message_size > MESSAGE_SIZE_UNLIMITED) { VSOMEIP_ERROR << "Message size exceeds allowed maximum!"; return; } uint32_t current_message_size = static_cast(read_message_size); has_full_message = (current_message_size > VSOMEIP_RETURN_CODE_POS && current_message_size <= recv_buffer_size_); if (has_full_message) { bool needs_forwarding(true); if (is_magic_cookie(its_iteration_gap)) { magic_cookies_enabled_ = true; } else { if (magic_cookies_enabled_) { uint32_t its_offset = its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_); if (its_offset < current_message_size) { { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing." << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } if (!is_magic_cookie(its_iteration_gap)) { auto its_endpoint_host = its_server->endpoint_host_.lock(); if (its_endpoint_host) { its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap], static_cast(recv_buffer_size_),its_server.get(), remote_address_, remote_port_); } } current_message_size = its_offset; needs_forwarding = false; } } } if (needs_forwarding) { if (utility::is_request( recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) { const client_t its_client = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_CLIENT_POS_MAX]); if (its_client != MAGIC_COOKIE_CLIENT) { const service_t its_service = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_SERVICE_POS_MAX]); const method_t its_method = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_METHOD_POS_MAX]); const session_t its_session = VSOMEIP_BYTES_TO_WORD( recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MIN], recv_buffer_[its_iteration_gap + VSOMEIP_SESSION_POS_MAX]); std::lock_guard its_requests_guard(its_server->requests_mutex_); its_server->requests_[its_client] [std::make_tuple(its_service, its_method, its_session)] = remote_; } } if (!magic_cookies_enabled_) { its_host->on_message(&recv_buffer_[its_iteration_gap], current_message_size, its_server.get(), boost::asio::ip::address(), VSOMEIP_ROUTING_CLIENT, std::make_pair(ANY_UID, ANY_GID), remote_address_, remote_port_); } else { // Only call on_message without a magic cookie in front of the buffer! if (!is_magic_cookie(its_iteration_gap)) { its_host->on_message(&recv_buffer_[its_iteration_gap], current_message_size, its_server.get(), boost::asio::ip::address(), VSOMEIP_ROUTING_CLIENT, std::make_pair(ANY_UID, ANY_GID), remote_address_, remote_port_); } } } calculate_shrink_count(); missing_capacity_ = 0; recv_buffer_size_ -= current_message_size; its_iteration_gap += current_message_size; } else if (magic_cookies_enabled_ && recv_buffer_size_ > 0) { uint32_t its_offset = its_server->find_magic_cookie(&recv_buffer_[its_iteration_gap], recv_buffer_size_); if (its_offset < recv_buffer_size_) { { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "Detected Magic Cookie within message data. Resyncing." << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } if (!is_magic_cookie(its_iteration_gap)) { auto its_endpoint_host = its_server->endpoint_host_.lock(); if (its_endpoint_host) { its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap], static_cast(recv_buffer_size_), its_server.get(), remote_address_, remote_port_); } } recv_buffer_size_ -= its_offset; its_iteration_gap += its_offset; has_full_message = true; // trigger next loop if (!is_magic_cookie(its_iteration_gap)) { auto its_endpoint_host = its_server->endpoint_host_.lock(); if (its_endpoint_host) { its_endpoint_host->on_error(&recv_buffer_[its_iteration_gap], static_cast(recv_buffer_size_), its_server.get(), remote_address_, remote_port_); } } } } if (!has_full_message) { if (recv_buffer_size_ > VSOMEIP_RETURN_CODE_POS && (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION || !utility::is_valid_message_type(static_cast(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS])) || !utility::is_valid_return_code(static_cast(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS])) )) { if (recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS] != VSOMEIP_PROTOCOL_VERSION) { { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "tse: Wrong protocol version: 0x" << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_PROTOCOL_VERSION_POS]) << " local: " << get_address_port_local() << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } // ensure to send back a error message w/ wrong protocol version its_host->on_message(&recv_buffer_[its_iteration_gap], VSOMEIP_SOMEIP_HEADER_SIZE + 8, its_server.get(), boost::asio::ip::address(), VSOMEIP_ROUTING_CLIENT, std::make_pair(ANY_UID, ANY_GID), remote_address_, remote_port_); } else if (!utility::is_valid_message_type(static_cast( recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]))) { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "tse: Invalid message type: 0x" << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_MESSAGE_TYPE_POS]) << " local: " << get_address_port_local() << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } else if (!utility::is_valid_return_code(static_cast( recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]))) { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "tse: Invalid return code: 0x" << std::hex << std::setw(2) << std::setfill('0') << std::uint32_t(recv_buffer_[its_iteration_gap + VSOMEIP_RETURN_CODE_POS]) << " local: " << get_address_port_local() << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } wait_until_sent(boost::asio::error::operation_aborted); return; } else if (max_message_size_ != MESSAGE_SIZE_UNLIMITED && current_message_size > max_message_size_) { recv_buffer_size_ = 0; recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); if (magic_cookies_enabled_) { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "Received a TCP message which exceeds " << "maximum message size (" << std::dec << current_message_size << " > " << std::dec << max_message_size_ << "). Magic Cookies are enabled: " << "Resetting receiver. local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } else { { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "Received a TCP message which exceeds " << "maximum message size (" << std::dec << current_message_size << " > " << std::dec << max_message_size_ << ") Magic cookies are disabled: " << "Connection will be closed! local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } wait_until_sent(boost::asio::error::operation_aborted); return; } } else if (current_message_size > recv_buffer_size_) { missing_capacity_ = current_message_size - static_cast(recv_buffer_size_); } else if (VSOMEIP_SOMEIP_HEADER_SIZE > recv_buffer_size_) { missing_capacity_ = VSOMEIP_SOMEIP_HEADER_SIZE - static_cast(recv_buffer_size_); } else if (magic_cookies_enabled_ && recv_buffer_size_ > 0) { // no need to check for magic cookie here again: has_full_message // would have been set to true if there was one present in the data recv_buffer_size_ = 0; recv_buffer_.resize(recv_buffer_size_initial_, 0x0); recv_buffer_.shrink_to_fit(); missing_capacity_ = 0; std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "Didn't find magic cookie in broken" << " data, trying to resync." << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } else { { std::lock_guard its_lock(socket_mutex_); VSOMEIP_ERROR << "tse::c<" << this << ">rcb: recv_buffer_size is: " << std::dec << recv_buffer_size_ << " but couldn't read " "out message_size. recv_buffer_capacity: " << recv_buffer_.capacity() << " its_iteration_gap: " << its_iteration_gap << "local: " << get_address_port_local() << " remote: " << get_address_port_remote() << ". Closing connection due to missing/broken data TCP stream."; } wait_until_sent(boost::asio::error::operation_aborted); return; } } } while (has_full_message && recv_buffer_size_); if (its_iteration_gap) { // Copy incomplete message to front for next receive_cbk iteration for (size_t i = 0; i < recv_buffer_size_; ++i) { recv_buffer_[i] = recv_buffer_[i + its_iteration_gap]; } // Still more capacity needed after shifting everything to front? if (missing_capacity_ && missing_capacity_ <= recv_buffer_.capacity() - recv_buffer_size_) { missing_capacity_ = 0; } } receive(); } } if (_error == boost::asio::error::eof || _error == boost::asio::error::connection_reset || _error == boost::asio::error::timed_out) { if(_error == boost::asio::error::timed_out) { std::lock_guard its_lock(socket_mutex_); VSOMEIP_WARNING << "tcp_server_endpoint receive_cbk: " << _error.message() << " local: " << get_address_port_local() << " remote: " << get_address_port_remote(); } wait_until_sent(boost::asio::error::operation_aborted); } } void tcp_server_endpoint_impl::connection::calculate_shrink_count() { if (buffer_shrink_threshold_) { if (recv_buffer_.capacity() != recv_buffer_size_initial_) { if (recv_buffer_size_ < (recv_buffer_.capacity() >> 1)) { shrink_count_++; } else { shrink_count_ = 0; } } } } void tcp_server_endpoint_impl::connection::set_remote_info( const endpoint_type &_remote) { remote_ = _remote; remote_address_ = _remote.address(); remote_port_ = _remote.port(); } const std::string tcp_server_endpoint_impl::connection::get_address_port_remote() const { std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; its_address_port += remote_address_.to_string(ec); its_address_port += ":"; its_address_port += std::to_string(remote_port_); return its_address_port; } const std::string tcp_server_endpoint_impl::connection::get_address_port_local() const { std::string its_address_port; its_address_port.reserve(21); boost::system::error_code ec; if (socket_.is_open()) { endpoint_type its_local_endpoint = socket_.local_endpoint(ec); if (!ec) { its_address_port += its_local_endpoint.address().to_string(ec); its_address_port += ":"; its_address_port += std::to_string(its_local_endpoint.port()); } } return its_address_port; } void tcp_server_endpoint_impl::connection::handle_recv_buffer_exception( const std::exception &_e) { std::stringstream its_message; its_message <<"tcp_server_endpoint_impl::connection catched exception" << _e.what() << " local: " << get_address_port_local() << " remote: " << get_address_port_remote() << " shutting down connection. Start of buffer: "; for (std::size_t i = 0; i < recv_buffer_size_ && i < 16; i++) { its_message << std::setw(2) << std::setfill('0') << std::hex << (int) (recv_buffer_[i]) << " "; } its_message << " Last 16 Bytes captured: "; for (int i = 15; recv_buffer_size_ > 15 && i >= 0; i--) { its_message << std::setw(2) << std::setfill('0') << std::hex << (int) (recv_buffer_[static_cast(i)]) << " "; } VSOMEIP_ERROR << its_message.str(); recv_buffer_.clear(); if (socket_.is_open()) { boost::system::error_code its_error; socket_.shutdown(socket_.shutdown_both, its_error); socket_.close(its_error); } std::shared_ptr its_server = server_.lock(); if (its_server) { its_server->remove_connection(this); } } std::size_t tcp_server_endpoint_impl::connection::get_recv_buffer_capacity() const { return recv_buffer_.capacity(); } std::size_t tcp_server_endpoint_impl::connection::write_completion_condition( const boost::system::error_code& _error, std::size_t _bytes_transferred, std::size_t _bytes_to_send, service_t _service, method_t _method, client_t _client, session_t _session, const std::chrono::steady_clock::time_point _start) { if (_error) { VSOMEIP_ERROR << "tse::write_completion_condition: " << _error.message() << "(" << std::dec << _error.value() << ") bytes transferred: " << std::dec << _bytes_transferred << " bytes to sent: " << std::dec << _bytes_to_send << " " << "remote:" << get_address_port_remote() << " (" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _method << "." << std::hex << std::setw(4) << std::setfill('0') << _session << "]"; stop_and_remove_connection(); return 0; } const std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now(); const std::chrono::milliseconds passed = std::chrono::duration_cast(now - _start); if (passed > send_timeout_warning_) { if (passed > send_timeout_) { VSOMEIP_ERROR << "tse::write_completion_condition: " << _error.message() << "(" << std::dec << _error.value() << ") took longer than " << std::dec << send_timeout_.count() << "ms bytes transferred: " << std::dec << _bytes_transferred << " bytes to sent: " << std::dec << _bytes_to_send << " remote:" << get_address_port_remote() << " (" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _method << "." << std::hex << std::setw(4) << std::setfill('0') << _session << "]"; } else { VSOMEIP_WARNING << "tse::write_completion_condition: " << _error.message() << "(" << std::dec << _error.value() << ") took longer than " << std::dec << send_timeout_warning_.count() << "ms bytes transferred: " << std::dec << _bytes_transferred << " bytes to sent: " << std::dec << _bytes_to_send << " remote:" << get_address_port_remote() << " (" << std::hex << std::setw(4) << std::setfill('0') << _client <<"): [" << std::hex << std::setw(4) << std::setfill('0') << _service << "." << std::hex << std::setw(4) << std::setfill('0') << _method << "." << std::hex << std::setw(4) << std::setfill('0') << _session << "]"; } } return _bytes_to_send - _bytes_transferred; } void tcp_server_endpoint_impl::connection::stop_and_remove_connection() { std::shared_ptr its_server(server_.lock()); if (!its_server) { VSOMEIP_ERROR << "tse::connection::stop_and_remove_connection " " couldn't lock server_"; return; } { std::lock_guard its_lock(its_server->connections_mutex_); stop(); } its_server->remove_connection(this); } // Dummies void tcp_server_endpoint_impl::receive() { // intentionally left empty } void tcp_server_endpoint_impl::print_status() { std::lock_guard its_lock(mutex_); connections_t its_connections; { std::lock_guard its_lock(connections_mutex_); its_connections = connections_; } VSOMEIP_INFO << "status tse: " << std::dec << local_port_ << " connections: " << std::dec << its_connections.size() << " queues: " << std::dec << queues_.size(); for (const auto &c : its_connections) { std::size_t its_data_size(0); std::size_t its_queue_size(0); std::size_t its_recv_size(0); { std::unique_lock c_s_lock(c.second->get_socket_lock()); its_recv_size = c.second->get_recv_buffer_capacity(); } auto found_queue = queues_.find(c.first); if (found_queue != queues_.end()) { its_queue_size = found_queue->second.second.size(); its_data_size = found_queue->second.first; } VSOMEIP_INFO << "status tse: client: " << c.second->get_address_port_remote() << " queue: " << std::dec << its_queue_size << " data: " << std::dec << its_data_size << " recv_buffer: " << std::dec << its_recv_size; } } std::string tcp_server_endpoint_impl::get_remote_information( const queue_iterator_type _queue_iterator) const { boost::system::error_code ec; return _queue_iterator->first.address().to_string(ec) + ":" + std::to_string(_queue_iterator->first.port()); } std::string tcp_server_endpoint_impl::get_remote_information( const endpoint_type& _remote) const { boost::system::error_code ec; return _remote.address().to_string(ec) + ":" + std::to_string(_remote.port()); } bool tcp_server_endpoint_impl::tp_segmentation_enabled(service_t _service, method_t _method) const { (void)_service; (void)_method; return false; } void tcp_server_endpoint_impl::connection::wait_until_sent(const boost::system::error_code &_error) { std::shared_ptr its_server(server_.lock()); std::unique_lock its_sent_lock(its_server->sent_mutex_); if (!its_server->is_sending_ || !_error) { its_sent_lock.unlock(); if (!_error) VSOMEIP_WARNING << __func__ << ": Maximum wait time for send operation exceeded for tse."; { std::lock_guard its_lock(its_server->connections_mutex_); stop(); } its_server->remove_connection(this); } else { std::chrono::milliseconds its_timeout(VSOMEIP_MAX_TCP_SENT_WAIT_TIME); boost::system::error_code ec; its_server->sent_timer_.expires_from_now(its_timeout, ec); its_server->sent_timer_.async_wait(std::bind(&tcp_server_endpoint_impl::connection::wait_until_sent, std::dynamic_pointer_cast(shared_from_this()), std::placeholders::_1)); } } } // namespace vsomeip_v3