• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_channelz.v1.channelz."""
15
16import unittest
17import logging
18import asyncio
19
20import grpc
21from grpc.experimental import aio
22
23from grpc_channelz.v1 import channelz
24from grpc_channelz.v1 import channelz_pb2
25from grpc_channelz.v1 import channelz_pb2_grpc
26
27from tests.unit.framework.common import test_constants
28from tests_aio.unit._test_base import AioTestBase
29
# Method names dispatched by the generic RPC handler defined in this module.
_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'

# Raw payloads sent as the request and returned as the response of test RPCs.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x01\x01\x01'

# Option tuples for servers/channels; they are combined with ``+`` where needed.
_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)

# Id used by the negative tests; presumed to never be handed out during a run.
_LARGE_UNASSIGNED_ID = 10000
42
43
async def _successful_unary_unary(request, servicer_context):
    """Unary-unary behavior that always answers with ``_RESPONSE``."""
    # Neither the request nor the context influences the reply.
    return _RESPONSE
46
47
async def _failed_unary_unary(request, servicer_context):
    """Unary-unary behavior that marks the call as failed with INTERNAL."""
    failure_code = grpc.StatusCode.INTERNAL
    servicer_context.set_code(failure_code)
    servicer_context.set_details("Channelz Test Intended Failure")
51
52
async def _successful_stream_stream(request_iterator, servicer_context):
    """Stream-stream behavior yielding one ``_RESPONSE`` per incoming request."""
    async for _request in request_iterator:
        # The request content is ignored; only its arrival triggers a reply.
        yield _RESPONSE
56
57
class _GenericHandler(grpc.GenericRpcHandler):
    """Generic handler routing the three test methods to their behaviors."""

    def service(self, handler_call_details):
        """Return the RPC method handler for the called method, or None.

        Args:
            handler_call_details: Call details whose ``method`` attribute is
                compared against the known test method names.

        Returns:
            A freshly built method handler for a known method, otherwise None.
        """
        # Map each method name to (handler factory, behavior); the handler is
        # built only for the method that was actually requested.
        routes = {
            _SUCCESSFUL_UNARY_UNARY: (grpc.unary_unary_rpc_method_handler,
                                      _successful_unary_unary),
            _FAILED_UNARY_UNARY: (grpc.unary_unary_rpc_method_handler,
                                  _failed_unary_unary),
            _SUCCESSFUL_STREAM_STREAM: (grpc.stream_stream_rpc_method_handler,
                                        _successful_stream_stream),
        }
        route = routes.get(handler_call_details.method)
        if route is None:
            return None
        make_handler, behavior = route
        return make_handler(behavior)
70
71
class _ChannelServerPair:
    """An aio server with Channelz enabled plus a channel targeting it.

    The attributes are filled in stages: ``start()`` sets ``server``,
    ``address`` and ``channel``; ``bind_channelz()`` sets ``channel_ref_id``
    and ``server_ref_id``. Until then they keep the defaults assigned in
    ``__init__``.
    """

    def __init__(self):
        self.address = ''
        self.server = None
        self.channel = None
        self.server_ref_id = None
        self.channel_ref_id = None

    async def start(self):
        """Start the server on an ephemeral port and open a channel to it."""
        # Server will enable channelz service
        self.server = aio.server(options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ)
        port = self.server.add_insecure_port('[::]:0')
        self.address = 'localhost:%d' % port
        self.server.add_generic_rpc_handlers((_GenericHandler(),))
        await self.server.start()

        # Channel will enable channelz service...
        self.channel = aio.insecure_channel(self.address,
                                            options=_ENABLE_CHANNELZ)

    async def bind_channelz(self, channelz_stub):
        """Look up and record the Channelz ids of this pair.

        Args:
            channelz_stub: Stub used to call GetTopChannels and GetServers.
        """
        resp = await channelz_stub.GetTopChannels(
            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
        # No early exit: if several channels share the target, the last one
        # listed wins. ``channel_ref_id`` stays None when nothing matches.
        for channel in resp.channel:
            if channel.data.target == self.address:
                self.channel_ref_id = channel.ref.channel_id

        resp = await channelz_stub.GetServers(
            channelz_pb2.GetServersRequest(start_server_id=0))
        # NOTE(review): takes the last listed server as this pair's server;
        # presumably the most recently created one is listed last - confirm.
        self.server_ref_id = resp.server[-1].ref.server_id

    async def stop(self):
        """Close the channel and stop the server.

        Safe to call on a pair that was never (or only partially) started:
        parts that are still None are skipped. The server is stopped even if
        closing the channel raises, so a failing close cannot leak the server.
        """
        try:
            if self.channel is not None:
                await self.channel.close()
        finally:
            if self.server is not None:
                await self.server.stop(None)
107
108
async def _create_channel_server_pairs(n, channelz_stub=None):
    """Start ``n`` channel-server pairs and return them as a list.

    When ``channelz_stub`` is truthy, each pair is bound to its Channelz ids
    right after it has been started.
    """
    started = []
    for _ in range(n):
        pair = _ChannelServerPair()
        await pair.start()
        if channelz_stub:
            await pair.bind_channelz(channelz_stub)
        started.append(pair)
    return started
117
118
async def _destroy_channel_server_pairs(pairs):
    """Stop every pair in ``pairs``, one after another, in the given order."""
    for channel_server_pair in pairs:
        await channel_server_pair.stop()
122
123
124class ChannelzServicerTest(AioTestBase):
125
126    async def setUp(self):
127        # This server is for Channelz info fetching only
128        # It self should not enable Channelz
129        self._server = aio.server(options=_DISABLE_REUSE_PORT +
130                                  _DISABLE_CHANNELZ)
131        port = self._server.add_insecure_port('[::]:0')
132        channelz.add_channelz_servicer(self._server)
133        await self._server.start()
134
135        # This channel is used to fetch Channelz info only
136        # Channelz should not be enabled
137        self._channel = aio.insecure_channel('localhost:%d' % port,
138                                             options=_DISABLE_CHANNELZ)
139        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)
140
141    async def tearDown(self):
142        await self._channel.close()
143        await self._server.stop(None)
144
145    async def _get_server_by_ref_id(self, ref_id):
146        """Server id may not be consecutive"""
147        resp = await self._channelz_stub.GetServers(
148            channelz_pb2.GetServersRequest(start_server_id=ref_id))
149        self.assertEqual(ref_id, resp.server[0].ref.server_id)
150        return resp.server[0]
151
152    async def _send_successful_unary_unary(self, pair):
153        call = pair.channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)(_REQUEST)
154        self.assertEqual(grpc.StatusCode.OK, await call.code())
155
156    async def _send_failed_unary_unary(self, pair):
157        try:
158            await pair.channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST)
159        except grpc.RpcError:
160            return
161        else:
162            self.fail("This call supposed to fail")
163
164    async def _send_successful_stream_stream(self, pair):
165        call = pair.channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)(iter(
166            [_REQUEST] * test_constants.STREAM_LENGTH))
167        cnt = 0
168        async for _ in call:
169            cnt += 1
170        self.assertEqual(cnt, test_constants.STREAM_LENGTH)
171
172    async def test_get_top_channels_high_start_id(self):
173        pairs = await _create_channel_server_pairs(1)
174
175        resp = await self._channelz_stub.GetTopChannels(
176            channelz_pb2.GetTopChannelsRequest(
177                start_channel_id=_LARGE_UNASSIGNED_ID))
178        self.assertEqual(len(resp.channel), 0)
179        self.assertEqual(resp.end, True)
180
181        await _destroy_channel_server_pairs(pairs)
182
183    async def test_successful_request(self):
184        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
185
186        await self._send_successful_unary_unary(pairs[0])
187        resp = await self._channelz_stub.GetChannel(
188            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
189
190        self.assertEqual(resp.channel.data.calls_started, 1)
191        self.assertEqual(resp.channel.data.calls_succeeded, 1)
192        self.assertEqual(resp.channel.data.calls_failed, 0)
193
194        await _destroy_channel_server_pairs(pairs)
195
196    async def test_failed_request(self):
197        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
198
199        await self._send_failed_unary_unary(pairs[0])
200        resp = await self._channelz_stub.GetChannel(
201            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
202        self.assertEqual(resp.channel.data.calls_started, 1)
203        self.assertEqual(resp.channel.data.calls_succeeded, 0)
204        self.assertEqual(resp.channel.data.calls_failed, 1)
205
206        await _destroy_channel_server_pairs(pairs)
207
208    async def test_many_requests(self):
209        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
210
211        k_success = 7
212        k_failed = 9
213        for i in range(k_success):
214            await self._send_successful_unary_unary(pairs[0])
215        for i in range(k_failed):
216            await self._send_failed_unary_unary(pairs[0])
217        resp = await self._channelz_stub.GetChannel(
218            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
219        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
220        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
221        self.assertEqual(resp.channel.data.calls_failed, k_failed)
222
223        await _destroy_channel_server_pairs(pairs)
224
225    async def test_many_requests_many_channel(self):
226        k_channels = 4
227        pairs = await _create_channel_server_pairs(k_channels,
228                                                   self._channelz_stub)
229        k_success = 11
230        k_failed = 13
231        for i in range(k_success):
232            await self._send_successful_unary_unary(pairs[0])
233            await self._send_successful_unary_unary(pairs[2])
234        for i in range(k_failed):
235            await self._send_failed_unary_unary(pairs[1])
236            await self._send_failed_unary_unary(pairs[2])
237
238        # The first channel saw only successes
239        resp = await self._channelz_stub.GetChannel(
240            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
241        self.assertEqual(resp.channel.data.calls_started, k_success)
242        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
243        self.assertEqual(resp.channel.data.calls_failed, 0)
244
245        # The second channel saw only failures
246        resp = await self._channelz_stub.GetChannel(
247            channelz_pb2.GetChannelRequest(channel_id=pairs[1].channel_ref_id))
248        self.assertEqual(resp.channel.data.calls_started, k_failed)
249        self.assertEqual(resp.channel.data.calls_succeeded, 0)
250        self.assertEqual(resp.channel.data.calls_failed, k_failed)
251
252        # The third channel saw both successes and failures
253        resp = await self._channelz_stub.GetChannel(
254            channelz_pb2.GetChannelRequest(channel_id=pairs[2].channel_ref_id))
255        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
256        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
257        self.assertEqual(resp.channel.data.calls_failed, k_failed)
258
259        # The fourth channel saw nothing
260        resp = await self._channelz_stub.GetChannel(
261            channelz_pb2.GetChannelRequest(channel_id=pairs[3].channel_ref_id))
262        self.assertEqual(resp.channel.data.calls_started, 0)
263        self.assertEqual(resp.channel.data.calls_succeeded, 0)
264        self.assertEqual(resp.channel.data.calls_failed, 0)
265
266        await _destroy_channel_server_pairs(pairs)
267
268    async def test_many_subchannels(self):
269        k_channels = 4
270        pairs = await _create_channel_server_pairs(k_channels,
271                                                   self._channelz_stub)
272        k_success = 17
273        k_failed = 19
274        for i in range(k_success):
275            await self._send_successful_unary_unary(pairs[0])
276            await self._send_successful_unary_unary(pairs[2])
277        for i in range(k_failed):
278            await self._send_failed_unary_unary(pairs[1])
279            await self._send_failed_unary_unary(pairs[2])
280
281        for i in range(k_channels):
282            gc_resp = await self._channelz_stub.GetChannel(
283                channelz_pb2.GetChannelRequest(
284                    channel_id=pairs[i].channel_ref_id))
285            # If no call performed in the channel, there shouldn't be any subchannel
286            if gc_resp.channel.data.calls_started == 0:
287                self.assertEqual(len(gc_resp.channel.subchannel_ref), 0)
288                continue
289
290            # Otherwise, the subchannel should exist
291            self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
292            gsc_resp = await self._channelz_stub.GetSubchannel(
293                channelz_pb2.GetSubchannelRequest(
294                    subchannel_id=gc_resp.channel.subchannel_ref[0].
295                    subchannel_id))
296            self.assertEqual(gc_resp.channel.data.calls_started,
297                             gsc_resp.subchannel.data.calls_started)
298            self.assertEqual(gc_resp.channel.data.calls_succeeded,
299                             gsc_resp.subchannel.data.calls_succeeded)
300            self.assertEqual(gc_resp.channel.data.calls_failed,
301                             gsc_resp.subchannel.data.calls_failed)
302
303        await _destroy_channel_server_pairs(pairs)
304
305    async def test_server_call(self):
306        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
307
308        k_success = 23
309        k_failed = 29
310        for i in range(k_success):
311            await self._send_successful_unary_unary(pairs[0])
312        for i in range(k_failed):
313            await self._send_failed_unary_unary(pairs[0])
314
315        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
316        self.assertEqual(resp.data.calls_started, k_success + k_failed)
317        self.assertEqual(resp.data.calls_succeeded, k_success)
318        self.assertEqual(resp.data.calls_failed, k_failed)
319
320        await _destroy_channel_server_pairs(pairs)
321
322    async def test_many_subchannels_and_sockets(self):
323        k_channels = 4
324        pairs = await _create_channel_server_pairs(k_channels,
325                                                   self._channelz_stub)
326        k_success = 3
327        k_failed = 5
328        for i in range(k_success):
329            await self._send_successful_unary_unary(pairs[0])
330            await self._send_successful_unary_unary(pairs[2])
331        for i in range(k_failed):
332            await self._send_failed_unary_unary(pairs[1])
333            await self._send_failed_unary_unary(pairs[2])
334
335        for i in range(k_channels):
336            gc_resp = await self._channelz_stub.GetChannel(
337                channelz_pb2.GetChannelRequest(
338                    channel_id=pairs[i].channel_ref_id))
339
340            # If no call performed in the channel, there shouldn't be any subchannel
341            if gc_resp.channel.data.calls_started == 0:
342                self.assertEqual(len(gc_resp.channel.subchannel_ref), 0)
343                continue
344
345            # Otherwise, the subchannel should exist
346            self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
347            gsc_resp = await self._channelz_stub.GetSubchannel(
348                channelz_pb2.GetSubchannelRequest(
349                    subchannel_id=gc_resp.channel.subchannel_ref[0].
350                    subchannel_id))
351            self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
352
353            gs_resp = await self._channelz_stub.GetSocket(
354                channelz_pb2.GetSocketRequest(
355                    socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
356            self.assertEqual(gsc_resp.subchannel.data.calls_started,
357                             gs_resp.socket.data.streams_started)
358            self.assertEqual(0, gs_resp.socket.data.streams_failed)
359            # Calls started == messages sent, only valid for unary calls
360            self.assertEqual(gsc_resp.subchannel.data.calls_started,
361                             gs_resp.socket.data.messages_sent)
362
363        await _destroy_channel_server_pairs(pairs)
364
365    async def test_streaming_rpc(self):
366        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
367        # In C++, the argument for _send_successful_stream_stream is message length.
368        # Here the argument is still channel idx, to be consistent with the other two.
369        await self._send_successful_stream_stream(pairs[0])
370
371        gc_resp = await self._channelz_stub.GetChannel(
372            channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id))
373        self.assertEqual(gc_resp.channel.data.calls_started, 1)
374        self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
375        self.assertEqual(gc_resp.channel.data.calls_failed, 0)
376        # Subchannel exists
377        self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
378
379        gsc_resp = await self._channelz_stub.GetSubchannel(
380            channelz_pb2.GetSubchannelRequest(
381                subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id))
382        self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
383        self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
384        self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
385        # Socket exists
386        self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
387
388        gs_resp = await self._channelz_stub.GetSocket(
389            channelz_pb2.GetSocketRequest(
390                socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
391        self.assertEqual(gs_resp.socket.data.streams_started, 1)
392        self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
393        self.assertEqual(gs_resp.socket.data.streams_failed, 0)
394        self.assertEqual(gs_resp.socket.data.messages_sent,
395                         test_constants.STREAM_LENGTH)
396        self.assertEqual(gs_resp.socket.data.messages_received,
397                         test_constants.STREAM_LENGTH)
398
399        await _destroy_channel_server_pairs(pairs)
400
401    async def test_server_sockets(self):
402        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
403
404        await self._send_successful_unary_unary(pairs[0])
405        await self._send_failed_unary_unary(pairs[0])
406
407        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
408        self.assertEqual(resp.data.calls_started, 2)
409        self.assertEqual(resp.data.calls_succeeded, 1)
410        self.assertEqual(resp.data.calls_failed, 1)
411
412        gss_resp = await self._channelz_stub.GetServerSockets(
413            channelz_pb2.GetServerSocketsRequest(server_id=resp.ref.server_id,
414                                                 start_socket_id=0))
415        # If the RPC call failed, it will raise a grpc.RpcError
416        # So, if there is no exception raised, considered pass
417        await _destroy_channel_server_pairs(pairs)
418
419    async def test_server_listen_sockets(self):
420        pairs = await _create_channel_server_pairs(1, self._channelz_stub)
421
422        resp = await self._get_server_by_ref_id(pairs[0].server_ref_id)
423        self.assertEqual(len(resp.listen_socket), 1)
424
425        gs_resp = await self._channelz_stub.GetSocket(
426            channelz_pb2.GetSocketRequest(
427                socket_id=resp.listen_socket[0].socket_id))
428        # If the RPC call failed, it will raise a grpc.RpcError
429        # So, if there is no exception raised, considered pass
430        await _destroy_channel_server_pairs(pairs)
431
432    async def test_invalid_query_get_server(self):
433        with self.assertRaises(aio.AioRpcError) as exception_context:
434            await self._channelz_stub.GetServer(
435                channelz_pb2.GetServerRequest(server_id=_LARGE_UNASSIGNED_ID))
436        self.assertEqual(grpc.StatusCode.NOT_FOUND,
437                         exception_context.exception.code())
438
439    async def test_invalid_query_get_channel(self):
440        with self.assertRaises(aio.AioRpcError) as exception_context:
441            await self._channelz_stub.GetChannel(
442                channelz_pb2.GetChannelRequest(channel_id=_LARGE_UNASSIGNED_ID))
443        self.assertEqual(grpc.StatusCode.NOT_FOUND,
444                         exception_context.exception.code())
445
446    async def test_invalid_query_get_subchannel(self):
447        with self.assertRaises(aio.AioRpcError) as exception_context:
448            await self._channelz_stub.GetSubchannel(
449                channelz_pb2.GetSubchannelRequest(
450                    subchannel_id=_LARGE_UNASSIGNED_ID))
451        self.assertEqual(grpc.StatusCode.NOT_FOUND,
452                         exception_context.exception.code())
453
454    async def test_invalid_query_get_socket(self):
455        with self.assertRaises(aio.AioRpcError) as exception_context:
456            await self._channelz_stub.GetSocket(
457                channelz_pb2.GetSocketRequest(socket_id=_LARGE_UNASSIGNED_ID))
458        self.assertEqual(grpc.StatusCode.NOT_FOUND,
459                         exception_context.exception.code())
460
461    async def test_invalid_query_get_server_sockets(self):
462        with self.assertRaises(aio.AioRpcError) as exception_context:
463            await self._channelz_stub.GetServerSockets(
464                channelz_pb2.GetServerSocketsRequest(
465                    server_id=_LARGE_UNASSIGNED_ID,
466                    start_socket_id=0,
467                ))
468        self.assertEqual(grpc.StatusCode.NOT_FOUND,
469                         exception_context.exception.code())
470
471
if __name__ == '__main__':
    # When run directly as a script: log at DEBUG level and run the tests
    # with verbose per-test output.
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)
475