• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_channelz.v1.channelz."""
15
16import asyncio
17import logging
18import unittest
19
20import grpc
21from grpc.experimental import aio
22from grpc_channelz.v1 import channelz
23from grpc_channelz.v1 import channelz_pb2
24from grpc_channelz.v1 import channelz_pb2_grpc
25
26from tests.unit.framework.common import test_constants
27from tests_aio.unit._test_base import AioTestBase
28
# Method names routed by _GenericHandler.
_SUCCESSFUL_UNARY_UNARY = "/test/SuccessfulUnaryUnary"
_FAILED_UNARY_UNARY = "/test/FailedUnaryUnary"
_SUCCESSFUL_STREAM_STREAM = "/test/SuccessfulStreamStream"

# Canned payloads sent by the test clients and returned by the test handlers.
_REQUEST = b"\x00\x00\x00"
_RESPONSE = b"\x01\x01\x01"

# Option tuples; callers concatenate them with `+` when they need several.
_DISABLE_REUSE_PORT = (("grpc.so_reuseport", 0),)
_ENABLE_CHANNELZ = (("grpc.enable_channelz", 1),)
_DISABLE_CHANNELZ = (("grpc.enable_channelz", 0),)

# Id used by the invalid-query tests, which expect it to be unassigned.
_LARGE_UNASSIGNED_ID = 10_000
41
42
async def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that always answers with the canned response.

    Neither the request nor the servicer context is inspected.
    """
    return _RESPONSE
45
46
async def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that marks the RPC as failed.

    Sets an INTERNAL status code and a fixed details string on the servicer
    context and returns no response.
    """
    failure_code = grpc.StatusCode.INTERNAL
    servicer_context.set_code(failure_code)
    failure_details = "Channelz Test Intended Failure"
    servicer_context.set_details(failure_details)
50
51
async def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior that yields one canned response per request.

    The content of each incoming request is ignored.
    """
    async for _request in request_iterator:
        yield _RESPONSE
55
56
class _GenericHandler(grpc.GenericRpcHandler):
    """Generic RPC handler that routes the three test methods."""

    def service(self, handler_call_details):
        """Return the method handler for the requested method.

        Args:
            handler_call_details: Call details whose `method` attribute names
                the RPC being invoked.

        Returns:
            An RPC method handler wrapping the matching test behavior, or
            None when the method is not one of the test methods.
        """
        # Maps each known method to (handler factory, behavior).
        routes = {
            _SUCCESSFUL_UNARY_UNARY: (
                grpc.unary_unary_rpc_method_handler,
                _successful_unary_unary,
            ),
            _FAILED_UNARY_UNARY: (
                grpc.unary_unary_rpc_method_handler,
                _failed_unary_unary,
            ),
            _SUCCESSFUL_STREAM_STREAM: (
                grpc.stream_stream_rpc_method_handler,
                _successful_stream_stream,
            ),
        }
        route = routes.get(handler_call_details.method)
        if route is None:
            return None
        make_handler, behavior = route
        return make_handler(behavior)
69
70
class _ChannelServerPair:
    """An aio server plus a channel connected to it, both channelz-enabled.

    `start` creates the server and the channel; `bind_channelz` then records
    the channelz ids of both ends; `stop` shuts them down.
    """

    def __init__(self):
        # Filled in by start().
        self.address = ""
        self.server = None
        self.channel = None
        # Filled in by bind_channelz().
        self.server_ref_id = None
        self.channel_ref_id = None

    async def start(self):
        """Start a server on an ephemeral port and open a channel to it."""
        # The server side enables the channelz service.
        server_options = _DISABLE_REUSE_PORT + _ENABLE_CHANNELZ
        self.server = aio.server(options=server_options)
        bound_port = self.server.add_insecure_port("[::]:0")
        self.address = "localhost:%d" % bound_port
        handlers = (_GenericHandler(),)
        self.server.add_generic_rpc_handlers(handlers)
        await self.server.start()

        # The channel side enables the channelz service as well.
        self.channel = aio.insecure_channel(
            self.address,
            options=_ENABLE_CHANNELZ,
        )

    async def bind_channelz(self, channelz_stub):
        """Look up and store the channelz ids of this pair.

        The channel id is taken from the top channel whose target equals
        "dns:///" followed by this pair's address; if several match, the last
        one wins, and if none matches the id is left unchanged. The server id
        is taken from the last server in the GetServers response.

        Args:
            channelz_stub: Channelz stub used to issue the queries.
        """
        top_channels = await channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0)
        )
        expected_target = "dns:///" + self.address
        for entry in top_channels.channel:
            if entry.data.target == expected_target:
                self.channel_ref_id = entry.ref.channel_id

        servers = await channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0)
        )
        last_server = servers.server[-1]
        self.server_ref_id = last_server.ref.server_id

    async def stop(self):
        """Close the channel, then stop the server with no grace period."""
        await self.channel.close()
        await self.server.stop(None)
108
109
async def _create_channel_server_pairs(n, channelz_stub=None):
    """Create and start `n` channel-server pairs.

    Args:
        n: Number of pairs to create.
        channelz_stub: Optional Channelz stub. When truthy, each pair is bound
            to its channelz ids right after it has started.

    Returns:
        The list of started pairs, in creation order.
    """
    created = [_ChannelServerPair() for _ in range(n)]
    for entry in created:
        await entry.start()
        if not channelz_stub:
            continue
        await entry.bind_channelz(channelz_stub)
    return created
118
119
async def _destroy_channel_server_pairs(pairs):
    """Stop every pair in `pairs`, one after another, in iteration order."""
    for finished_pair in pairs:
        await finished_pair.stop()
123
124
class ChannelzServicerTest(AioTestBase):
    """Tests the Channelz service against asyncio channels and servers.

    Each test creates one or more channelz-enabled channel-server pairs,
    drives RPCs through them, and checks what a separate Channelz-serving
    server reports. Pairs are created through `_create_pairs`, so `tearDown`
    destroys them even when a test fails before reaching its last line.
    """

    async def setUp(self):
        """Start the Channelz-serving server and a stub connected to it."""
        # Pairs created by the running test; destroyed in tearDown.
        self._pairs = []

        # This server is for Channelz info fetching only.
        # It should not enable Channelz itself.
        self._server = aio.server(
            options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ
        )
        port = self._server.add_insecure_port("[::]:0")
        channelz.add_channelz_servicer(self._server)
        await self._server.start()

        # This channel is used to fetch Channelz info only.
        # Channelz should not be enabled.
        self._channel = aio.insecure_channel(
            "localhost:%d" % port, options=_DISABLE_CHANNELZ
        )
        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)

    async def tearDown(self):
        """Destroy the test's pairs, then the Channelz channel and server."""
        try:
            await _destroy_channel_server_pairs(self._pairs)
        finally:
            # Always release the Channelz channel and server, even if
            # stopping one of the pairs raised.
            await self._channel.close()
            await self._server.stop(None)

    async def _create_pairs(self, n, channelz_stub=None):
        """Create `n` channel-server pairs and register them for cleanup.

        Args:
            n: Number of pairs to create.
            channelz_stub: Optional Channelz stub, forwarded to
                `_create_channel_server_pairs`.

        Returns:
            The list of newly created pairs.
        """
        pairs = await _create_channel_server_pairs(n, channelz_stub)
        self._pairs.extend(pairs)
        return pairs

    async def _get_server_by_ref_id(self, ref_id):
        """Fetch the server whose channelz id is `ref_id`.

        Server ids may not be consecutive, so the GetServers listing is
        started at `ref_id` and its first entry is asserted to carry exactly
        that id.

        Args:
            ref_id: Channelz server id to look up.

        Returns:
            The first server of the GetServers response.
        """
        resp = await self._channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=ref_id)
        )
        self.assertEqual(ref_id, resp.server[0].ref.server_id)
        return resp.server[0]

    async def _send_successful_unary_unary(self, pair):
        """Issue one successful unary call on `pair` and assert status OK."""
        call = pair.channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)(_REQUEST)
        self.assertEqual(grpc.StatusCode.OK, await call.code())

    async def _send_failed_unary_unary(self, pair):
        """Issue one failing unary call on `pair`.

        Fails the test if the call does not raise `grpc.RpcError`.
        """
        try:
            await pair.channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST)
        except grpc.RpcError:
            return
        else:
            self.fail("This call supposed to fail")

    async def _send_successful_stream_stream(self, pair):
        """Issue one streaming call on `pair` and count the responses.

        Sends `test_constants.STREAM_LENGTH` requests and asserts that the
        same number of responses comes back.
        """
        call = pair.channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)(
            iter([_REQUEST] * test_constants.STREAM_LENGTH)
        )
        cnt = 0
        async for _ in call:
            cnt += 1
        self.assertEqual(cnt, test_constants.STREAM_LENGTH)

    async def test_get_top_channels_high_start_id(self):
        """An unassigned start id yields an empty, final page of channels."""
        await self._create_pairs(1)

        resp = await self._channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(
                start_channel_id=_LARGE_UNASSIGNED_ID
            )
        )
        self.assertEqual(len(resp.channel), 0)
        self.assertEqual(resp.end, True)

    async def test_successful_request(self):
        """One successful call is counted as started and succeeded."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        await self._send_successful_unary_unary(pairs[0])
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)
        )

        self.assertEqual(resp.channel.data.calls_started, 1)
        self.assertEqual(resp.channel.data.calls_succeeded, 1)
        self.assertEqual(resp.channel.data.calls_failed, 0)

    async def test_failed_request(self):
        """One failed call is counted as started and failed."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        await self._send_failed_unary_unary(pairs[0])
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, 1)
        self.assertEqual(resp.channel.data.calls_succeeded, 0)
        self.assertEqual(resp.channel.data.calls_failed, 1)

    async def test_many_requests(self):
        """Mixed successes and failures on one channel are counted apart."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        k_success = 7
        k_failed = 9
        for _ in range(k_success):
            await self._send_successful_unary_unary(pairs[0])
        for _ in range(k_failed):
            await self._send_failed_unary_unary(pairs[0])
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
        self.assertEqual(resp.channel.data.calls_failed, k_failed)

    async def test_many_requests_many_channel(self):
        """Call counters are tracked independently for each channel."""
        k_channels = 4
        pairs = await self._create_pairs(k_channels, self._channelz_stub)
        k_success = 11
        k_failed = 13
        for _ in range(k_success):
            await self._send_successful_unary_unary(pairs[0])
            await self._send_successful_unary_unary(pairs[2])
        for _ in range(k_failed):
            await self._send_failed_unary_unary(pairs[1])
            await self._send_failed_unary_unary(pairs[2])

        # The first channel saw only successes
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, k_success)
        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
        self.assertEqual(resp.channel.data.calls_failed, 0)

        # The second channel saw only failures
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[1].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, k_failed)
        self.assertEqual(resp.channel.data.calls_succeeded, 0)
        self.assertEqual(resp.channel.data.calls_failed, k_failed)

        # The third channel saw both successes and failures
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[2].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
        self.assertEqual(resp.channel.data.calls_failed, k_failed)

        # The fourth channel saw nothing
        resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[3].channel_ref_id)
        )
        self.assertEqual(resp.channel.data.calls_started, 0)
        self.assertEqual(resp.channel.data.calls_succeeded, 0)
        self.assertEqual(resp.channel.data.calls_failed, 0)

    async def test_many_subchannels(self):
        """Each used channel's first subchannel mirrors its call counters."""
        k_channels = 4
        pairs = await self._create_pairs(k_channels, self._channelz_stub)
        k_success = 17
        k_failed = 19
        for _ in range(k_success):
            await self._send_successful_unary_unary(pairs[0])
            await self._send_successful_unary_unary(pairs[2])
        for _ in range(k_failed):
            await self._send_failed_unary_unary(pairs[1])
            await self._send_failed_unary_unary(pairs[2])

        for pair in pairs:
            gc_resp = await self._channelz_stub.GetChannel(
                channelz_pb2.GetChannelRequest(channel_id=pair.channel_ref_id)
            )
            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if gc_resp.channel.data.calls_started == 0:
                self.assertEqual(len(gc_resp.channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
            gsc_resp = await self._channelz_stub.GetSubchannel(
                channelz_pb2.GetSubchannelRequest(
                    subchannel_id=gc_resp.channel.subchannel_ref[
                        0
                    ].subchannel_id
                )
            )
            self.assertEqual(
                gc_resp.channel.data.calls_started,
                gsc_resp.subchannel.data.calls_started,
            )
            self.assertEqual(
                gc_resp.channel.data.calls_succeeded,
                gsc_resp.subchannel.data.calls_succeeded,
            )
            self.assertEqual(
                gc_resp.channel.data.calls_failed,
                gsc_resp.subchannel.data.calls_failed,
            )

    async def test_server_call(self):
        """The server counts the successful and failed calls it handled."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        k_success = 23
        k_failed = 29
        for _ in range(k_success):
            await self._send_successful_unary_unary(pairs[0])
        for _ in range(k_failed):
            await self._send_failed_unary_unary(pairs[0])

        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
        self.assertEqual(resp.data.calls_started, k_success + k_failed)
        self.assertEqual(resp.data.calls_succeeded, k_success)
        self.assertEqual(resp.data.calls_failed, k_failed)

    async def test_many_subchannels_and_sockets(self):
        """Each used subchannel has one socket whose stats match its calls."""
        k_channels = 4
        pairs = await self._create_pairs(k_channels, self._channelz_stub)
        k_success = 3
        k_failed = 5
        for _ in range(k_success):
            await self._send_successful_unary_unary(pairs[0])
            await self._send_successful_unary_unary(pairs[2])
        for _ in range(k_failed):
            await self._send_failed_unary_unary(pairs[1])
            await self._send_failed_unary_unary(pairs[2])

        for pair in pairs:
            gc_resp = await self._channelz_stub.GetChannel(
                channelz_pb2.GetChannelRequest(channel_id=pair.channel_ref_id)
            )

            # If no call performed in the channel, there shouldn't be any
            # subchannel
            if gc_resp.channel.data.calls_started == 0:
                self.assertEqual(len(gc_resp.channel.subchannel_ref), 0)
                continue

            # Otherwise, the subchannel should exist
            self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
            gsc_resp = await self._channelz_stub.GetSubchannel(
                channelz_pb2.GetSubchannelRequest(
                    subchannel_id=gc_resp.channel.subchannel_ref[
                        0
                    ].subchannel_id
                )
            )
            self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

            gs_resp = await self._channelz_stub.GetSocket(
                channelz_pb2.GetSocketRequest(
                    socket_id=gsc_resp.subchannel.socket_ref[0].socket_id
                )
            )
            self.assertEqual(
                gsc_resp.subchannel.data.calls_started,
                gs_resp.socket.data.streams_started,
            )
            self.assertEqual(0, gs_resp.socket.data.streams_failed)
            # Calls started == messages sent, only valid for unary calls
            self.assertEqual(
                gsc_resp.subchannel.data.calls_started,
                gs_resp.socket.data.messages_sent,
            )

    async def test_streaming_rpc(self):
        """A streaming call counts once; its messages are counted per item."""
        pairs = await self._create_pairs(1, self._channelz_stub)
        # In C++, the argument for _send_successful_stream_stream is message
        # length. Here the argument is the channel-server pair, to be
        # consistent with the other two send helpers.
        await self._send_successful_stream_stream(pairs[0])

        gc_resp = await self._channelz_stub.GetChannel(
            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)
        )
        self.assertEqual(gc_resp.channel.data.calls_started, 1)
        self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
        self.assertEqual(gc_resp.channel.data.calls_failed, 0)
        # Subchannel exists
        self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)

        # Poll the subchannel until every started call has been recorded as
        # either succeeded or failed.
        # NOTE: this loop has no upper bound; it ends only once the counters
        # agree.
        while True:
            gsc_resp = await self._channelz_stub.GetSubchannel(
                channelz_pb2.GetSubchannelRequest(
                    subchannel_id=gc_resp.channel.subchannel_ref[
                        0
                    ].subchannel_id
                )
            )
            if (
                gsc_resp.subchannel.data.calls_started
                == gsc_resp.subchannel.data.calls_succeeded
                + gsc_resp.subchannel.data.calls_failed
            ):
                break
        self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
        self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
        self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
        # Socket exists
        self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)

        # Poll the socket until every started stream has been recorded as
        # either succeeded or failed (same unbounded wait as above).
        while True:
            gs_resp = await self._channelz_stub.GetSocket(
                channelz_pb2.GetSocketRequest(
                    socket_id=gsc_resp.subchannel.socket_ref[0].socket_id
                )
            )
            if (
                gs_resp.socket.data.streams_started
                == gs_resp.socket.data.streams_succeeded
                + gs_resp.socket.data.streams_failed
            ):
                break
        self.assertEqual(gs_resp.socket.data.streams_started, 1)
        self.assertEqual(gs_resp.socket.data.streams_failed, 0)
        self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
        self.assertEqual(
            gs_resp.socket.data.messages_sent, test_constants.STREAM_LENGTH
        )
        self.assertEqual(
            gs_resp.socket.data.messages_received, test_constants.STREAM_LENGTH
        )

    async def test_server_sockets(self):
        """GetServerSockets succeeds for a server that has handled calls."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        await self._send_successful_unary_unary(pairs[0])
        await self._send_failed_unary_unary(pairs[0])

        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
        self.assertEqual(resp.data.calls_started, 2)
        self.assertEqual(resp.data.calls_succeeded, 1)
        self.assertEqual(resp.data.calls_failed, 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        await self._channelz_stub.GetServerSockets(
            channelz_pb2.GetServerSocketsRequest(
                server_id=resp.ref.server_id, start_socket_id=0
            )
        )

    async def test_server_listen_sockets(self):
        """The server exposes one listen socket that GetSocket can fetch."""
        pairs = await self._create_pairs(1, self._channelz_stub)

        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
        self.assertEqual(len(resp.listen_socket), 1)

        # If the RPC call failed, it will raise a grpc.RpcError
        # So, if there is no exception raised, considered pass
        await self._channelz_stub.GetSocket(
            channelz_pb2.GetSocketRequest(
                socket_id=resp.listen_socket[0].socket_id
            )
        )

    async def test_invalid_query_get_server(self):
        """GetServer with an unassigned id fails with NOT_FOUND."""
        with self.assertRaises(aio.AioRpcError) as exception_context:
            await self._channelz_stub.GetServer(
                channelz_pb2.GetServerRequest(server_id=_LARGE_UNASSIGNED_ID)
            )
        self.assertEqual(
            grpc.StatusCode.NOT_FOUND, exception_context.exception.code()
        )

    async def test_invalid_query_get_channel(self):
        """GetChannel with an unassigned id fails with NOT_FOUND."""
        with self.assertRaises(aio.AioRpcError) as exception_context:
            await self._channelz_stub.GetChannel(
                channelz_pb2.GetChannelRequest(channel_id=_LARGE_UNASSIGNED_ID)
            )
        self.assertEqual(
            grpc.StatusCode.NOT_FOUND, exception_context.exception.code()
        )

    async def test_invalid_query_get_subchannel(self):
        """GetSubchannel with an unassigned id fails with NOT_FOUND."""
        with self.assertRaises(aio.AioRpcError) as exception_context:
            await self._channelz_stub.GetSubchannel(
                channelz_pb2.GetSubchannelRequest(
                    subchannel_id=_LARGE_UNASSIGNED_ID
                )
            )
        self.assertEqual(
            grpc.StatusCode.NOT_FOUND, exception_context.exception.code()
        )

    async def test_invalid_query_get_socket(self):
        """GetSocket with an unassigned id fails with NOT_FOUND."""
        with self.assertRaises(aio.AioRpcError) as exception_context:
            await self._channelz_stub.GetSocket(
                channelz_pb2.GetSocketRequest(socket_id=_LARGE_UNASSIGNED_ID)
            )
        self.assertEqual(
            grpc.StatusCode.NOT_FOUND, exception_context.exception.code()
        )

    async def test_invalid_query_get_server_sockets(self):
        """GetServerSockets with an unassigned server id fails NOT_FOUND."""
        with self.assertRaises(aio.AioRpcError) as exception_context:
            await self._channelz_stub.GetServerSockets(
                channelz_pb2.GetServerSocketsRequest(
                    server_id=_LARGE_UNASSIGNED_ID,
                    start_socket_id=0,
                )
            )
        self.assertEqual(
            grpc.StatusCode.NOT_FOUND, exception_context.exception.code()
        )
546
547
if __name__ == "__main__":
    # Direct execution: debug-level logging and verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
551