1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
6 #include <iomanip>
7 #include <sstream>
8
9 #include <boost/asio/write.hpp>
10
11 #include <vsomeip/defines.hpp>
12 #include <vsomeip/internal/logger.hpp>
13
14 #include "../include/endpoint_host.hpp"
15 #include "../include/local_client_endpoint_impl.hpp"
16 #include "../include/local_server_endpoint_impl.hpp"
17 #include "../../routing/include/routing_host.hpp"
18 #include "../../security/include/security.hpp"
19
20 // Credentials
21 #ifndef _WIN32
22 #include "../include/credentials.hpp"
23 #endif
24
25 namespace vsomeip_v3 {
26
local_client_endpoint_impl(const std::shared_ptr<endpoint_host> & _endpoint_host,const std::shared_ptr<routing_host> & _routing_host,const endpoint_type & _remote,boost::asio::io_service & _io,const std::shared_ptr<configuration> & _configuration)27 local_client_endpoint_impl::local_client_endpoint_impl(
28 const std::shared_ptr<endpoint_host>& _endpoint_host,
29 const std::shared_ptr<routing_host>& _routing_host,
30 const endpoint_type& _remote,
31 boost::asio::io_service &_io,
32 const std::shared_ptr<configuration>& _configuration)
33 : local_client_endpoint_base_impl(_endpoint_host, _routing_host, _remote,
34 _remote, _io,
35 _configuration->get_max_message_size_local(),
36 _configuration->get_endpoint_queue_limit_local(),
37 _configuration),
38 // Using _remote for the local(!) endpoint is ok,
39 // because we have no bind for local endpoints!
40 recv_buffer_(VSOMEIP_ASSIGN_CLIENT_ACK_COMMAND_SIZE + 8,0) {
41 is_supporting_magic_cookies_ = false;
42 }
43
~local_client_endpoint_impl()44 local_client_endpoint_impl::~local_client_endpoint_impl() {
45
46 }
47
is_local() const48 bool local_client_endpoint_impl::is_local() const {
49 return true;
50 }
51
restart(bool _force)52 void local_client_endpoint_impl::restart(bool _force) {
53 if (!_force && state_ == cei_state_e::CONNECTING) {
54 return;
55 }
56 state_ = cei_state_e::CONNECTING;
57 {
58 std::lock_guard<std::mutex> its_lock(mutex_);
59 sending_blocked_ = false;
60 queue_.clear();
61 queue_size_ = 0;
62 }
63 {
64 std::lock_guard<std::mutex> its_lock(socket_mutex_);
65 shutdown_and_close_socket_unlocked(true);
66 }
67 was_not_connected_ = true;
68 reconnect_counter_ = 0;
69 start_connect_timer();
70 }
71
start()72 void local_client_endpoint_impl::start() {
73 connect();
74 }
75
stop()76 void local_client_endpoint_impl::stop() {
77 {
78 std::lock_guard<std::mutex> its_lock(mutex_);
79 sending_blocked_ = true;
80 }
81 {
82 std::lock_guard<std::mutex> its_lock(connect_timer_mutex_);
83 boost::system::error_code ec;
84 connect_timer_.cancel(ec);
85 }
86 connect_timeout_ = VSOMEIP_DEFAULT_CONNECT_TIMEOUT;
87
88 bool is_open(false);
89 {
90 std::lock_guard<std::mutex> its_lock(socket_mutex_);
91 is_open = socket_->is_open();
92 }
93 if (is_open) {
94 bool send_queue_empty(false);
95 std::uint32_t times_slept(0);
96
97 while (times_slept <= 50) {
98 mutex_.lock();
99 send_queue_empty = (queue_.size() == 0);
100 mutex_.unlock();
101 if (send_queue_empty) {
102 break;
103 } else {
104 std::this_thread::sleep_for(std::chrono::milliseconds(10));
105 times_slept++;
106 }
107 }
108 }
109 shutdown_and_close_socket(false);
110 }
111
connect()112 void local_client_endpoint_impl::connect() {
113 boost::system::error_code its_connect_error;
114 {
115 std::lock_guard<std::mutex> its_lock(socket_mutex_);
116 boost::system::error_code its_error;
117 socket_->open(remote_.protocol(), its_error);
118
119 if (!its_error || its_error == boost::asio::error::already_open) {
120 socket_->set_option(boost::asio::socket_base::reuse_address(true), its_error);
121 if (its_error) {
122 VSOMEIP_WARNING << "local_client_endpoint_impl::connect: "
123 << "couldn't enable SO_REUSEADDR: " << its_error.message();
124 }
125 state_ = cei_state_e::CONNECTING;
126 socket_->connect(remote_, its_connect_error);
127
128 // Credentials
129 #ifndef _WIN32
130 if (!its_connect_error) {
131 auto its_host = endpoint_host_.lock();
132 if (its_host) {
133 credentials::send_credentials(socket_->native_handle(),
134 its_host->get_client());
135 }
136 } else {
137 VSOMEIP_WARNING << "local_client_endpoint::connect: Couldn't "
138 << "connect to: " << remote_.path() << " ("
139 << its_connect_error.message() << " / " << std::dec
140 << its_connect_error.value() << ")";
141 }
142 #endif
143
144 } else {
145 VSOMEIP_WARNING << "local_client_endpoint::connect: Error opening socket: "
146 << its_error.message() << " (" << std::dec << its_error.value()
147 << ")";
148 its_connect_error = its_error;
149 }
150 }
151 // call connect_cbk asynchronously
152 try {
153 strand_.post(
154 std::bind(&client_endpoint_impl::connect_cbk, shared_from_this(),
155 its_connect_error));
156 } catch (const std::exception &e) {
157 VSOMEIP_ERROR << "local_client_endpoint_impl::connect: " << e.what();
158 }
159 }
160
receive()161 void local_client_endpoint_impl::receive() {
162 std::lock_guard<std::mutex> its_lock(socket_mutex_);
163 if (socket_->is_open()) {
164 socket_->async_receive(
165 boost::asio::buffer(recv_buffer_),
166 strand_.wrap(
167 std::bind(
168 &local_client_endpoint_impl::receive_cbk,
169 std::dynamic_pointer_cast<
170 local_client_endpoint_impl
171 >(shared_from_this()),
172 std::placeholders::_1,
173 std::placeholders::_2
174 )
175 )
176 );
177 }
178 }
179
180 // this overrides client_endpoint_impl::send to disable the pull method
181 // for local communication
send(const uint8_t * _data,uint32_t _size)182 bool local_client_endpoint_impl::send(const uint8_t *_data, uint32_t _size) {
183 std::lock_guard<std::mutex> its_lock(mutex_);
184 bool ret(true);
185 const bool queue_size_zero_on_entry(queue_.empty());
186 if (endpoint_impl::sending_blocked_ ||
187 check_message_size(nullptr, _size) != cms_ret_e::MSG_OK ||
188 !check_packetizer_space(_size) ||
189 !check_queue_limit(_data, _size)) {
190 ret = false;
191 } else {
192 #if 0
193 std::stringstream msg;
194 msg << "lce::send: ";
195 for (uint32_t i = 0; i < _size; i++)
196 msg << std::hex << std::setw(2) << std::setfill('0')
197 << (int)_data[i] << " ";
198 VSOMEIP_INFO << msg.str();
199 #endif
200 train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
201 queue_train(queue_size_zero_on_entry);
202 }
203 return ret;
204 }
205
send_queued(message_buffer_ptr_t _buffer)206 void local_client_endpoint_impl::send_queued(message_buffer_ptr_t _buffer) {
207 static const byte_t its_start_tag[] = { 0x67, 0x37, 0x6D, 0x07 };
208 static const byte_t its_end_tag[] = { 0x07, 0x6D, 0x37, 0x67 };
209 std::vector<boost::asio::const_buffer> bufs;
210
211 bufs.push_back(boost::asio::buffer(its_start_tag));
212 bufs.push_back(boost::asio::buffer(*_buffer));
213 bufs.push_back(boost::asio::buffer(its_end_tag));
214
215 {
216 std::lock_guard<std::mutex> its_lock(socket_mutex_);
217 boost::asio::async_write(
218 *socket_,
219 bufs,
220 std::bind(
221 &client_endpoint_impl::send_cbk,
222 std::dynamic_pointer_cast<
223 local_client_endpoint_impl
224 >(shared_from_this()),
225 std::placeholders::_1,
226 std::placeholders::_2,
227 _buffer
228 )
229 );
230 }
231 }
232
get_configured_times_from_endpoint(service_t _service,method_t _method,std::chrono::nanoseconds * _debouncing,std::chrono::nanoseconds * _maximum_retention) const233 void local_client_endpoint_impl::get_configured_times_from_endpoint(
234 service_t _service, method_t _method,
235 std::chrono::nanoseconds *_debouncing,
236 std::chrono::nanoseconds *_maximum_retention) const {
237 (void)_service;
238 (void)_method;
239 (void)_debouncing;
240 (void)_maximum_retention;
241 VSOMEIP_ERROR << "local_client_endpoint_impl::get_configured_times_from_endpoint called.";
242 }
243
send_magic_cookie()244 void local_client_endpoint_impl::send_magic_cookie() {
245 }
246
receive_cbk(boost::system::error_code const & _error,std::size_t _bytes)247 void local_client_endpoint_impl::receive_cbk(
248 boost::system::error_code const &_error, std::size_t _bytes) {
249 if (_error) {
250 if (_error == boost::asio::error::operation_aborted) {
251 // endpoint was stopped
252 return;
253 } else if (_error == boost::asio::error::connection_reset
254 || _error == boost::asio::error::eof
255 || _error == boost::asio::error::bad_descriptor) {
256 VSOMEIP_TRACE << "local_client_endpoint:"
257 " connection_reseted/EOF/bad_descriptor";
258 } else if (_error) {
259 VSOMEIP_ERROR << "Local endpoint received message ("
260 << _error.message() << ")";
261 }
262 error_handler_t handler;
263 {
264 std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
265 handler = error_handler_;
266 }
267 if (handler)
268 handler();
269 } else {
270
271 #if 0
272 std::stringstream msg;
273 msg << "lce<" << this << ">::recv: ";
274 for (std::size_t i = 0; i < recv_buffer_.size(); i++)
275 msg << std::setw(2) << std::setfill('0') << std::hex
276 << (int)recv_buffer_[i] << " ";
277 VSOMEIP_INFO << msg.str();
278 #endif
279
280 // We only handle a single message here. Check whether the message
281 // format matches what we do expect.
282 if (_bytes == VSOMEIP_ASSIGN_CLIENT_ACK_COMMAND_SIZE + 8
283 && recv_buffer_[0] == 0x67 && recv_buffer_[1] == 0x37
284 && recv_buffer_[2] == 0x6d && recv_buffer_[3] == 0x07
285 && recv_buffer_[4] == VSOMEIP_ASSIGN_CLIENT_ACK
286 && recv_buffer_[13] == 0x07 && recv_buffer_[14] == 0x6d
287 && recv_buffer_[15] == 0x37 && recv_buffer_[16] == 0x67) {
288
289 auto its_routing_host = routing_host_.lock();
290 if (its_routing_host)
291 its_routing_host->on_message(&recv_buffer_[4],
292 static_cast<length_t>(recv_buffer_.size() - 8), this);
293 }
294
295 receive();
296 }
297 }
298
get_remote_address(boost::asio::ip::address & _address) const299 bool local_client_endpoint_impl::get_remote_address(
300 boost::asio::ip::address &_address) const {
301 (void)_address;
302 return false;
303 }
304
get_remote_port() const305 std::uint16_t local_client_endpoint_impl::get_remote_port() const {
306 return 0;
307 }
308
set_local_port()309 void local_client_endpoint_impl::set_local_port() {
310 // local_port_ is set to zero in ctor of client_endpoint_impl -> do nothing
311 }
312
print_status()313 void local_client_endpoint_impl::print_status() {
314 #ifndef _WIN32
315 std::string its_path = remote_.path();
316 #else
317 std::string its_path("");
318 #endif
319 std::size_t its_data_size(0);
320 std::size_t its_queue_size(0);
321 {
322 std::lock_guard<std::mutex> its_lock(mutex_);
323 its_queue_size = queue_.size();
324 its_data_size = queue_size_;
325 }
326
327 VSOMEIP_INFO << "status lce: " << its_path << " queue: "
328 << its_queue_size << " data: " << its_data_size;
329 }
330
get_remote_information() const331 std::string local_client_endpoint_impl::get_remote_information() const {
332 #ifdef _WIN32
333 boost::system::error_code ec;
334 return remote_.address().to_string(ec) + ":"
335 + std::to_string(remote_.port());
336 #else
337 return remote_.path();
338 #endif
339 }
340
341
check_packetizer_space(std::uint32_t _size)342 bool local_client_endpoint_impl::check_packetizer_space(std::uint32_t _size) {
343 if (train_.buffer_->size() + _size < train_.buffer_->size()) {
344 VSOMEIP_ERROR << "Overflow in packetizer addition ~> abort sending!";
345 return false;
346 }
347 if (train_.buffer_->size() + _size > max_message_size_
348 && !train_.buffer_->empty()) {
349 queue_.push_back(train_.buffer_);
350 queue_size_ += train_.buffer_->size();
351 train_.buffer_ = std::make_shared<message_buffer_t>();
352 }
353 return true;
354 }
355
is_reliable() const356 bool local_client_endpoint_impl::is_reliable() const {
357 return false;
358 }
359
get_max_allowed_reconnects() const360 std::uint32_t local_client_endpoint_impl::get_max_allowed_reconnects() const {
361 return 13;
362 }
363
send(const std::vector<byte_t> & _cmd_header,const byte_t * _data,uint32_t _size)364 bool local_client_endpoint_impl::send(const std::vector<byte_t>& _cmd_header,
365 const byte_t *_data, uint32_t _size) {
366 std::lock_guard<std::mutex> its_lock(mutex_);
367 bool ret(true);
368 const bool queue_size_zero_on_entry(queue_.empty());
369
370 const std::uint32_t its_complete_size = static_cast<std::uint32_t>(
371 _cmd_header.size() + _size);
372 if (endpoint_impl::sending_blocked_ ||
373 check_message_size(nullptr, its_complete_size) != cms_ret_e::MSG_OK ||
374 !check_packetizer_space(its_complete_size)||
375 !check_queue_limit(_data, its_complete_size)) {
376 ret = false;
377 } else {
378 #if 0
379 std::stringstream msg;
380 msg << "lce::send: ";
381 for (uint32_t i = 0; i < _size; i++)
382 msg << std::hex << std::setw(2) << std::setfill('0')
383 << (int)_data[i] << " ";
384 VSOMEIP_INFO << msg.str();
385 #endif
386 train_.buffer_->reserve(its_complete_size);
387 train_.buffer_->insert(train_.buffer_->end(), _cmd_header.begin(), _cmd_header.end());
388 train_.buffer_->insert(train_.buffer_->end(), _data, _data + _size);
389 queue_train(queue_size_zero_on_entry);
390 }
391 return ret;
392 }
393
tp_segmentation_enabled(service_t _service,method_t _method) const394 bool local_client_endpoint_impl::tp_segmentation_enabled(
395 service_t _service, method_t _method) const {
396 (void)_service;
397 (void)_method;
398 return false;
399 }
400
max_allowed_reconnects_reached()401 void local_client_endpoint_impl::max_allowed_reconnects_reached() {
402 VSOMEIP_ERROR << "local_client_endpoint::max_allowed_reconnects_reached: "
403 << get_remote_information();
404 error_handler_t handler;
405 {
406 std::lock_guard<std::mutex> its_lock(error_handler_mutex_);
407 handler = error_handler_;
408 }
409 if (handler)
410 handler();
411 }
412
413 } // namespace vsomeip_v3
414