// Copyright 2023 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_chrono/system_clock.h" #include "pw_rpc/client_server.h" #include "pw_rpc/synchronous_call.h" #include "pw_rpc_transport/egress_ingress.h" #include "pw_rpc_transport/internal/test.rpc.pwpb.h" #include "pw_rpc_transport/local_rpc_egress.h" #include "pw_rpc_transport/service_registry.h" #include "pw_rpc_transport/socket_rpc_transport.h" #include "pw_status/status.h" #include "pw_string/string.h" #include "pw_thread/thread.h" #include "pw_thread/thread_core.h" #include "pw_thread_stl/options.h" #include "pw_unit_test/framework.h" namespace pw::rpc { namespace { using namespace std::chrono_literals; constexpr size_t kMaxTestMessageSize = 1024; constexpr uint32_t kTestChannelId = 1; class TestService final : public pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Service< TestService> { public: Status Echo( const pw_rpc_transport::testing::pwpb::EchoMessage::Message& request, pw_rpc_transport::testing::pwpb::EchoMessage::Message& response) { response.msg = request.msg; return OkStatus(); } }; template struct SocketRpcEndpoint { explicit SocketRpcEndpoint(SocketRpcTransport& rpc_transport) : transport(rpc_transport), rpc_egress("tx", transport), tx_channels({rpc::Channel::Create(&rpc_egress)}), rx_channels({ChannelEgress{kTestChannelId, local_egress}}), rpc_ingress(rx_channels), service_registry(tx_channels) { local_egress.set_packet_processor(service_registry); transport.set_ingress(rpc_ingress); } LocalRpcEgress local_egress; SocketRpcTransport& transport; SimpleRpcEgress rpc_egress; std::array tx_channels; std::array rx_channels; SimpleRpcIngress rpc_ingress; ServiceRegistry service_registry; }; TEST(RpcIntegrationTest, SocketTransport) { constexpr size_t kMaxPacketSize = 512; constexpr size_t kLocalEgressQueueSize = 20; constexpr size_t kMessageSize = 50; SocketRpcTransport a_to_b_transport( SocketRpcTransport::kAsServer, /*port=*/0); auto a = SocketRpcEndpoint( a_to_b_transport); auto a_local_egress_thread = Thread(thread::stl::Options(), a.local_egress); auto a_transport_thread = Thread(thread::stl::Options(), a.transport); a_to_b_transport.WaitUntilReady(); SocketRpcTransport b_to_a_transport( SocketRpcTransport::kAsClient, "localhost", a_to_b_transport.port()); auto b = SocketRpcEndpoint( b_to_a_transport); auto b_local_egress_thread = Thread(thread::stl::Options(), b.local_egress); auto b_transport_thread = Thread(thread::stl::Options(), b.transport); TestService b_test_service; b.service_registry.RegisterService(b_test_service); a_to_b_transport.WaitUntilConnected(); b_to_a_transport.WaitUntilConnected(); for (int i = 0; i < 10; ++i) { InlineString test_message; test_message.append(kMessageSize, '*'); auto echo_request = pw_rpc_transport::testing::pwpb::EchoMessage::Message{ .msg = test_message}; const auto echo_response = rpc::SynchronousCall< pw_rpc_transport::testing::pw_rpc::pwpb::TestService::Echo>( a.service_registry.client_server().client(), kTestChannelId, echo_request); EXPECT_EQ(echo_response.status(), OkStatus()); EXPECT_EQ(echo_response.response().msg, test_message); } // Shut everything down. a.local_egress.Stop(); b.local_egress.Stop(); a.transport.Stop(); b.transport.Stop(); a_local_egress_thread.join(); b_local_egress_thread.join(); a_transport_thread.join(); b_transport_thread.join(); } } // namespace } // namespace pw::rpc