• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2018 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests of grpc_channelz.v1.channelz."""
15
16import unittest
17
18from concurrent import futures
19
20import grpc
21
22from grpc_channelz.v1 import channelz
23from grpc_channelz.v1 import channelz_pb2
24from grpc_channelz.v1 import channelz_pb2_grpc
25
26from tests.unit import test_common
27from tests.unit.framework.common import test_constants
28
29_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary'
30_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary'
31_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream'
32
33_REQUEST = b'\x00\x00\x00'
34_RESPONSE = b'\x01\x01\x01'
35
36_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),)
37_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),)
38_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),)
39
40
41def _successful_unary_unary(request, servicer_context):
42    return _RESPONSE
43
44
45def _failed_unary_unary(request, servicer_context):
46    servicer_context.set_code(grpc.StatusCode.INTERNAL)
47    servicer_context.set_details("Channelz Test Intended Failure")
48
49
50def _successful_stream_stream(request_iterator, servicer_context):
51    for _ in request_iterator:
52        yield _RESPONSE
53
54
55class _GenericHandler(grpc.GenericRpcHandler):
56
57    def service(self, handler_call_details):
58        if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY:
59            return grpc.unary_unary_rpc_method_handler(_successful_unary_unary)
60        elif handler_call_details.method == _FAILED_UNARY_UNARY:
61            return grpc.unary_unary_rpc_method_handler(_failed_unary_unary)
62        elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM:
63            return grpc.stream_stream_rpc_method_handler(
64                _successful_stream_stream)
65        else:
66            return None
67
68
69class _ChannelServerPair(object):
70
71    def __init__(self):
72        # Server will enable channelz service
73        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=3),
74                                  options=_DISABLE_REUSE_PORT +
75                                  _ENABLE_CHANNELZ)
76        port = self.server.add_insecure_port('[::]:0')
77        self.server.add_generic_rpc_handlers((_GenericHandler(),))
78        self.server.start()
79
80        # Channel will enable channelz service...
81        self.channel = grpc.insecure_channel('localhost:%d' % port,
82                                             _ENABLE_CHANNELZ)
83
84
85def _generate_channel_server_pairs(n):
86    return [_ChannelServerPair() for i in range(n)]
87
88
89def _close_channel_server_pairs(pairs):
90    for pair in pairs:
91        pair.server.stop(None)
92        pair.channel.close()
93
94
95class ChannelzServicerTest(unittest.TestCase):
96
97    def _send_successful_unary_unary(self, idx):
98        _, r = self._pairs[idx].channel.unary_unary(
99            _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST)
100        self.assertEqual(r.code(), grpc.StatusCode.OK)
101
102    def _send_failed_unary_unary(self, idx):
103        try:
104            self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call(
105                _REQUEST)
106        except grpc.RpcError:
107            return
108        else:
109            self.fail("This call supposed to fail")
110
111    def _send_successful_stream_stream(self, idx):
112        response_iterator = self._pairs[idx].channel.stream_stream(
113            _SUCCESSFUL_STREAM_STREAM).__call__(
114                iter([_REQUEST] * test_constants.STREAM_LENGTH))
115        cnt = 0
116        for _ in response_iterator:
117            cnt += 1
118        self.assertEqual(cnt, test_constants.STREAM_LENGTH)
119
120    def _get_channel_id(self, idx):
121        """Channel id may not be consecutive"""
122        resp = self._channelz_stub.GetTopChannels(
123            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
124        self.assertGreater(len(resp.channel), idx)
125        return resp.channel[idx].ref.channel_id
126
127    def setUp(self):
128        self._pairs = []
129        # This server is for Channelz info fetching only
130        # It self should not enable Channelz
131        self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3),
132                                   options=_DISABLE_REUSE_PORT +
133                                   _DISABLE_CHANNELZ)
134        port = self._server.add_insecure_port('[::]:0')
135        channelz.add_channelz_servicer(self._server)
136        self._server.start()
137
138        # This channel is used to fetch Channelz info only
139        # Channelz should not be enabled
140        self._channel = grpc.insecure_channel('localhost:%d' % port,
141                                              _DISABLE_CHANNELZ)
142        self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel)
143
144    def tearDown(self):
145        self._server.stop(None)
146        self._channel.close()
147        _close_channel_server_pairs(self._pairs)
148
149    def test_get_top_channels_basic(self):
150        self._pairs = _generate_channel_server_pairs(1)
151        resp = self._channelz_stub.GetTopChannels(
152            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
153        self.assertEqual(len(resp.channel), 1)
154        self.assertEqual(resp.end, True)
155
156    def test_get_top_channels_high_start_id(self):
157        self._pairs = _generate_channel_server_pairs(1)
158        resp = self._channelz_stub.GetTopChannels(
159            channelz_pb2.GetTopChannelsRequest(start_channel_id=10000))
160        self.assertEqual(len(resp.channel), 0)
161        self.assertEqual(resp.end, True)
162
163    def test_successful_request(self):
164        self._pairs = _generate_channel_server_pairs(1)
165        self._send_successful_unary_unary(0)
166        resp = self._channelz_stub.GetChannel(
167            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
168        self.assertEqual(resp.channel.data.calls_started, 1)
169        self.assertEqual(resp.channel.data.calls_succeeded, 1)
170        self.assertEqual(resp.channel.data.calls_failed, 0)
171
172    def test_failed_request(self):
173        self._pairs = _generate_channel_server_pairs(1)
174        self._send_failed_unary_unary(0)
175        resp = self._channelz_stub.GetChannel(
176            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
177        self.assertEqual(resp.channel.data.calls_started, 1)
178        self.assertEqual(resp.channel.data.calls_succeeded, 0)
179        self.assertEqual(resp.channel.data.calls_failed, 1)
180
181    def test_many_requests(self):
182        self._pairs = _generate_channel_server_pairs(1)
183        k_success = 7
184        k_failed = 9
185        for i in range(k_success):
186            self._send_successful_unary_unary(0)
187        for i in range(k_failed):
188            self._send_failed_unary_unary(0)
189        resp = self._channelz_stub.GetChannel(
190            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
191        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
192        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
193        self.assertEqual(resp.channel.data.calls_failed, k_failed)
194
195    def test_many_channel(self):
196        k_channels = 4
197        self._pairs = _generate_channel_server_pairs(k_channels)
198        resp = self._channelz_stub.GetTopChannels(
199            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
200        self.assertEqual(len(resp.channel), k_channels)
201
202    def test_many_requests_many_channel(self):
203        k_channels = 4
204        self._pairs = _generate_channel_server_pairs(k_channels)
205        k_success = 11
206        k_failed = 13
207        for i in range(k_success):
208            self._send_successful_unary_unary(0)
209            self._send_successful_unary_unary(2)
210        for i in range(k_failed):
211            self._send_failed_unary_unary(1)
212            self._send_failed_unary_unary(2)
213
214        # The first channel saw only successes
215        resp = self._channelz_stub.GetChannel(
216            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
217        self.assertEqual(resp.channel.data.calls_started, k_success)
218        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
219        self.assertEqual(resp.channel.data.calls_failed, 0)
220
221        # The second channel saw only failures
222        resp = self._channelz_stub.GetChannel(
223            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1)))
224        self.assertEqual(resp.channel.data.calls_started, k_failed)
225        self.assertEqual(resp.channel.data.calls_succeeded, 0)
226        self.assertEqual(resp.channel.data.calls_failed, k_failed)
227
228        # The third channel saw both successes and failures
229        resp = self._channelz_stub.GetChannel(
230            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2)))
231        self.assertEqual(resp.channel.data.calls_started, k_success + k_failed)
232        self.assertEqual(resp.channel.data.calls_succeeded, k_success)
233        self.assertEqual(resp.channel.data.calls_failed, k_failed)
234
235        # The fourth channel saw nothing
236        resp = self._channelz_stub.GetChannel(
237            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3)))
238        self.assertEqual(resp.channel.data.calls_started, 0)
239        self.assertEqual(resp.channel.data.calls_succeeded, 0)
240        self.assertEqual(resp.channel.data.calls_failed, 0)
241
242    def test_many_subchannels(self):
243        k_channels = 4
244        self._pairs = _generate_channel_server_pairs(k_channels)
245        k_success = 17
246        k_failed = 19
247        for i in range(k_success):
248            self._send_successful_unary_unary(0)
249            self._send_successful_unary_unary(2)
250        for i in range(k_failed):
251            self._send_failed_unary_unary(1)
252            self._send_failed_unary_unary(2)
253
254        gtc_resp = self._channelz_stub.GetTopChannels(
255            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
256        self.assertEqual(len(gtc_resp.channel), k_channels)
257        for i in range(k_channels):
258            # If no call performed in the channel, there shouldn't be any subchannel
259            if gtc_resp.channel[i].data.calls_started == 0:
260                self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
261                continue
262
263            # Otherwise, the subchannel should exist
264            self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
265            gsc_resp = self._channelz_stub.GetSubchannel(
266                channelz_pb2.GetSubchannelRequest(
267                    subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
268                    subchannel_id))
269            self.assertEqual(gtc_resp.channel[i].data.calls_started,
270                             gsc_resp.subchannel.data.calls_started)
271            self.assertEqual(gtc_resp.channel[i].data.calls_succeeded,
272                             gsc_resp.subchannel.data.calls_succeeded)
273            self.assertEqual(gtc_resp.channel[i].data.calls_failed,
274                             gsc_resp.subchannel.data.calls_failed)
275
276    def test_server_basic(self):
277        self._pairs = _generate_channel_server_pairs(1)
278        resp = self._channelz_stub.GetServers(
279            channelz_pb2.GetServersRequest(start_server_id=0))
280        self.assertEqual(len(resp.server), 1)
281
282    def test_get_one_server(self):
283        self._pairs = _generate_channel_server_pairs(1)
284        gss_resp = self._channelz_stub.GetServers(
285            channelz_pb2.GetServersRequest(start_server_id=0))
286        self.assertEqual(len(gss_resp.server), 1)
287        gs_resp = self._channelz_stub.GetServer(
288            channelz_pb2.GetServerRequest(
289                server_id=gss_resp.server[0].ref.server_id))
290        self.assertEqual(gss_resp.server[0].ref.server_id,
291                         gs_resp.server.ref.server_id)
292
293    def test_server_call(self):
294        self._pairs = _generate_channel_server_pairs(1)
295        k_success = 23
296        k_failed = 29
297        for i in range(k_success):
298            self._send_successful_unary_unary(0)
299        for i in range(k_failed):
300            self._send_failed_unary_unary(0)
301
302        resp = self._channelz_stub.GetServers(
303            channelz_pb2.GetServersRequest(start_server_id=0))
304        self.assertEqual(len(resp.server), 1)
305        self.assertEqual(resp.server[0].data.calls_started,
306                         k_success + k_failed)
307        self.assertEqual(resp.server[0].data.calls_succeeded, k_success)
308        self.assertEqual(resp.server[0].data.calls_failed, k_failed)
309
310    def test_many_subchannels_and_sockets(self):
311        k_channels = 4
312        self._pairs = _generate_channel_server_pairs(k_channels)
313        k_success = 3
314        k_failed = 5
315        for i in range(k_success):
316            self._send_successful_unary_unary(0)
317            self._send_successful_unary_unary(2)
318        for i in range(k_failed):
319            self._send_failed_unary_unary(1)
320            self._send_failed_unary_unary(2)
321
322        gtc_resp = self._channelz_stub.GetTopChannels(
323            channelz_pb2.GetTopChannelsRequest(start_channel_id=0))
324        self.assertEqual(len(gtc_resp.channel), k_channels)
325        for i in range(k_channels):
326            # If no call performed in the channel, there shouldn't be any subchannel
327            if gtc_resp.channel[i].data.calls_started == 0:
328                self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0)
329                continue
330
331            # Otherwise, the subchannel should exist
332            self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0)
333            gsc_resp = self._channelz_stub.GetSubchannel(
334                channelz_pb2.GetSubchannelRequest(
335                    subchannel_id=gtc_resp.channel[i].subchannel_ref[0].
336                    subchannel_id))
337            self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
338
339            gs_resp = self._channelz_stub.GetSocket(
340                channelz_pb2.GetSocketRequest(
341                    socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
342            self.assertEqual(gsc_resp.subchannel.data.calls_started,
343                             gs_resp.socket.data.streams_started)
344            self.assertEqual(gsc_resp.subchannel.data.calls_started,
345                             gs_resp.socket.data.streams_succeeded)
346            # Calls started == messages sent, only valid for unary calls
347            self.assertEqual(gsc_resp.subchannel.data.calls_started,
348                             gs_resp.socket.data.messages_sent)
349            # Only receive responses when the RPC was successful
350            self.assertEqual(gsc_resp.subchannel.data.calls_succeeded,
351                             gs_resp.socket.data.messages_received)
352
353    def test_streaming_rpc(self):
354        self._pairs = _generate_channel_server_pairs(1)
355        # In C++, the argument for _send_successful_stream_stream is message length.
356        # Here the argument is still channel idx, to be consistent with the other two.
357        self._send_successful_stream_stream(0)
358
359        gc_resp = self._channelz_stub.GetChannel(
360            channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0)))
361        self.assertEqual(gc_resp.channel.data.calls_started, 1)
362        self.assertEqual(gc_resp.channel.data.calls_succeeded, 1)
363        self.assertEqual(gc_resp.channel.data.calls_failed, 0)
364        # Subchannel exists
365        self.assertGreater(len(gc_resp.channel.subchannel_ref), 0)
366
367        gsc_resp = self._channelz_stub.GetSubchannel(
368            channelz_pb2.GetSubchannelRequest(
369                subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id))
370        self.assertEqual(gsc_resp.subchannel.data.calls_started, 1)
371        self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1)
372        self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0)
373        # Socket exists
374        self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1)
375
376        gs_resp = self._channelz_stub.GetSocket(
377            channelz_pb2.GetSocketRequest(
378                socket_id=gsc_resp.subchannel.socket_ref[0].socket_id))
379        self.assertEqual(gs_resp.socket.data.streams_started, 1)
380        self.assertEqual(gs_resp.socket.data.streams_succeeded, 1)
381        self.assertEqual(gs_resp.socket.data.streams_failed, 0)
382        self.assertEqual(gs_resp.socket.data.messages_sent,
383                         test_constants.STREAM_LENGTH)
384        self.assertEqual(gs_resp.socket.data.messages_received,
385                         test_constants.STREAM_LENGTH)
386
387    def test_server_sockets(self):
388        self._pairs = _generate_channel_server_pairs(1)
389        self._send_successful_unary_unary(0)
390        self._send_failed_unary_unary(0)
391
392        gs_resp = self._channelz_stub.GetServers(
393            channelz_pb2.GetServersRequest(start_server_id=0))
394        self.assertEqual(len(gs_resp.server), 1)
395        self.assertEqual(gs_resp.server[0].data.calls_started, 2)
396        self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1)
397        self.assertEqual(gs_resp.server[0].data.calls_failed, 1)
398
399        gss_resp = self._channelz_stub.GetServerSockets(
400            channelz_pb2.GetServerSocketsRequest(
401                server_id=gs_resp.server[0].ref.server_id, start_socket_id=0))
402        # If the RPC call failed, it will raise a grpc.RpcError
403        # So, if there is no exception raised, considered pass
404
405    def test_server_listen_sockets(self):
406        self._pairs = _generate_channel_server_pairs(1)
407
408        gss_resp = self._channelz_stub.GetServers(
409            channelz_pb2.GetServersRequest(start_server_id=0))
410        self.assertEqual(len(gss_resp.server), 1)
411        self.assertEqual(len(gss_resp.server[0].listen_socket), 1)
412
413        gs_resp = self._channelz_stub.GetSocket(
414            channelz_pb2.GetSocketRequest(
415                socket_id=gss_resp.server[0].listen_socket[0].socket_id))
416        # If the RPC call failed, it will raise a grpc.RpcError
417        # So, if there is no exception raised, considered pass
418
419    def test_invalid_query_get_server(self):
420        try:
421            self._channelz_stub.GetServer(
422                channelz_pb2.GetServerRequest(server_id=10000))
423        except BaseException as e:
424            self.assertIn('StatusCode.NOT_FOUND', str(e))
425        else:
426            self.fail('Invalid query not detected')
427
428    def test_invalid_query_get_channel(self):
429        try:
430            self._channelz_stub.GetChannel(
431                channelz_pb2.GetChannelRequest(channel_id=10000))
432        except BaseException as e:
433            self.assertIn('StatusCode.NOT_FOUND', str(e))
434        else:
435            self.fail('Invalid query not detected')
436
437    def test_invalid_query_get_subchannel(self):
438        try:
439            self._channelz_stub.GetSubchannel(
440                channelz_pb2.GetSubchannelRequest(subchannel_id=10000))
441        except BaseException as e:
442            self.assertIn('StatusCode.NOT_FOUND', str(e))
443        else:
444            self.fail('Invalid query not detected')
445
446    def test_invalid_query_get_socket(self):
447        try:
448            self._channelz_stub.GetSocket(
449                channelz_pb2.GetSocketRequest(socket_id=10000))
450        except BaseException as e:
451            self.assertIn('StatusCode.NOT_FOUND', str(e))
452        else:
453            self.fail('Invalid query not detected')
454
455    def test_invalid_query_get_server_sockets(self):
456        try:
457            self._channelz_stub.GetServerSockets(
458                channelz_pb2.GetServerSocketsRequest(
459                    server_id=10000,
460                    start_socket_id=0,
461                ))
462        except BaseException as e:
463            self.assertIn('StatusCode.NOT_FOUND', str(e))
464        else:
465            self.fail('Invalid query not detected')
466
467
468if __name__ == '__main__':
469    unittest.main(verbosity=2)
470