// Copyright (C) 2015-2017 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. #include #include #include #include #include #include #include #include #include // for isfinite #include #include "cpu_load_test_globals.hpp" #include #include "cpu_load_measurer.hpp" // for getpid #include #include enum protocol_e { PR_UNKNOWN, PR_TCP, PR_UDP }; class cpu_load_test_client { public: cpu_load_test_client(protocol_e _protocol, std::uint32_t _number_of_calls, std::uint32_t _payload_size, bool _call_service_sync, bool _shutdown_service) : protocol_(_protocol), app_(vsomeip::runtime::get()->create_application("cpu_load_test_client")), request_(vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP)), call_service_sync_(_call_service_sync), shutdown_service_at_end_(_shutdown_service), sliding_window_size_(_number_of_calls), wait_for_availability_(true), is_available_(false), number_of_calls_(_number_of_calls), number_of_calls_current_(0), number_of_sent_messages_(0), number_of_sent_messages_total_(0), number_of_acknowledged_messages_(0), payload_size_(_payload_size), wait_for_all_msg_acknowledged_(true), initialized_(false), sender_(std::bind(&cpu_load_test_client::run, this)) { if (!app_->init()) { ADD_FAILURE() << "Couldn't initialize application"; return; } initialized_ = true; app_->register_state_handler( std::bind(&cpu_load_test_client::on_state, this, std::placeholders::_1)); app_->register_message_handler(vsomeip::ANY_SERVICE, vsomeip::ANY_INSTANCE, vsomeip::ANY_METHOD, std::bind(&cpu_load_test_client::on_message, this, std::placeholders::_1)); app_->register_availability_handler(cpu_load_test::service_id, cpu_load_test::instance_id, std::bind(&cpu_load_test_client::on_availability, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); VSOMEIP_INFO << "Starting..."; app_->start(); } ~cpu_load_test_client() { { std::lock_guard its_lock(mutex_); wait_for_availability_ = false; condition_.notify_one(); } { std::lock_guard its_lock(all_msg_acknowledged_mutex_); wait_for_all_msg_acknowledged_ = false; all_msg_acknowledged_cv_.notify_one(); } sender_.join(); } private: void stop() { VSOMEIP_INFO << "Stopping..."; // shutdown the service if(shutdown_service_at_end_) { shutdown_service(); } app_->clear_all_handler(); } void on_state(vsomeip::state_type_e _state) { if(_state == vsomeip::state_type_e::ST_REGISTERED) { app_->request_service(cpu_load_test::service_id, cpu_load_test::instance_id); } } void on_availability(vsomeip::service_t _service, vsomeip::instance_t _instance, bool _is_available) { VSOMEIP_INFO << "Service [" << std::setw(4) << std::setfill('0') << std::hex << _service << "." << _instance << "] is " << (_is_available ? "available." : "NOT available."); if (cpu_load_test::service_id == _service && cpu_load_test::instance_id == _instance) { if (is_available_ && !_is_available) { is_available_ = false; } else if (_is_available && !is_available_) { is_available_ = true; std::lock_guard its_lock(mutex_); wait_for_availability_ = false; condition_.notify_one(); } } } void on_message(const std::shared_ptr &_response) { number_of_acknowledged_messages_++; ASSERT_EQ(_response->get_service(), cpu_load_test::service_id); ASSERT_EQ(_response->get_method(), cpu_load_test::method_id); if(call_service_sync_) { // We notify the sender thread every time a message was acknowledged std::lock_guard lk(all_msg_acknowledged_mutex_); wait_for_all_msg_acknowledged_ = false; all_msg_acknowledged_cv_.notify_one(); } else { // We notify the sender thread only if all sent messages have been acknowledged if(number_of_acknowledged_messages_ == number_of_calls_current_) { std::lock_guard lk(all_msg_acknowledged_mutex_); number_of_acknowledged_messages_ = 0; wait_for_all_msg_acknowledged_ = false; all_msg_acknowledged_cv_.notify_one(); } else if(number_of_acknowledged_messages_ % sliding_window_size_ == 0) { std::lock_guard lk(all_msg_acknowledged_mutex_); wait_for_all_msg_acknowledged_ = false; all_msg_acknowledged_cv_.notify_one(); } } } void run() { std::unique_lock its_lock(mutex_); while (wait_for_availability_) { condition_.wait(its_lock); } request_->set_service(cpu_load_test::service_id); request_->set_instance(cpu_load_test::instance_id); request_->set_method(cpu_load_test::method_id); std::shared_ptr payload = vsomeip::runtime::get()->create_payload(); std::vector payload_data; payload_data.assign(payload_size_, cpu_load_test::load_test_data); payload->set_data(payload_data); request_->set_payload(payload); // lock the mutex for(std::uint32_t i=0; i <= number_of_calls_; i++) { number_of_calls_current_ = i; sliding_window_size_ = i; std::unique_lock lk(all_msg_acknowledged_mutex_); call_service_sync_ ? send_messages_sync(lk, i) : send_messages_async(lk, i); } const double average_load(std::accumulate(results_.begin(), results_.end(), 0.0) / static_cast(results_.size())); VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_ << " messages in total (excluding control messages). This caused: " << std::fixed << std::setprecision(2) << average_load << "% load in average (average of " << results_.size() << " measurements)."; std::vector results_no_zero; for(const auto &v : results_) { if(v > 0.0) { results_no_zero.push_back(v); } } const double average_load_no_zero(std::accumulate(results_no_zero.begin(), results_no_zero.end(), 0.0) / static_cast(results_no_zero.size())); VSOMEIP_INFO << "Sent: " << number_of_sent_messages_total_ << " messages in total (excluding control messages). This caused: " << std::fixed << std::setprecision(2) << average_load_no_zero << "% load in average, if measured " << "cpu load was greater zero (average of " << results_no_zero.size() << " measurements)."; wait_for_availability_ = true; stop(); if (initialized_) { app_->stop(); } } void send_messages_sync(std::unique_lock& lk, std::uint32_t _messages_to_send) { cpu_load_measurer c(static_cast(::getpid())); send_service_start_measuring(true); c.start(); for (number_of_sent_messages_ = 0; number_of_sent_messages_ < _messages_to_send; number_of_sent_messages_++, number_of_sent_messages_total_++) { app_->send(request_); // wait until the send messages has been acknowledged while(wait_for_all_msg_acknowledged_) { all_msg_acknowledged_cv_.wait(lk); } wait_for_all_msg_acknowledged_ = true; } c.stop(); send_service_start_measuring(false); VSOMEIP_DEBUG << "Synchronously sent " << std::setw(4) << std::setfill('0') << number_of_sent_messages_ << " messages. CPU load [%]: " << std::fixed << std::setprecision(2) << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0); results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0); } void send_messages_async(std::unique_lock& lk, std::uint32_t _messages_to_send) { cpu_load_measurer c(static_cast(::getpid())); send_service_start_measuring(true); c.start(); for (number_of_sent_messages_ = 0; number_of_sent_messages_ < _messages_to_send; number_of_sent_messages_++, number_of_sent_messages_total_++) { app_->send(request_); if((number_of_sent_messages_+1) % sliding_window_size_ == 0) { // wait until all send messages have been acknowledged while(wait_for_all_msg_acknowledged_) { all_msg_acknowledged_cv_.wait(lk); } wait_for_all_msg_acknowledged_ = true; } } c.stop(); send_service_start_measuring(false); VSOMEIP_DEBUG << "Asynchronously sent " << std::setw(4) << std::setfill('0') << number_of_sent_messages_ << " messages. CPU load [%]: " << std::fixed << std::setprecision(2) << (std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0); results_.push_back(std::isfinite(c.get_cpu_load()) ? c.get_cpu_load() : 0.0); } void send_service_start_measuring(bool _start_measuring) { std::shared_ptr m = vsomeip::runtime::get()->create_request(protocol_ == protocol_e::PR_TCP); m->set_service(cpu_load_test::service_id); m->set_instance(cpu_load_test::instance_id); _start_measuring ? m->set_method(cpu_load_test::method_id_cpu_measure_start) : m->set_method(cpu_load_test::method_id_cpu_measure_stop); app_->send(m); } void shutdown_service() { request_->set_service(cpu_load_test::service_id); request_->set_instance(cpu_load_test::instance_id); request_->set_method(cpu_load_test::method_id_shutdown); app_->send(request_); } private: protocol_e protocol_; std::shared_ptr app_; std::shared_ptr request_; bool call_service_sync_; bool shutdown_service_at_end_; std::uint32_t sliding_window_size_; std::mutex mutex_; std::condition_variable condition_; bool wait_for_availability_; bool is_available_; const std::uint32_t number_of_calls_; std::uint32_t number_of_calls_current_; std::uint32_t number_of_sent_messages_; std::uint32_t number_of_sent_messages_total_; std::uint32_t number_of_acknowledged_messages_; std::uint32_t payload_size_; bool wait_for_all_msg_acknowledged_; std::mutex all_msg_acknowledged_mutex_; std::condition_variable all_msg_acknowledged_cv_; std::vector results_; std::atomic initialized_; std::thread sender_; }; // this variables are changed via cmdline parameters static protocol_e protocol(protocol_e::PR_UNKNOWN); static std::uint32_t number_of_calls(0); static std::uint32_t payload_size(40); static bool call_service_sync(true); static bool shutdown_service(true); TEST(someip_load_test, DISABLED_send_messages_and_measure_cpu_load) { cpu_load_test_client test_client_(protocol, number_of_calls, payload_size, call_service_sync, shutdown_service); } #ifndef _WIN32 int main(int argc, char** argv) { int i = 0; while (i < argc) { if(std::string("--protocol") == std::string(argv[i]) || std::string("-p") == std::string(argv[i])) { if(std::string("udp") == std::string(argv[i+1]) || std::string("UDP") == std::string(argv[i+1])) { protocol = protocol_e::PR_UDP; i++; } else if(std::string("tcp") == std::string(argv[i+1]) || std::string("TCP") == std::string(argv[i+1])) { protocol = protocol_e::PR_TCP; i++; } } else if(std::string("--calls") == std::string(argv[i]) || std::string("-c") == std::string(argv[i])) { try { number_of_calls = static_cast(std::stoul(std::string(argv[i+1]), nullptr, 10)); } catch (const std::exception &e) { std::cerr << "Please specify a valid value for number of calls" << std::endl; return(EXIT_FAILURE); } i++; } else if(std::string("--mode") == std::string(argv[i]) || std::string("-m") == std::string(argv[i])) { if(std::string("sync") == std::string(argv[i+1]) || std::string("SYNC") == std::string(argv[i+1])) { call_service_sync = true; i++; } else if(std::string("async") == std::string(argv[i+1]) || std::string("ASYNC") == std::string(argv[i+1])) { call_service_sync = false; i++; } } else if(std::string("--payload-size") == std::string(argv[i]) || std::string("-pl") == std::string(argv[i])) { try { payload_size = static_cast(std::stoul(std::string(argv[i+1]), nullptr, 10)); } catch (const std::exception &e) { std::cerr << "Please specify a valid values for payload size" << std::endl; return(EXIT_FAILURE); } i++; } else if(std::string("--help") == std::string(argv[i]) || std::string("-h") == std::string(argv[i])) { std::cout << "Available options:" << std::endl; std::cout << "--protocol|-p: valid values TCP or UDP" << std::endl; std::cout << "--calls|-c: number of message calls to do" << std::endl; std::cout << "--mode|-m: mode sync or async" << std::endl; std::cout << "--payload-size|-pl: payload size in Bytes default: 40" << std::endl; } i++; } if(protocol == protocol_e::PR_UNKNOWN) { std::cerr << "Please specify valid protocol mode, see --help" << std::endl; return(EXIT_FAILURE); } if(!number_of_calls) { std::cerr << "Please specify valid number of calls, see --help" << std::endl; return(EXIT_FAILURE); } ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } #endif