• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2021 The Pigweed Authors
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may not
5# use this file except in compliance with the License. You may obtain a copy of
6# the License at
7#
8#     https://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations under
14# the License.
15"""Tests using the callback client for pw_rpc."""
16
17import contextlib
18import unittest
19
20import pw_hdlc.rpc
21from pw_rpc import benchmark_pb2, testing
22from pw_status import Status
23
24ITERATIONS = 50
25
26
27class RpcIntegrationTest(unittest.TestCase):
28    """Calls RPCs on an RPC server through a socket."""
29
30    test_server_command: tuple[str, ...] = ()
31    port: int
32
33    def setUp(self) -> None:
34        self._context = pw_hdlc.rpc.HdlcRpcLocalServerAndClient(
35            self.test_server_command, self.port, [benchmark_pb2]
36        )
37        self.rpcs = self._context.client.channel(1).rpcs
38
39    def tearDown(self) -> None:
40        self._context.close()
41
42    def test_unary(self) -> None:
43        for i in range(ITERATIONS):
44            payload = f'O_o #{i}'.encode()
45            status, reply = self.rpcs.pw.rpc.Benchmark.UnaryEcho(
46                payload=payload
47            )
48            self.assertIs(status, Status.OK)
49            self.assertEqual(reply.payload, payload)
50
51    def test_bidirectional(self) -> None:
52        with self.rpcs.pw.rpc.Benchmark.BidirectionalEcho.invoke() as call:
53            responses = call.get_responses()
54
55            for i in range(ITERATIONS):
56                payload = f'O_o #{i}'.encode()
57                call.send(benchmark_pb2.Payload(payload=payload))
58
59                self.assertEqual(next(responses).payload, payload)
60
61    def test_bidirectional_call_twice(self) -> None:
62        rpc = self.rpcs.pw.rpc.Benchmark.BidirectionalEcho
63
64        for _ in range(ITERATIONS):
65            with contextlib.ExitStack() as stack:
66                first_call = stack.enter_context(rpc.invoke())
67                first_call_responses = first_call.get_responses()
68                first_call.send(payload=b'abc')
69                self.assertEqual(
70                    next(first_call_responses), rpc.response(payload=b'abc')
71                )
72                self.assertFalse(first_call.completed())
73
74                second_call = stack.enter_context(rpc.invoke())
75                second_call_responses = second_call.get_responses()
76                second_call.send(payload=b'123')
77                self.assertEqual(
78                    next(second_call_responses), rpc.response(payload=b'123')
79                )
80                self.assertFalse(second_call.completed())
81
82                # Check that issuing `second_call` did not cancel `first call`.
83                self.assertFalse(first_call.completed())
84                self.assertIs(first_call.error, None)
85
86                # Send to `first_call` again and check for a response.
87                first_call.send(payload=b'def')
88                self.assertEqual(
89                    next(first_call_responses), rpc.response(payload=b'def')
90                )
91
92
93def _main(
94    test_server_command: list[str], port: int, unittest_args: list[str]
95) -> None:
96    RpcIntegrationTest.test_server_command = tuple(test_server_command)
97    RpcIntegrationTest.port = port
98    unittest.main(argv=unittest_args)
99
100
101if __name__ == '__main__':
102    _main(**vars(testing.parse_test_server_args()))
103