• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
6 #include <iomanip>
7 #include <sstream>
8 
9 #include <boost/asio/write.hpp>
10 
11 #include <vsomeip/defines.hpp>
12 #include <vsomeip/internal/logger.hpp>
13 
14 #include "../include/endpoint_host.hpp"
15 #include "../include/local_client_endpoint_impl.hpp"
16 #include "../include/local_server_endpoint_impl.hpp"
17 #include "../../routing/include/routing_host.hpp"
18 #include "../../security/include/security.hpp"
19 
20 // Credentials
21 #ifndef _WIN32
22 #include "../include/credentials.hpp"
23 #endif
24 
25 namespace vsomeip_v3 {
26 
local_client_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _remote,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)27 local_client_endpoint_impl::local_client_endpoint_impl(
28         const std::shared_ptr<endpoint_host>& _endpoint_host,
29         const std::shared_ptr<routing_host>& _routing_host,
30         const endpoint_type& _remote,
31         boost::asio::io_service &_io,
32         const std::shared_ptr<configuration>& _configuration)
33     : local_client_endpoint_base_impl(_endpoint_host, _routing_host, _remote,
34                                       _remote, _io,
35                                       _configuration->get_max_message_size_local(),
36                                       _configuration->get_endpoint_queue_limit_local(),
37                                       _configuration),
38                                       // Using _remote for the local(!) endpoint is ok,
39                                       // because we have no bind for local endpoints!
40       recv_buffer_(VSOMEIP_ASSIGN_CLIENT_ACK_COMMAND_SIZE + 8,0) {
41     is_supporting_magic_cookies_ = false;
42 }
43 
~local_client_endpoint_impl()44 local_client_endpoint_impl::~local_client_endpoint_impl() {
45 
46 }
47 
is_local() const48 bool local_client_endpoint_impl::is_local() const {
49     return true;
50 }
51 
restart(bool _force)52 void local_client_endpoint_impl::restart(bool _force) {
53     if (!_force && state_ == cei_state_e::CONNECTING) {
54         return;
55     }
56     state_ = cei_state_e::CONNECTING;
57     {
58         std::lock_guard<std::mutex> its_lock(mutex_);
59         sending_blocked_ = false;
60         queue_.clear();
61         queue_size_ = 0;
62     }
63     {
64         std::lock_guard<std::mutex> its_lock(socket_mutex_);
65         shutdown_and_close_socket_unlocked(true);
66     }
67     was_not_connected_ = true;
68     reconnect_counter_ = 0;
69     start_connect_timer();
70 }
71 
start()72 void local_client_endpoint_impl::start() {
73     connect();
74 }
75 
stop()76 void local_client_endpoint_impl::stop() {
77     {
78         std::lock_guard<std::mutex> its_lock(mutex_);
79         sending_blocked_ = true;
80     }
81     {
82         std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
83         boost::system::error_code ec;
84         connect_timer_.cancel(ec);
85     }
86     connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
87 
88     bool is_open(false);
89     {
90         std::lock_guard<std::mutex> its_lock(socket_mutex_);
91         is_open = socket_->is_open();
92     }
93     if (is_open) {
94         bool send_queue_empty(false);
95         std::uint32_t times_slept(0);
96 
97         while (times_slept <= 50) {
98             mutex_.lock();
99             send_queue_empty = (queue_.size() == 0);
100             mutex_.unlock();
101             if (send_queue_empty) {
102                 break;
103             } else {
104                 std::this_thread::sleep_for(std::chrono::milliseconds(10));
105                 times_slept++;
106             }
107         }
108     }
109     shutdown_and_close_socket(false);
110 }
111 
connect()112 void local_client_endpoint_impl::connect() {
113     boost::system::error_code its_connect_error;
114     {
115         std::lock_guard<std::mutex> its_lock(socket_mutex_);
116         boost::system::error_code its_error;
117         socket_->open(remote_.protocol(), its_error);
118 
119         if (!its_error || its_error == boost::asio::error::already_open) {
120             socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
121             if (its_error) {
122                 VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
123                         << "couldn't enable SO_REUSEADDR: " << its_error.message();
124             }
125             state_ = cei_state_e::CONNECTING;
126             socket_->connect(remote_, its_connect_error);
127 
128 // Credentials
129 #ifndef _WIN32
130             if (!its_connect_error) {
131                 auto its_host = endpoint_host_.lock();
132                 if (its_host) {
133                     credentials::send_credentials(socket_->native_handle(),
134                             its_host->get_client());
135                 }
136             } else {
137                 VSOMEIP_WARNING << "local_client_endpoint::connect: Couldn't "
138                         << "connect to: " << remote_.path() << " ("
139                         << its_connect_error.message() << " / " << std::dec
140                         << its_connect_error.value() << ")";
141             }
142 #endif
143 
144         } else {
145             VSOMEIP_WARNING << "local_client_endpoint::connect: Error opening socket: "
146                     << its_error.message() << " (" << std::dec << its_error.value()
147                     << ")";
148             its_connect_error = its_error;
149         }
150     }
151     // call connect_cbk asynchronously
152     try {
153         strand_.post(
154                 std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
155                         its_connect_error));
156     } catch (const std::exception &e) {
157         VSOMEIP_ERROR << "local_client_endpoint_impl::connect: " << e.what();
158     }
159 }
160 
receive()161 void local_client_endpoint_impl::receive() {
162     std::lock_guard<std::mutex> its_lock(socket_mutex_);
163     if (socket_->is_open()) {
164         socket_->async_receive(
165             boost::asio::buffer(recv_buffer_),
166             strand_.wrap(
167                 std::bind(
168                     &local_client_endpoint_impl::receive_cbk,
169                     std::dynamic_pointer_cast<
170                         local_client_endpoint_impl
171                     >(shared_from_this()),
172                     std::placeholders::_1,
173                     std::placeholders::_2
174                 )
175             )
176         );
177     }
178 }
179 
180 // this overrides client_endpoint_impl::send to disable the pull method
181 // for local communication
send(const uint8_t * _data,uint32_t _size)182 bool local_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
183     std::lock_guard<std::mutex> its_lock(mutex_);
184     bool ret(true);
185     const bool queue_size_zero_on_entry(queue_.empty());
186     if (endpoint_impl::sending_blocked_ ||
187         check_message_size(nullptr, _size) != cms_ret_e::MSG_OK ||
188         !check_packetizer_space(_size) ||
189         !check_queue_limit(_data, _size)) {
190         ret = false;
191     } else {
192 #if 0
193         std::stringstream msg;
194         msg << "lce::send: ";
195         for (uint32_t i = 0; i < _size; i++)
196             msg << std::hex << std::setw(2) << std::setfill('0')
197                 << (int)_data[i] << " ";
198         VSOMEIP_INFO << msg.str();
199 #endif
200         train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
201         queue_train(queue_size_zero_on_entry);
202     }
203     return ret;
204 }
205 
send_queued(message_buffer_ptr_t _buffer)206 void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
207     static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
208     static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
209     std::vector<boost::asio::const_buffer> bufs;
210 
211     bufs.push_back(boost::asio::buffer(its_start_tag));
212     bufs.push_back(boost::asio::buffer(*_buffer));
213     bufs.push_back(boost::asio::buffer(its_end_tag));
214 
215     {
216         std::lock_guard<std::mutex> its_lock(socket_mutex_);
217         boost::asio::async_write(
218             *socket_,
219             bufs,
220             std::bind(
221                 &client_endpoint_impl::send_cbk,
222                 std::dynamic_pointer_cast<
223                     local_client_endpoint_impl
224                 >(shared_from_this()),
225                 std::placeholders::_1,
226                 std::placeholders::_2,
227                 _buffer
228             )
229         );
230     }
231 }
232 
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const233 void local_client_endpoint_impl::get_configured_times_from_endpoint(
234         service_t _service, method_t _method,
235         std::chrono::nanoseconds *_debouncing,
236         std::chrono::nanoseconds *_maximum_retention) const {
237     (void)_service;
238     (void)_method;
239     (void)_debouncing;
240     (void)_maximum_retention;
241     VSOMEIP_ERROR << "local_client_endpoint_impl::get_configured_times_from_endpoint called.";
242 }
243 
send_magic_cookie()244 void local_client_endpoint_impl::send_magic_cookie() {
245 }
246 
receive_cbk(boost::system::error_code const & _error,std::size_t _bytes)247 void local_client_endpoint_impl::receive_cbk(
248         boost::system::error_code const &_error, std::size_t _bytes) {
249     if (_error) {
250         if (_error == boost::asio::error::operation_aborted) {
251             // endpoint was stopped
252             return;
253         } else if (_error == boost::asio::error::connection_reset
254                 || _error == boost::asio::error::eof
255                 || _error == boost::asio::error::bad_descriptor) {
256             VSOMEIP_TRACE << "local_client_endpoint:"
257                     " connection_reseted/EOF/bad_descriptor";
258         } else if (_error) {
259             VSOMEIP_ERROR << "Local endpoint received message ("
260                           << _error.message() << ")";
261         }
262         error_handler_t handler;
263         {
264             std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
265             handler = error_handler_;
266         }
267         if (handler)
268             handler();
269     } else {
270 
271 #if 0
272         std::stringstream msg;
273         msg << "lce<" << this << ">::recv: ";
274         for (std::size_t i = 0; i < recv_buffer_.size(); i++)
275             msg << std::setw(2) << std::setfill('0') << std::hex
276                 << (int)recv_buffer_[i] << " ";
277         VSOMEIP_INFO << msg.str();
278 #endif
279 
280         // We only handle a single message here. Check whether the message
281         // format matches what we do expect.
282         if (_bytes == VSOMEIP_ASSIGN_CLIENT_ACK_COMMAND_SIZE + 8
283                 && recv_buffer_[0] == 0x67 && recv_buffer_[1] == 0x37
284                 && recv_buffer_[2] == 0x6d && recv_buffer_[3] == 0x07
285                 && recv_buffer_[4] == VSOMEIP_ASSIGN_CLIENT_ACK
286                 && recv_buffer_[13] == 0x07 && recv_buffer_[14] == 0x6d
287                 && recv_buffer_[15] == 0x37 && recv_buffer_[16] == 0x67) {
288 
289             auto its_routing_host = routing_host_.lock();
290             if (its_routing_host)
291                 its_routing_host->on_message(&recv_buffer_[4],
292                         static_cast<length_t>(recv_buffer_.size() - 8), this);
293         }
294 
295         receive();
296     }
297 }
298 
get_remote_address(boost::asio::ip::address & _address) const299 bool local_client_endpoint_impl::get_remote_address(
300         boost::asio::ip::address &_address) const {
301     (void)_address;
302     return false;
303 }
304 
get_remote_port() const305 std::uint16_t local_client_endpoint_impl::get_remote_port() const {
306     return 0;
307 }
308 
set_local_port()309 void local_client_endpoint_impl::set_local_port() {
310     // local_port_ is set to zero in ctor of client_endpoint_impl -> do nothing
311 }
312 
print_status()313 void local_client_endpoint_impl::print_status() {
314 #ifndef _WIN32
315     std::string its_path = remote_.path();
316 #else
317     std::string its_path("");
318 #endif
319     std::size_t its_data_size(0);
320     std::size_t its_queue_size(0);
321     {
322         std::lock_guard<std::mutex> its_lock(mutex_);
323         its_queue_size = queue_.size();
324         its_data_size = queue_size_;
325     }
326 
327     VSOMEIP_INFO << "status lce: " << its_path  << " queue: "
328             << its_queue_size << " data: " << its_data_size;
329 }
330 
get_remote_information() const331 std::string local_client_endpoint_impl::get_remote_information() const {
332 #ifdef _WIN32
333     boost::system::error_code ec;
334     return remote_.address().to_string(ec) + ":"
335             + std::to_string(remote_.port());
336 #else
337     return remote_.path();
338 #endif
339 }
340 
341 
check_packetizer_space(std::uint32_t _size)342 bool local_client_endpoint_impl::check_packetizer_space(std::uint32_t _size) {
343     if (train_.buffer_->size() + _size < train_.buffer_->size()) {
344         VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
345         return false;
346     }
347     if (train_.buffer_->size() + _size > max_message_size_
348             && !train_.buffer_->empty()) {
349         queue_.push_back(train_.buffer_);
350         queue_size_ += train_.buffer_->size();
351         train_.buffer_ = std::make_shared<message_buffer_t>();
352     }
353     return true;
354 }
355 
is_reliable() const356 bool local_client_endpoint_impl::is_reliable() const {
357     return false;
358 }
359 
get_max_allowed_reconnects() const360 std::uint32_t local_client_endpoint_impl::get_max_allowed_reconnects() const {
361     return 13;
362 }
363 
send(const std::vector<byte_t> & _cmd_header,const byte_t * _data,uint32_t _size)364 bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
365                                       const byte_t *_data, uint32_t _size) {
366     std::lock_guard<std::mutex> its_lock(mutex_);
367     bool ret(true);
368     const bool queue_size_zero_on_entry(queue_.empty());
369 
370     const std::uint32_t its_complete_size = static_cast<std::uint32_t>(
371                                                     _cmd_header.size() + _size);
372     if (endpoint_impl::sending_blocked_ ||
373         check_message_size(nullptr, its_complete_size) != cms_ret_e::MSG_OK ||
374         !check_packetizer_space(its_complete_size)||
375         !check_queue_limit(_data, its_complete_size)) {
376         ret = false;
377     } else {
378 #if 0
379         std::stringstream msg;
380         msg << "lce::send: ";
381         for (uint32_t i = 0; i < _size; i++)
382         msg << std::hex << std::setw(2) << std::setfill('0')
383         << (int)_data[i] << " ";
384         VSOMEIP_INFO << msg.str();
385 #endif
386         train_.buffer_->reserve(its_complete_size);
387         train_.buffer_->insert(train_.buffer_->end(), _cmd_header.begin(), _cmd_header.end());
388         train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
389         queue_train(queue_size_zero_on_entry);
390     }
391     return ret;
392 }
393 
tp_segmentation_enabled(service_t _service,method_t _method) const394 bool local_client_endpoint_impl::tp_segmentation_enabled(
395         service_t _service, method_t _method) const {
396     (void)_service;
397     (void)_method;
398     return false;
399 }
400 
max_allowed_reconnects_reached()401 void local_client_endpoint_impl::max_allowed_reconnects_reached() {
402     VSOMEIP_ERROR << "local_client_endpoint::max_allowed_reconnects_reached: "
403             << get_remote_information();
404     error_handler_t handler;
405     {
406         std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
407         handler = error_handler_;
408     }
409     if (handler)
410         handler();
411 }
412 
413 } // namespace vsomeip_v3
414