1#!/usr/bin/env python3 2# Copyright 2021 The Pigweed Authors 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); you may not 5# use this file except in compliance with the License. You may obtain a copy of 6# the License at 7# 8# https://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13# License for the specific language governing permissions and limitations under 14# the License. 15"""Tests using the callback client for pw_rpc.""" 16 17import contextlib 18import unittest 19 20import pw_hdlc.rpc 21from pw_rpc import benchmark_pb2, testing 22from pw_status import Status 23 24ITERATIONS = 50 25 26 27class RpcIntegrationTest(unittest.TestCase): 28 """Calls RPCs on an RPC server through a socket.""" 29 30 test_server_command: tuple[str, ...] = () 31 port: int 32 33 def setUp(self) -> None: 34 self._context = pw_hdlc.rpc.HdlcRpcLocalServerAndClient( 35 self.test_server_command, self.port, [benchmark_pb2] 36 ) 37 self.rpcs = self._context.client.channel(1).rpcs 38 39 def tearDown(self) -> None: 40 self._context.close() 41 42 def test_unary(self) -> None: 43 for i in range(ITERATIONS): 44 payload = f'O_o #{i}'.encode() 45 status, reply = self.rpcs.pw.rpc.Benchmark.UnaryEcho( 46 payload=payload 47 ) 48 self.assertIs(status, Status.OK) 49 self.assertEqual(reply.payload, payload) 50 51 def test_bidirectional(self) -> None: 52 with self.rpcs.pw.rpc.Benchmark.BidirectionalEcho.invoke() as call: 53 responses = call.get_responses() 54 55 for i in range(ITERATIONS): 56 payload = f'O_o #{i}'.encode() 57 call.send(benchmark_pb2.Payload(payload=payload)) 58 59 self.assertEqual(next(responses).payload, payload) 60 61 def test_bidirectional_call_twice(self) -> None: 62 rpc = self.rpcs.pw.rpc.Benchmark.BidirectionalEcho 63 64 for _ in range(ITERATIONS): 65 with contextlib.ExitStack() as stack: 66 first_call = stack.enter_context(rpc.invoke()) 67 first_call_responses = first_call.get_responses() 68 first_call.send(payload=b'abc') 69 self.assertEqual( 70 next(first_call_responses), rpc.response(payload=b'abc') 71 ) 72 self.assertFalse(first_call.completed()) 73 74 second_call = stack.enter_context(rpc.invoke()) 75 second_call_responses = second_call.get_responses() 76 second_call.send(payload=b'123') 77 self.assertEqual( 78 next(second_call_responses), rpc.response(payload=b'123') 79 ) 80 self.assertFalse(second_call.completed()) 81 82 # Check that issuing `second_call` did not cancel `first call`. 83 self.assertFalse(first_call.completed()) 84 self.assertIs(first_call.error, None) 85 86 # Send to `first_call` again and check for a response. 87 first_call.send(payload=b'def') 88 self.assertEqual( 89 next(first_call_responses), rpc.response(payload=b'def') 90 ) 91 92 93def _main( 94 test_server_command: list[str], port: int, unittest_args: list[str] 95) -> None: 96 RpcIntegrationTest.test_server_command = tuple(test_server_command) 97 RpcIntegrationTest.port = port 98 unittest.main(argv=unittest_args) 99 100 101if __name__ == '__main__': 102 _main(**vars(testing.parse_test_server_args())) 103