1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <exception>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
15
16 #include <gtest/gtest.h>
17
18 #include <vsomeip/vsomeip.hpp>
19 #include <vsomeip/internal/logger.hpp>
20
21 #include "subscribe_notify_test_globals.hpp"
22
23
24 class subscribe_notify_test_service {
25 public:
subscribe_notify_test_service(struct subscribe_notify_test::service_info _service_info,std::array<subscribe_notify_test::service_info,7> _service_infos,vsomeip::reliability_type_e _reliability_type)26 subscribe_notify_test_service(struct subscribe_notify_test::service_info _service_info,
27 std::array<subscribe_notify_test::service_info, 7> _service_infos,
28 vsomeip::reliability_type_e _reliability_type) :
29 service_info_(_service_info),
30 service_infos_(_service_infos),
31 app_(vsomeip::runtime::get()->create_application()),
32 wait_until_registered_(true),
33 wait_until_other_services_available_(true),
34 wait_until_notified_from_other_services_(true),
35 offer_thread_(std::bind(&subscribe_notify_test_service::run, this)),
36 wait_for_stop_(true),
37 stop_thread_(std::bind(&subscribe_notify_test_service::wait_for_stop, this)),
38 wait_for_notify_(true),
39 notify_thread_(std::bind(&subscribe_notify_test_service::notify, this)),
40 subscription_state_handler_called_(0),
41 subscription_error_occured_(false),
42 reliability_type_(_reliability_type) {
43 if (!app_->init()) {
44 ADD_FAILURE() << "Couldn't initialize application";
45 return;
46 }
47
48 app_->register_state_handler(
49 std::bind(&subscribe_notify_test_service::on_state, this,
50 std::placeholders::_1));
51 app_->register_message_handler(service_info_.service_id,
52 service_info_.instance_id, service_info_.method_id,
53 std::bind(&subscribe_notify_test_service::on_request, this,
54 std::placeholders::_1));
55 app_->register_message_handler(vsomeip::ANY_SERVICE,
56 vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD,
57 std::bind(&subscribe_notify_test_service::on_message, this,
58 std::placeholders::_1));
59
60 // offer event
61 std::set<vsomeip::eventgroup_t> its_eventgroups;
62 its_eventgroups.insert(service_info_.eventgroup_id);
63 app_->offer_event(service_info_.service_id, service_info_.instance_id,
64 service_info_.event_id, its_eventgroups,
65 vsomeip::event_type_e::ET_FIELD, std::chrono::milliseconds::zero(),
66 false, true, nullptr, reliability_type_);
67
68
69 // register availability for all other services and request their event.
70 for(const auto& i : service_infos_) {
71 if ((i.service_id == service_info_.service_id
72 && i.instance_id == service_info_.instance_id)
73 || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
74 continue;
75 }
76 app_->request_service(i.service_id, i.instance_id);
77 app_->register_availability_handler(i.service_id, i.instance_id,
78 std::bind(&subscribe_notify_test_service::on_availability, this,
79 std::placeholders::_1, std::placeholders::_2,
80 std::placeholders::_3));
81
82 auto handler = std::bind(&subscribe_notify_test_service::on_subscription_state_change, this,
83 std::placeholders::_1, std::placeholders::_2,
84 std::placeholders::_3, std::placeholders::_4, std::placeholders::_5);
85 app_->register_subscription_status_handler(i.service_id, i.instance_id, i.eventgroup_id, vsomeip::ANY_EVENT, handler);
86
87
88 std::set<vsomeip::eventgroup_t> its_eventgroups;
89 its_eventgroups.insert(i.eventgroup_id);
90 app_->request_event(i.service_id, i.instance_id, i.event_id, its_eventgroups, vsomeip::event_type_e::ET_FIELD, reliability_type_);
91
92 other_services_available_[std::make_pair(i.service_id, i.instance_id)] = false;
93 other_services_received_notification_[std::make_pair(i.service_id, i.method_id)] = 0;
94 }
95
96 // register subscription handler to detect whether or not all other
97 // other services have subscribed
98 app_->register_subscription_handler(service_info_.service_id,
99 service_info_.instance_id, service_info_.eventgroup_id,
100 std::bind(&subscribe_notify_test_service::on_subscription, this,
101 std::placeholders::_1, std::placeholders::_2,
102 std::placeholders::_3, std::placeholders::_4));
103
104 app_->start();
105 }
106
~subscribe_notify_test_service()107 ~subscribe_notify_test_service() {
108 offer_thread_.join();
109 stop_thread_.join();
110 }
111
offer()112 void offer() {
113 app_->offer_service(service_info_.service_id, service_info_.instance_id);
114 }
115
stop_offer()116 void stop_offer() {
117 app_->stop_offer_event(service_info_.service_id, service_info_.instance_id, service_info_.event_id);
118 app_->stop_offer_service(service_info_.service_id, service_info_.instance_id);
119 }
120
on_state(vsomeip::state_type_e _state)121 void on_state(vsomeip::state_type_e _state) {
122 VSOMEIP_INFO << "Application " << app_->get_name() << " is "
123 << (_state == vsomeip::state_type_e::ST_REGISTERED ?
124 "registered." : "deregistered.");
125
126 if (_state == vsomeip::state_type_e::ST_REGISTERED) {
127 std::lock_guard<std::mutex> its_lock(mutex_);
128 wait_until_registered_ = false;
129 condition_.notify_one();
130 }
131 }
132
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)133 void on_availability(vsomeip::service_t _service,
134 vsomeip::instance_t _instance, bool _is_available) {
135 if(_is_available) {
136 auto its_service = other_services_available_.find(std::make_pair(_service, _instance));
137 if(its_service != other_services_available_.end()) {
138 if(its_service->second != _is_available) {
139 its_service->second = true;
140 VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
141 << service_info_.service_id << "] Service ["
142 << std::setw(4) << std::setfill('0') << std::hex << _service << "." << _instance
143 << "] is available.";
144
145 }
146 }
147
148 if(std::all_of(other_services_available_.cbegin(),
149 other_services_available_.cend(),
150 [](const std::map<std::pair<vsomeip::service_t,
151 vsomeip::instance_t>, bool>::value_type& v) {
152 return v.second;})) {
153 std::lock_guard<std::mutex> its_lock(mutex_);
154 wait_until_other_services_available_ = false;
155 condition_.notify_one();
156 }
157 }
158 }
159
on_subscription_state_change(const vsomeip::service_t _service,const vsomeip::instance_t _instance,const vsomeip::eventgroup_t _eventgroup,const vsomeip::event_t _event,const uint16_t _error)160 void on_subscription_state_change(const vsomeip::service_t _service, const vsomeip::instance_t _instance,
161 const vsomeip::eventgroup_t _eventgroup, const vsomeip::event_t _event, const uint16_t _error) {
162 (void)_service;
163 (void)_instance;
164 (void)_eventgroup;
165 (void)_event;
166
167 if (!_error) {
168 subscription_state_handler_called_++;
169 } else {
170 subscription_error_occured_ = true;
171 VSOMEIP_WARNING << std::hex << app_->get_client()
172 << " : on_subscription_state_change: for service " << std::hex
173 << _service << " received a subscription error!";
174 }
175 }
176
on_subscription(vsomeip::client_t _client,std::uint32_t _uid,std::uint32_t _gid,bool _subscribed)177 bool on_subscription(vsomeip::client_t _client, std::uint32_t _uid, std::uint32_t _gid,
178 bool _subscribed) {
179 (void)_uid;
180 (void)_gid;
181 std::lock_guard<std::mutex> its_lock(subscribers_mutex_);
182 static bool notified(false);
183 if (_subscribed) {
184 subscribers_.insert(_client);
185 } else {
186 subscribers_.erase(_client);
187 }
188
189 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
190 << service_info_.service_id << "] " << "Client: "
191 << std::setw(4) << std::setfill('0') << std::hex << _client
192 << (_subscribed ? " subscribed" : " unsubscribed")
193 << ", now have " << std::dec << subscribers_.size()
194 << " subscribers" ;
195 // check if all other services have subscribed:
196 // -1 for placeholder in array
197 // divide by two because we only receive once subscription per remote node
198 // no matter how many clients subscribed to this eventgroup on the remote node
199 if (!notified && subscribers_.size() == (service_infos_.size() - 1) / 2 )
200 {
201 // notify the notify thread to start sending out notifications
202 std::lock_guard<std::mutex> its_lock(notify_mutex_);
203 wait_for_notify_ = false;
204 notify_condition_.notify_one();
205 notified = true;
206 }
207 return true;
208 }
209
on_request(const std::shared_ptr<vsomeip::message> & _message)210 void on_request(const std::shared_ptr<vsomeip::message> &_message) {
211 if(_message->get_message_type() == vsomeip::message_type_e::MT_REQUEST) {
212 VSOMEIP_DEBUG << "Received a request with Client/Session [" << std::setw(4)
213 << std::setfill('0') << std::hex << _message->get_client() << "/"
214 << std::setw(4) << std::setfill('0') << std::hex
215 << _message->get_session() << "]";
216 std::shared_ptr<vsomeip::message> its_response = vsomeip::runtime::get()
217 ->create_response(_message);
218 app_->send(its_response);
219 }
220 }
221
on_message(const std::shared_ptr<vsomeip::message> & _message)222 void on_message(const std::shared_ptr<vsomeip::message> &_message) {
223 if(_message->get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
224
225 other_services_received_notification_[std::make_pair(_message->get_service(),
226 _message->get_method())]++;
227
228 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
229 << service_info_.service_id << "] "
230 << "Received a notification with Client/Session [" << std::setw(4)
231 << std::setfill('0') << std::hex << _message->get_client() << "/"
232 << std::setw(4) << std::setfill('0') << std::hex
233 << _message->get_session() << "] from Service/Method ["
234 << std::setw(4) << std::setfill('0') << std::hex
235 << _message->get_service() << "/" << std::setw(4) << std::setfill('0')
236 << std::hex << _message->get_method() << "/" << std::dec << _message->get_length() << "] (now have: "
237 << std::dec << other_services_received_notification_[std::make_pair(_message->get_service(),
238 _message->get_method())] << ")";
239
240 if(all_notifications_received()) {
241 std::lock_guard<std::mutex> its_lock(stop_mutex_);
242 wait_for_stop_ = false;
243 stop_condition_.notify_one();
244 }
245 }
246 }
247
all_notifications_received()248 bool all_notifications_received() {
249 return std::all_of(
250 other_services_received_notification_.cbegin(),
251 other_services_received_notification_.cend(),
252 [&](const std::map<std::pair<vsomeip::service_t,
253 vsomeip::method_t>, std::uint32_t>::value_type& v)
254 {
255 return v.second == subscribe_notify_test::notifications_to_send;
256 }
257 );
258 }
259
all_notifications_received_tcp_and_udp()260 bool all_notifications_received_tcp_and_udp() {
261 std::uint32_t received_twice(0);
262 std::uint32_t received_normal(0);
263 for(const auto &v : other_services_received_notification_) {
264 if (v.second == subscribe_notify_test::notifications_to_send * 2) {
265 received_twice++;
266 } else if(v.second == subscribe_notify_test::notifications_to_send) {
267 received_normal++;
268 }
269 }
270
271 if( received_twice == (service_infos_.size() - 1) / 2
272 && received_normal == (service_infos_.size() - 1) / 2 - 1) {
273 // routing manager stub receives the notification
274 // - twice from external nodes
275 // - and normal from all internal nodes
276 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
277 << service_info_.service_id << "] "
278 << "Received notifications:"
279 << " Normal: " << received_normal
280 << " Twice: " << received_twice;
281 return true;
282 }
283 return false;
284 }
285
run()286 void run() {
287 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
288 << service_info_.service_id << "] Running";
289 std::unique_lock<std::mutex> its_lock(mutex_);
290 while (wait_until_registered_) {
291 condition_.wait(its_lock);
292 }
293
294 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
295 << service_info_.service_id << "] Offering";
296 offer();
297
298
299 while (wait_until_other_services_available_) {
300 condition_.wait(its_lock);
301 }
302
303 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
304 << service_info_.service_id << "] Subscribing";
305 // subscribe to events of other services
306 uint32_t subscribe_count = 0;
307 for(const subscribe_notify_test::service_info& i: service_infos_) {
308 if ((i.service_id == service_info_.service_id
309 && i.instance_id == service_info_.instance_id)
310 || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
311 continue;
312 }
313 ++subscribe_count;
314 app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
315 vsomeip::DEFAULT_MAJOR);
316 VSOMEIP_DEBUG << "[" << std::hex << service_info_.service_id
317 << "] subscribing to Service/Instance/Eventgroup ["
318 << std::setw(4) << std::setfill('0') << std::hex << i.service_id << "/"
319 << std::setw(4) << std::setfill('0') << std::hex << i.instance_id
320 << "/" << std::setw(4) << std::setfill('0') << std::hex << i.eventgroup_id <<"]";
321 }
322
323 while (wait_until_notified_from_other_services_) {
324 condition_.wait(its_lock);
325 }
326
327 // It is possible that we run in the case a subscription is NACKED
328 // due to TCP endpoint not completely connected when subscription
329 // is processed in the server - due to resubscribing the error handler
330 // count may differ from expected value, but its not a real but as
331 // the subscription takes places anyways and all events will be received.
332 if (!subscription_error_occured_) {
333 ASSERT_EQ(subscribe_count, subscription_state_handler_called_);
334 } else {
335 VSOMEIP_WARNING << "Subscription state handler check skipped: CallCount="
336 << std::dec << subscription_state_handler_called_;
337 }
338 }
339
notify()340 void notify() {
341 std::unique_lock<std::mutex> its_lock(notify_mutex_);
342 while (wait_for_notify_) {
343 notify_condition_.wait(its_lock);
344 }
345
346 // sleep a while before starting to notify this is necessary as it's not
347 // possible to detect if _all_ clients on the remote side have
348 // successfully subscribed as we only receive once subscription per
349 // remote node no matter how many clients subscribed to this eventgroup
350 // on the remote node
351 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
352
353 VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
354 << service_info_.service_id << "] Starting to notify";
355
356 for(uint32_t i = 0; i < subscribe_notify_test::notifications_to_send; i++) {
357 std::shared_ptr<vsomeip::payload> its_payload =
358 vsomeip::runtime::get()->create_payload();
359
360 vsomeip::byte_t its_data[10] = {0};
361 for (uint32_t j = 0; j < i+1; ++j) {
362 its_data[j] = static_cast<uint8_t>(j);
363 }
364 its_payload->set_data(its_data, i+1);
365 VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
366 << service_info_.service_id << "] Notifying: " << i+1;
367 app_->notify(service_info_.service_id, service_info_.instance_id,
368 service_info_.event_id, its_payload);
369 std::this_thread::sleep_for(std::chrono::milliseconds(100));
370 }
371 }
372
wait_for_stop()373 void wait_for_stop() {
374 std::unique_lock<std::mutex> its_lock(stop_mutex_);
375 while (wait_for_stop_) {
376 stop_condition_.wait(its_lock);
377 }
378
379 // wait until all notifications have been sent out
380 notify_thread_.join();
381
382 VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
383 << service_info_.service_id
384 << "] Received notifications from all other services, going down";
385
386 // let offer thread exit
387 {
388 std::lock_guard<std::mutex> its_lock(mutex_);
389 wait_until_notified_from_other_services_ = false;
390 condition_.notify_one();
391 }
392
393 stop_offer();
394
395 // ensure that the service which hosts the routing doesn't exit to early
396 if (app_->is_routing()) {
397 for (const auto& i : service_infos_) {
398 if ((i.service_id == service_info_.service_id
399 && i.instance_id == service_info_.instance_id)
400 || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
401 continue;
402 }
403 while (app_->is_available(i.service_id, i.instance_id,
404 vsomeip::ANY_MAJOR, vsomeip::ANY_MINOR)) {
405 std::this_thread::sleep_for(std::chrono::milliseconds(100));
406 }
407 }
408 }
409
410 std::this_thread::sleep_for(std::chrono::seconds(1));
411 for(const auto& i : service_infos_) {
412 if ((i.service_id == service_info_.service_id
413 && i.instance_id == service_info_.instance_id)
414 || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
415 continue;
416 }
417 app_->unregister_subscription_status_handler(i.service_id, i.instance_id,
418 i.eventgroup_id, vsomeip::ANY_EVENT);
419 app_->unsubscribe(i.service_id, i.instance_id, i.eventgroup_id);
420 app_->release_event(i.service_id, i.instance_id, i.event_id);
421 app_->release_service(i.service_id, i.instance_id);
422 }
423 app_->clear_all_handler();
424 app_->stop();
425 }
426
427 private:
428 subscribe_notify_test::service_info service_info_;
429 std::array<subscribe_notify_test::service_info, 7> service_infos_;
430 std::shared_ptr<vsomeip::application> app_;
431 std::map<std::pair<vsomeip::service_t, vsomeip::instance_t>, bool> other_services_available_;
432 std::map<std::pair<vsomeip::service_t, vsomeip::method_t>, std::uint32_t> other_services_received_notification_;
433
434 bool wait_until_registered_;
435 bool wait_until_other_services_available_;
436 bool wait_until_notified_from_other_services_;
437 std::mutex mutex_;
438 std::condition_variable condition_;
439 std::thread offer_thread_;
440
441 bool wait_for_stop_;
442 std::mutex stop_mutex_;
443 std::condition_variable stop_condition_;
444 std::thread stop_thread_;
445
446 std::set<vsomeip::client_t> subscribers_;
447 bool wait_for_notify_;
448 std::mutex notify_mutex_;
449 std::condition_variable notify_condition_;
450 std::thread notify_thread_;
451 std::atomic<uint32_t> subscription_state_handler_called_;
452 std::atomic<bool> subscription_error_occured_;
453
454 std::mutex subscribers_mutex_;
455 vsomeip::reliability_type_e reliability_type_;
456 };
457
458 static unsigned long service_number;
459 static bool use_same_service_id;
460 vsomeip::reliability_type_e reliability_type = vsomeip::reliability_type_e::RT_UNKNOWN;
461
462
// Runs one test service instance; which entry of which service_info table is
// used is selected by the file-level test parameters.
TEST(someip_subscribe_notify_test, send_ten_notifications_to_service)
{
    // pick the table once instead of duplicating the construction per branch
    const auto& its_service_infos = use_same_service_id
            ? subscribe_notify_test::service_infos_same_service_id
            : subscribe_notify_test::service_infos;

    subscribe_notify_test_service its_sample(
            its_service_infos[service_number],
            its_service_infos,
            reliability_type);
}
477
478 #ifndef _WIN32
main(int argc,char ** argv)479 int main(int argc, char** argv)
480 {
481 ::testing::InitGoogleTest(&argc, argv);
482 if(argc < 2) {
483 std::cerr << "Please specify a service number and event reliability type, like: " << argv[0] << " 2 UDP SAME_SERVICE_ID" << std::endl;
484 std::cerr << "Valid service numbers are in the range of [1,6]" << std::endl;
485 std::cerr << "Valid event reliability types are [UDP, TCP, TCP_AND_UDP]" << std::endl;
486 std::cerr << "If SAME_SERVICE_ID is specified as third parameter the test is run w/ multiple instances of the same service" << std::endl;
487 return 1;
488 }
489
490 service_number = std::stoul(std::string(argv[1]), nullptr);
491
492 if (argc >= 3) {
493 if (std::string("TCP")== std::string(argv[2])) {
494 reliability_type = vsomeip::reliability_type_e::RT_RELIABLE;
495 } else if (std::string("UDP")== std::string(argv[2])) {
496 reliability_type = vsomeip::reliability_type_e::RT_UNRELIABLE;
497 } else if (std::string("TCP_AND_UDP")== std::string(argv[2])) {
498 reliability_type = vsomeip::reliability_type_e::RT_BOTH;
499 }
500 }
501
502 if (argc >= 4 && std::string("SAME_SERVICE_ID") == std::string(argv[3])) {
503 use_same_service_id = true;
504 } else {
505 use_same_service_id = false;
506 }
507
508
509
510 return RUN_ALL_TESTS();
511 }
512 #endif
513