• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2014-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
2 // This Source Code Form is subject to the terms of the Mozilla Public
3 // License, v. 2.0. If a copy of the MPL was not distributed with this
4 // file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 
#include <algorithm>
#include <array>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
15 
16 #include <gtest/gtest.h>
17 
18 #include <vsomeip/vsomeip.hpp>
19 #include <vsomeip/internal/logger.hpp>
20 
21 #include "subscribe_notify_test_globals.hpp"
22 
23 
24 class subscribe_notify_test_service {
25 public:
subscribe_notify_test_service(struct subscribe_notify_test::service_info _service_info,std::array<subscribe_notify_test::service_info,7> _service_infos,vsomeip::reliability_type_e _reliability_type)26     subscribe_notify_test_service(struct subscribe_notify_test::service_info _service_info,
27                                   std::array<subscribe_notify_test::service_info, 7> _service_infos,
28                                   vsomeip::reliability_type_e _reliability_type) :
29             service_info_(_service_info),
30             service_infos_(_service_infos),
31             app_(vsomeip::runtime::get()->create_application()),
32             wait_until_registered_(true),
33             wait_until_other_services_available_(true),
34             wait_until_notified_from_other_services_(true),
35             offer_thread_(std::bind(&subscribe_notify_test_service::run, this)),
36             wait_for_stop_(true),
37             stop_thread_(std::bind(&subscribe_notify_test_service::wait_for_stop, this)),
38             wait_for_notify_(true),
39             notify_thread_(std::bind(&subscribe_notify_test_service::notify, this)),
40             subscription_state_handler_called_(0),
41             subscription_error_occured_(false),
42             reliability_type_(_reliability_type) {
43         if (!app_->init()) {
44             ADD_FAILURE() << "Couldn't initialize application";
45             return;
46         }
47 
48         app_->register_state_handler(
49                 std::bind(&subscribe_notify_test_service::on_state, this,
50                         std::placeholders::_1));
51         app_->register_message_handler(service_info_.service_id,
52                 service_info_.instance_id, service_info_.method_id,
53                 std::bind(&subscribe_notify_test_service::on_request, this,
54                         std::placeholders::_1));
55         app_->register_message_handler(vsomeip::ANY_SERVICE,
56                 vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD,
57                 std::bind(&subscribe_notify_test_service::on_message, this,
58                         std::placeholders::_1));
59 
60         // offer event
61         std::set<vsomeip::eventgroup_t> its_eventgroups;
62         its_eventgroups.insert(service_info_.eventgroup_id);
63         app_->offer_event(service_info_.service_id, service_info_.instance_id,
64                 service_info_.event_id, its_eventgroups,
65                 vsomeip::event_type_e::ET_FIELD, std::chrono::milliseconds::zero(),
66                 false, true, nullptr, reliability_type_);
67 
68 
69         // register availability for all other services and request their event.
70         for(const auto& i : service_infos_) {
71             if ((i.service_id == service_info_.service_id
72                     && i.instance_id == service_info_.instance_id)
73                     || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
74                 continue;
75             }
76             app_->request_service(i.service_id, i.instance_id);
77             app_->register_availability_handler(i.service_id, i.instance_id,
78                     std::bind(&subscribe_notify_test_service::on_availability, this,
79                             std::placeholders::_1, std::placeholders::_2,
80                             std::placeholders::_3));
81 
82             auto handler = std::bind(&subscribe_notify_test_service::on_subscription_state_change, this,
83                     std::placeholders::_1, std::placeholders::_2,
84                     std::placeholders::_3, std::placeholders::_4, std::placeholders::_5);
85             app_->register_subscription_status_handler(i.service_id, i.instance_id, i.eventgroup_id, vsomeip::ANY_EVENT, handler);
86 
87 
88             std::set<vsomeip::eventgroup_t> its_eventgroups;
89             its_eventgroups.insert(i.eventgroup_id);
90             app_->request_event(i.service_id, i.instance_id, i.event_id, its_eventgroups, vsomeip::event_type_e::ET_FIELD, reliability_type_);
91 
92             other_services_available_[std::make_pair(i.service_id, i.instance_id)] = false;
93             other_services_received_notification_[std::make_pair(i.service_id, i.method_id)] = 0;
94         }
95 
96         // register subscription handler to detect whether or not all other
97         // other services have subscribed
98         app_->register_subscription_handler(service_info_.service_id,
99                 service_info_.instance_id, service_info_.eventgroup_id,
100                 std::bind(&subscribe_notify_test_service::on_subscription, this,
101                         std::placeholders::_1, std::placeholders::_2,
102                         std::placeholders::_3, std::placeholders::_4));
103 
104         app_->start();
105     }
106 
~subscribe_notify_test_service()107     ~subscribe_notify_test_service() {
108         offer_thread_.join();
109         stop_thread_.join();
110     }
111 
offer()112     void offer() {
113         app_->offer_service(service_info_.service_id, service_info_.instance_id);
114     }
115 
stop_offer()116     void stop_offer() {
117         app_->stop_offer_event(service_info_.service_id, service_info_.instance_id, service_info_.event_id);
118         app_->stop_offer_service(service_info_.service_id, service_info_.instance_id);
119     }
120 
on_state(vsomeip::state_type_e _state)121     void on_state(vsomeip::state_type_e _state) {
122         VSOMEIP_INFO << "Application " << app_->get_name() << " is "
123         << (_state == vsomeip::state_type_e::ST_REGISTERED ?
124                 "registered." : "deregistered.");
125 
126         if (_state == vsomeip::state_type_e::ST_REGISTERED) {
127             std::lock_guard<std::mutex> its_lock(mutex_);
128             wait_until_registered_ = false;
129             condition_.notify_one();
130         }
131     }
132 
on_availability(vsomeip::service_t _service,vsomeip::instance_t _instance,bool _is_available)133     void on_availability(vsomeip::service_t _service,
134                          vsomeip::instance_t _instance, bool _is_available) {
135         if(_is_available) {
136             auto its_service = other_services_available_.find(std::make_pair(_service, _instance));
137             if(its_service != other_services_available_.end()) {
138                 if(its_service->second != _is_available) {
139                 its_service->second = true;
140                 VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
141                         << service_info_.service_id << "] Service ["
142                 << std::setw(4) << std::setfill('0') << std::hex << _service << "." << _instance
143                 << "] is available.";
144 
145                 }
146             }
147 
148             if(std::all_of(other_services_available_.cbegin(),
149                            other_services_available_.cend(),
150                            [](const std::map<std::pair<vsomeip::service_t,
151                                    vsomeip::instance_t>, bool>::value_type& v) {
152                                 return v.second;})) {
153                 std::lock_guard<std::mutex> its_lock(mutex_);
154                 wait_until_other_services_available_ = false;
155                 condition_.notify_one();
156             }
157         }
158     }
159 
on_subscription_state_change(const vsomeip::service_t _service,const vsomeip::instance_t _instance,const vsomeip::eventgroup_t _eventgroup,const vsomeip::event_t _event,const uint16_t _error)160     void on_subscription_state_change(const vsomeip::service_t _service, const vsomeip::instance_t _instance,
161             const vsomeip::eventgroup_t _eventgroup, const vsomeip::event_t _event, const uint16_t _error) {
162         (void)_service;
163         (void)_instance;
164         (void)_eventgroup;
165         (void)_event;
166 
167         if (!_error) {
168             subscription_state_handler_called_++;
169         } else {
170             subscription_error_occured_ = true;
171             VSOMEIP_WARNING << std::hex << app_->get_client()
172                     << " : on_subscription_state_change: for service " << std::hex
173                     << _service << " received a subscription error!";
174         }
175    }
176 
on_subscription(vsomeip::client_t _client,std::uint32_t _uid,std::uint32_t _gid,bool _subscribed)177     bool on_subscription(vsomeip::client_t _client, std::uint32_t _uid, std::uint32_t _gid,
178                          bool _subscribed) {
179         (void)_uid;
180         (void)_gid;
181         std::lock_guard<std::mutex> its_lock(subscribers_mutex_);
182         static bool notified(false);
183         if (_subscribed) {
184             subscribers_.insert(_client);
185         } else {
186             subscribers_.erase(_client);
187         }
188 
189         VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
190                 << service_info_.service_id << "] " << "Client: "
191                 << std::setw(4) << std::setfill('0') << std::hex << _client
192                 << (_subscribed ? " subscribed" : " unsubscribed")
193                 << ", now have " << std::dec << subscribers_.size()
194                 << " subscribers" ;
195         // check if all other services have subscribed:
196         // -1 for placeholder in array
197         // divide by two because we only receive once subscription per remote node
198         // no matter how many clients subscribed to this eventgroup on the remote node
199         if (!notified && subscribers_.size() == (service_infos_.size() - 1) / 2 )
200         {
201             // notify the notify thread to start sending out notifications
202             std::lock_guard<std::mutex> its_lock(notify_mutex_);
203             wait_for_notify_ = false;
204             notify_condition_.notify_one();
205             notified = true;
206         }
207         return true;
208     }
209 
on_request(const std::shared_ptr<vsomeip::message> & _message)210     void on_request(const std::shared_ptr<vsomeip::message> &_message) {
211         if(_message->get_message_type() == vsomeip::message_type_e::MT_REQUEST) {
212             VSOMEIP_DEBUG << "Received a request with Client/Session [" << std::setw(4)
213             << std::setfill('0') << std::hex << _message->get_client() << "/"
214             << std::setw(4) << std::setfill('0') << std::hex
215             << _message->get_session() << "]";
216             std::shared_ptr<vsomeip::message> its_response = vsomeip::runtime::get()
217             ->create_response(_message);
218             app_->send(its_response);
219         }
220     }
221 
on_message(const std::shared_ptr<vsomeip::message> & _message)222     void on_message(const std::shared_ptr<vsomeip::message> &_message) {
223         if(_message->get_message_type() == vsomeip::message_type_e::MT_NOTIFICATION) {
224 
225             other_services_received_notification_[std::make_pair(_message->get_service(),
226                                                              _message->get_method())]++;
227 
228             VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
229             << service_info_.service_id << "] "
230             << "Received a notification with Client/Session [" << std::setw(4)
231             << std::setfill('0') << std::hex << _message->get_client() << "/"
232             << std::setw(4) << std::setfill('0') << std::hex
233             << _message->get_session() << "] from Service/Method ["
234             << std::setw(4) << std::setfill('0') << std::hex
235             << _message->get_service() << "/" << std::setw(4) << std::setfill('0')
236             << std::hex << _message->get_method() << "/" << std::dec << _message->get_length() << "] (now have: "
237             << std::dec << other_services_received_notification_[std::make_pair(_message->get_service(),
238                                                                     _message->get_method())] << ")";
239 
240             if(all_notifications_received()) {
241                 std::lock_guard<std::mutex> its_lock(stop_mutex_);
242                 wait_for_stop_ = false;
243                 stop_condition_.notify_one();
244             }
245         }
246     }
247 
all_notifications_received()248     bool all_notifications_received() {
249         return std::all_of(
250                 other_services_received_notification_.cbegin(),
251                 other_services_received_notification_.cend(),
252                 [&](const std::map<std::pair<vsomeip::service_t,
253                         vsomeip::method_t>, std::uint32_t>::value_type& v)
254                 {
255                     return v.second == subscribe_notify_test::notifications_to_send;
256                 }
257         );
258     }
259 
all_notifications_received_tcp_and_udp()260     bool all_notifications_received_tcp_and_udp() {
261         std::uint32_t received_twice(0);
262         std::uint32_t received_normal(0);
263         for(const auto &v : other_services_received_notification_) {
264             if (v.second == subscribe_notify_test::notifications_to_send * 2) {
265                 received_twice++;
266             } else if(v.second == subscribe_notify_test::notifications_to_send) {
267                 received_normal++;
268             }
269         }
270 
271         if(   received_twice == (service_infos_.size() - 1) / 2
272            && received_normal == (service_infos_.size() - 1) / 2 - 1) {
273             // routing manager stub receives the notification
274             // - twice from external nodes
275             // - and normal from all internal nodes
276             VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
277                         << service_info_.service_id << "] "
278                         << "Received notifications:"
279                         << " Normal: " << received_normal
280                         << " Twice: " << received_twice;
281             return true;
282         }
283         return false;
284     }
285 
run()286     void run() {
287         VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
288                 << service_info_.service_id << "] Running";
289         std::unique_lock<std::mutex> its_lock(mutex_);
290         while (wait_until_registered_) {
291             condition_.wait(its_lock);
292         }
293 
294         VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
295                 << service_info_.service_id << "] Offering";
296         offer();
297 
298 
299         while (wait_until_other_services_available_) {
300             condition_.wait(its_lock);
301         }
302 
303         VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
304                 << service_info_.service_id << "] Subscribing";
305         // subscribe to events of other services
306         uint32_t subscribe_count = 0;
307         for(const subscribe_notify_test::service_info& i: service_infos_) {
308             if ((i.service_id == service_info_.service_id
309                             && i.instance_id == service_info_.instance_id)
310                     || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
311                 continue;
312             }
313             ++subscribe_count;
314             app_->subscribe(i.service_id, i.instance_id, i.eventgroup_id,
315                             vsomeip::DEFAULT_MAJOR);
316             VSOMEIP_DEBUG << "[" << std::hex << service_info_.service_id
317             << "] subscribing to Service/Instance/Eventgroup ["
318             << std::setw(4) << std::setfill('0') << std::hex << i.service_id << "/"
319             << std::setw(4) << std::setfill('0') << std::hex << i.instance_id
320             << "/" << std::setw(4) << std::setfill('0') << std::hex << i.eventgroup_id <<"]";
321         }
322 
323         while (wait_until_notified_from_other_services_) {
324             condition_.wait(its_lock);
325         }
326 
327         // It is possible that we run in the case a subscription is NACKED
328         // due to TCP endpoint not completely connected when subscription
329         // is processed in the server - due to resubscribing the error handler
330         // count may differ from expected value, but its not a real but as
331         // the subscription takes places anyways and all events will be received.
332         if (!subscription_error_occured_) {
333             ASSERT_EQ(subscribe_count, subscription_state_handler_called_);
334         } else {
335             VSOMEIP_WARNING << "Subscription state handler check skipped: CallCount="
336                     << std::dec << subscription_state_handler_called_;
337         }
338     }
339 
notify()340     void notify() {
341         std::unique_lock<std::mutex> its_lock(notify_mutex_);
342         while (wait_for_notify_) {
343             notify_condition_.wait(its_lock);
344         }
345 
346         // sleep a while before starting to notify this is necessary as it's not
347         // possible to detect if _all_ clients on the remote side have
348         // successfully subscribed as we only receive once subscription per
349         // remote node no matter how many clients subscribed to this eventgroup
350         // on the remote node
351         std::this_thread::sleep_for(std::chrono::milliseconds(1000));
352 
353         VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
354                 << service_info_.service_id << "] Starting to notify";
355 
356         for(uint32_t i = 0; i < subscribe_notify_test::notifications_to_send; i++) {
357             std::shared_ptr<vsomeip::payload> its_payload =
358                     vsomeip::runtime::get()->create_payload();
359 
360             vsomeip::byte_t its_data[10] = {0};
361             for (uint32_t j = 0; j < i+1; ++j) {
362                 its_data[j] = static_cast<uint8_t>(j);
363             }
364             its_payload->set_data(its_data, i+1);
365             VSOMEIP_DEBUG << "[" << std::setw(4) << std::setfill('0') << std::hex
366                 << service_info_.service_id << "] Notifying: " << i+1;
367             app_->notify(service_info_.service_id, service_info_.instance_id,
368                     service_info_.event_id, its_payload);
369             std::this_thread::sleep_for(std::chrono::milliseconds(100));
370         }
371     }
372 
wait_for_stop()373     void wait_for_stop() {
374         std::unique_lock<std::mutex> its_lock(stop_mutex_);
375         while (wait_for_stop_) {
376             stop_condition_.wait(its_lock);
377         }
378 
379         // wait until all notifications have been sent out
380         notify_thread_.join();
381 
382         VSOMEIP_INFO << "[" << std::setw(4) << std::setfill('0') << std::hex
383                 << service_info_.service_id
384                 << "] Received notifications from all other services, going down";
385 
386         // let offer thread exit
387         {
388             std::lock_guard<std::mutex> its_lock(mutex_);
389             wait_until_notified_from_other_services_ = false;
390             condition_.notify_one();
391         }
392 
393         stop_offer();
394 
395         // ensure that the service which hosts the routing doesn't exit to early
396         if (app_->is_routing()) {
397             for (const auto& i : service_infos_) {
398                 if ((i.service_id == service_info_.service_id
399                                 && i.instance_id == service_info_.instance_id)
400                         || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
401                     continue;
402                 }
403                 while (app_->is_available(i.service_id, i.instance_id,
404                         vsomeip::ANY_MAJOR, vsomeip::ANY_MINOR)) {
405                     std::this_thread::sleep_for(std::chrono::milliseconds(100));
406                 }
407             }
408         }
409 
410         std::this_thread::sleep_for(std::chrono::seconds(1));
411         for(const auto& i : service_infos_) {
412             if ((i.service_id == service_info_.service_id
413                     && i.instance_id == service_info_.instance_id)
414                     || (i.service_id == 0xFFFF && i.instance_id == 0xFFFF)) {
415                 continue;
416             }
417             app_->unregister_subscription_status_handler(i.service_id, i.instance_id,
418                     i.eventgroup_id, vsomeip::ANY_EVENT);
419             app_->unsubscribe(i.service_id, i.instance_id, i.eventgroup_id);
420             app_->release_event(i.service_id, i.instance_id, i.event_id);
421             app_->release_service(i.service_id, i.instance_id);
422         }
423         app_->clear_all_handler();
424         app_->stop();
425     }
426 
427 private:
428     subscribe_notify_test::service_info service_info_;
429     std::array<subscribe_notify_test::service_info, 7> service_infos_;
430     std::shared_ptr<vsomeip::application> app_;
431     std::map<std::pair<vsomeip::service_t, vsomeip::instance_t>, bool> other_services_available_;
432     std::map<std::pair<vsomeip::service_t, vsomeip::method_t>, std::uint32_t> other_services_received_notification_;
433 
434     bool wait_until_registered_;
435     bool wait_until_other_services_available_;
436     bool wait_until_notified_from_other_services_;
437     std::mutex mutex_;
438     std::condition_variable condition_;
439     std::thread offer_thread_;
440 
441     bool wait_for_stop_;
442     std::mutex stop_mutex_;
443     std::condition_variable stop_condition_;
444     std::thread stop_thread_;
445 
446     std::set<vsomeip::client_t> subscribers_;
447     bool wait_for_notify_;
448     std::mutex notify_mutex_;
449     std::condition_variable notify_condition_;
450     std::thread notify_thread_;
451     std::atomic<uint32_t> subscription_state_handler_called_;
452     std::atomic<bool> subscription_error_occured_;
453 
454     std::mutex subscribers_mutex_;
455     vsomeip::reliability_type_e reliability_type_;
456 };
457 
458 static unsigned long service_number;
459 static bool use_same_service_id;
460 vsomeip::reliability_type_e reliability_type = vsomeip::reliability_type_e::RT_UNKNOWN;
461 
462 
// Runs one test service; constructing the object drives the whole
// offer/subscribe/notify sequence and its destructor joins the worker threads.
TEST(someip_subscribe_notify_test, send_ten_notifications_to_service)
{
    // pick the service table requested on the command line
    const std::array<subscribe_notify_test::service_info, 7>& its_infos =
            use_same_service_id
                    ? subscribe_notify_test::service_infos_same_service_id
                    : subscribe_notify_test::service_infos;

    subscribe_notify_test_service its_sample(
            its_infos[service_number],
            its_infos,
            reliability_type);
}
477 
478 #ifndef _WIN32
main(int argc,char ** argv)479 int main(int argc, char** argv)
480 {
481     ::testing::InitGoogleTest(&argc, argv);
482     if(argc < 2) {
483         std::cerr << "Please specify a service number and event reliability type, like: " << argv[0] << " 2 UDP SAME_SERVICE_ID" << std::endl;
484         std::cerr << "Valid service numbers are in the range of [1,6]" << std::endl;
485         std::cerr << "Valid event reliability types are [UDP, TCP, TCP_AND_UDP]" << std::endl;
486         std::cerr << "If SAME_SERVICE_ID is specified as third parameter the test is run w/ multiple instances of the same service" << std::endl;
487         return 1;
488     }
489 
490     service_number = std::stoul(std::string(argv[1]), nullptr);
491 
492     if (argc >= 3) {
493         if (std::string("TCP")== std::string(argv[2])) {
494             reliability_type = vsomeip::reliability_type_e::RT_RELIABLE;
495         } else if (std::string("UDP")== std::string(argv[2])) {
496             reliability_type = vsomeip::reliability_type_e::RT_UNRELIABLE;
497         } else if (std::string("TCP_AND_UDP")== std::string(argv[2])) {
498             reliability_type = vsomeip::reliability_type_e::RT_BOTH;
499         }
500     }
501 
502     if (argc >= 4 && std::string("SAME_SERVICE_ID") == std::string(argv[3])) {
503         use_same_service_id = true;
504     } else {
505         use_same_service_id = false;
506     }
507 
508 
509 
510     return RUN_ALL_TESTS();
511 }
512 #endif
513