1# Copyright 2020 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc_channelz.v1.channelz.""" 15 16import unittest 17import logging 18import asyncio 19 20import grpc 21from grpc.experimental import aio 22 23from grpc_channelz.v1 import channelz 24from grpc_channelz.v1 import channelz_pb2 25from grpc_channelz.v1 import channelz_pb2_grpc 26 27from tests.unit.framework.common import test_constants 28from tests_aio.unit._test_base import AioTestBase 29 30_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary' 31_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary' 32_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream' 33 34_REQUEST = b'\x00\x00\x00' 35_RESPONSE = b'\x01\x01\x01' 36 37_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),) 38_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),) 39_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),) 40 41_LARGE_UNASSIGNED_ID = 10000 42 43 44async def _successful_unary_unary(request, servicer_context): 45 return _RESPONSE 46 47 48async def _failed_unary_unary(request, servicer_context): 49 servicer_context.set_code(grpc.StatusCode.INTERNAL) 50 servicer_context.set_details("Channelz Test Intended Failure") 51 52 53async def _successful_stream_stream(request_iterator, servicer_context): 54 async for _ in request_iterator: 55 yield _RESPONSE 56 57 58class _GenericHandler(grpc.GenericRpcHandler): 59 60 def service(self, handler_call_details): 61 if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: 62 return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) 63 elif handler_call_details.method == _FAILED_UNARY_UNARY: 64 return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) 65 elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: 66 return grpc.stream_stream_rpc_method_handler( 67 _successful_stream_stream) 68 else: 69 return None 70 71 72class _ChannelServerPair: 73 74 def __init__(self): 75 self.address = '' 76 self.server = None 77 self.channel = None 78 self.server_ref_id = None 79 self.channel_ref_id = None 80 81 async def start(self): 82 # Server will enable channelz service 83 self.server = aio.server(options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) 84 port = self.server.add_insecure_port('[::]:0') 85 self.address = 'localhost:%d' % port 86 self.server.add_generic_rpc_handlers((_GenericHandler(),)) 87 await self.server.start() 88 89 # Channel will enable channelz service... 90 self.channel = aio.insecure_channel(self.address, 91 options=_ENABLE_CHANNELZ) 92 93 async def bind_channelz(self, channelz_stub): 94 resp = await channelz_stub.GetTopChannels( 95 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 96 for channel in resp.channel: 97 if channel.data.target == self.address: 98 self.channel_ref_id = channel.ref.channel_id 99 100 resp = await channelz_stub.GetServers( 101 channelz_pb2.GetServersRequest(start_server_id=0)) 102 self.server_ref_id = resp.server[-1].ref.server_id 103 104 async def stop(self): 105 await self.channel.close() 106 await self.server.stop(None) 107 108 109async def _create_channel_server_pairs(n, channelz_stub=None): 110 """Create channel-server pairs.""" 111 pairs = [_ChannelServerPair() for i in range(n)] 112 for pair in pairs: 113 await pair.start() 114 if channelz_stub: 115 await pair.bind_channelz(channelz_stub) 116 return pairs 117 118 119async def _destroy_channel_server_pairs(pairs): 120 for pair in pairs: 121 await pair.stop() 122 123 124class ChannelzServicerTest(AioTestBase): 125 126 async def setUp(self): 127 # This server is for Channelz info fetching only 128 # It self should not enable Channelz 129 self._server = aio.server(options=_DISABLE_REUSE_PORT + 130 _DISABLE_CHANNELZ) 131 port = self._server.add_insecure_port('[::]:0') 132 channelz.add_channelz_servicer(self._server) 133 await self._server.start() 134 135 # This channel is used to fetch Channelz info only 136 # Channelz should not be enabled 137 self._channel = aio.insecure_channel('localhost:%d' % port, 138 options=_DISABLE_CHANNELZ) 139 self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) 140 141 async def tearDown(self): 142 await self._channel.close() 143 await self._server.stop(None) 144 145 async def _get_server_by_ref_id(self, ref_id): 146 """Server id may not be consecutive""" 147 resp = await self._channelz_stub.GetServers( 148 channelz_pb2.GetServersRequest(start_server_id=ref_id)) 149 self.assertEqual(ref_id, resp.server[0].ref.server_id) 150 return resp.server[0] 151 152 async def _send_successful_unary_unary(self, pair): 153 call = pair.channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)(_REQUEST) 154 self.assertEqual(grpc.StatusCode.OK, await call.code()) 155 156 async def _send_failed_unary_unary(self, pair): 157 try: 158 await pair.channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST) 159 except grpc.RpcError: 160 return 161 else: 162 self.fail("This call supposed to fail") 163 164 async def _send_successful_stream_stream(self, pair): 165 call = pair.channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)(iter( 166 [_REQUEST] * test_constants.STREAM_LENGTH)) 167 cnt = 0 168 async for _ in call: 169 cnt += 1 170 self.assertEqual(cnt, test_constants.STREAM_LENGTH) 171 172 async def test_get_top_channels_high_start_id(self): 173 pairs = await _create_channel_server_pairs(1) 174 175 resp = await self._channelz_stub.GetTopChannels( 176 channelz_pb2.GetTopChannelsRequest( 177 start_channel_id=_LARGE_UNASSIGNED_ID)) 178 self.assertEqual(len(resp.channel), 0) 179 self.assertEqual(resp.end, True) 180 181 await _destroy_channel_server_pairs(pairs) 182 183 async def test_successful_request(self): 184 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 185 186 await self._send_successful_unary_unary(pairs[0]) 187 resp = await self._channelz_stub.GetChannel( 188 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)) 189 190 self.assertEqual(resp.channel.data.calls_started, 1) 191 self.assertEqual(resp.channel.data.calls_succeeded, 1) 192 self.assertEqual(resp.channel.data.calls_failed, 0) 193 194 await _destroy_channel_server_pairs(pairs) 195 196 async def test_failed_request(self): 197 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 198 199 await self._send_failed_unary_unary(pairs[0]) 200 resp = await self._channelz_stub.GetChannel( 201 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)) 202 self.assertEqual(resp.channel.data.calls_started, 1) 203 self.assertEqual(resp.channel.data.calls_succeeded, 0) 204 self.assertEqual(resp.channel.data.calls_failed, 1) 205 206 await _destroy_channel_server_pairs(pairs) 207 208 async def test_many_requests(self): 209 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 210 211 k_success = 7 212 k_failed = 9 213 for i in range(k_success): 214 await self._send_successful_unary_unary(pairs[0]) 215 for i in range(k_failed): 216 await self._send_failed_unary_unary(pairs[0]) 217 resp = await self._channelz_stub.GetChannel( 218 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)) 219 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 220 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 221 self.assertEqual(resp.channel.data.calls_failed, k_failed) 222 223 await _destroy_channel_server_pairs(pairs) 224 225 async def test_many_requests_many_channel(self): 226 k_channels = 4 227 pairs = await _create_channel_server_pairs(k_channels, 228 self._channelz_stub) 229 k_success = 11 230 k_failed = 13 231 for i in range(k_success): 232 await self._send_successful_unary_unary(pairs[0]) 233 await self._send_successful_unary_unary(pairs[2]) 234 for i in range(k_failed): 235 await self._send_failed_unary_unary(pairs[1]) 236 await self._send_failed_unary_unary(pairs[2]) 237 238 # The first channel saw only successes 239 resp = await self._channelz_stub.GetChannel( 240 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)) 241 self.assertEqual(resp.channel.data.calls_started, k_success) 242 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 243 self.assertEqual(resp.channel.data.calls_failed, 0) 244 245 # The second channel saw only failures 246 resp = await self._channelz_stub.GetChannel( 247 channelz_pb2.GetChannelRequest(channel_id=pairs[1].channel_ref_id)) 248 self.assertEqual(resp.channel.data.calls_started, k_failed) 249 self.assertEqual(resp.channel.data.calls_succeeded, 0) 250 self.assertEqual(resp.channel.data.calls_failed, k_failed) 251 252 # The third channel saw both successes and failures 253 resp = await self._channelz_stub.GetChannel( 254 channelz_pb2.GetChannelRequest(channel_id=pairs[2].channel_ref_id)) 255 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 256 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 257 self.assertEqual(resp.channel.data.calls_failed, k_failed) 258 259 # The fourth channel saw nothing 260 resp = await self._channelz_stub.GetChannel( 261 channelz_pb2.GetChannelRequest(channel_id=pairs[3].channel_ref_id)) 262 self.assertEqual(resp.channel.data.calls_started, 0) 263 self.assertEqual(resp.channel.data.calls_succeeded, 0) 264 self.assertEqual(resp.channel.data.calls_failed, 0) 265 266 await _destroy_channel_server_pairs(pairs) 267 268 async def test_many_subchannels(self): 269 k_channels = 4 270 pairs = await _create_channel_server_pairs(k_channels, 271 self._channelz_stub) 272 k_success = 17 273 k_failed = 19 274 for i in range(k_success): 275 await self._send_successful_unary_unary(pairs[0]) 276 await self._send_successful_unary_unary(pairs[2]) 277 for i in range(k_failed): 278 await self._send_failed_unary_unary(pairs[1]) 279 await self._send_failed_unary_unary(pairs[2]) 280 281 for i in range(k_channels): 282 gc_resp = await self._channelz_stub.GetChannel( 283 channelz_pb2.GetChannelRequest( 284 channel_id=pairs[i].channel_ref_id)) 285 # If no call performed in the channel, there shouldn't be any subchannel 286 if gc_resp.channel.data.calls_started == 0: 287 self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) 288 continue 289 290 # Otherwise, the subchannel should exist 291 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 292 gsc_resp = await self._channelz_stub.GetSubchannel( 293 channelz_pb2.GetSubchannelRequest( 294 subchannel_id=gc_resp.channel.subchannel_ref[0]. 295 subchannel_id)) 296 self.assertEqual(gc_resp.channel.data.calls_started, 297 gsc_resp.subchannel.data.calls_started) 298 self.assertEqual(gc_resp.channel.data.calls_succeeded, 299 gsc_resp.subchannel.data.calls_succeeded) 300 self.assertEqual(gc_resp.channel.data.calls_failed, 301 gsc_resp.subchannel.data.calls_failed) 302 303 await _destroy_channel_server_pairs(pairs) 304 305 async def test_server_call(self): 306 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 307 308 k_success = 23 309 k_failed = 29 310 for i in range(k_success): 311 await self._send_successful_unary_unary(pairs[0]) 312 for i in range(k_failed): 313 await self._send_failed_unary_unary(pairs[0]) 314 315 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 316 self.assertEqual(resp.data.calls_started, k_success + k_failed) 317 self.assertEqual(resp.data.calls_succeeded, k_success) 318 self.assertEqual(resp.data.calls_failed, k_failed) 319 320 await _destroy_channel_server_pairs(pairs) 321 322 async def test_many_subchannels_and_sockets(self): 323 k_channels = 4 324 pairs = await _create_channel_server_pairs(k_channels, 325 self._channelz_stub) 326 k_success = 3 327 k_failed = 5 328 for i in range(k_success): 329 await self._send_successful_unary_unary(pairs[0]) 330 await self._send_successful_unary_unary(pairs[2]) 331 for i in range(k_failed): 332 await self._send_failed_unary_unary(pairs[1]) 333 await self._send_failed_unary_unary(pairs[2]) 334 335 for i in range(k_channels): 336 gc_resp = await self._channelz_stub.GetChannel( 337 channelz_pb2.GetChannelRequest( 338 channel_id=pairs[i].channel_ref_id)) 339 340 # If no call performed in the channel, there shouldn't be any subchannel 341 if gc_resp.channel.data.calls_started == 0: 342 self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) 343 continue 344 345 # Otherwise, the subchannel should exist 346 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 347 gsc_resp = await self._channelz_stub.GetSubchannel( 348 channelz_pb2.GetSubchannelRequest( 349 subchannel_id=gc_resp.channel.subchannel_ref[0]. 350 subchannel_id)) 351 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 352 353 gs_resp = await self._channelz_stub.GetSocket( 354 channelz_pb2.GetSocketRequest( 355 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) 356 self.assertEqual(gsc_resp.subchannel.data.calls_started, 357 gs_resp.socket.data.streams_started) 358 self.assertEqual(0, gs_resp.socket.data.streams_failed) 359 # Calls started == messages sent, only valid for unary calls 360 self.assertEqual(gsc_resp.subchannel.data.calls_started, 361 gs_resp.socket.data.messages_sent) 362 363 await _destroy_channel_server_pairs(pairs) 364 365 async def test_streaming_rpc(self): 366 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 367 # In C++, the argument for _send_successful_stream_stream is message length. 368 # Here the argument is still channel idx, to be consistent with the other two. 369 await self._send_successful_stream_stream(pairs[0]) 370 371 gc_resp = await self._channelz_stub.GetChannel( 372 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id)) 373 self.assertEqual(gc_resp.channel.data.calls_started, 1) 374 self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) 375 self.assertEqual(gc_resp.channel.data.calls_failed, 0) 376 # Subchannel exists 377 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 378 379 gsc_resp = await self._channelz_stub.GetSubchannel( 380 channelz_pb2.GetSubchannelRequest( 381 subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id)) 382 self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) 383 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) 384 self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) 385 # Socket exists 386 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 387 388 gs_resp = await self._channelz_stub.GetSocket( 389 channelz_pb2.GetSocketRequest( 390 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) 391 self.assertEqual(gs_resp.socket.data.streams_started, 1) 392 self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) 393 self.assertEqual(gs_resp.socket.data.streams_failed, 0) 394 self.assertEqual(gs_resp.socket.data.messages_sent, 395 test_constants.STREAM_LENGTH) 396 self.assertEqual(gs_resp.socket.data.messages_received, 397 test_constants.STREAM_LENGTH) 398 399 await _destroy_channel_server_pairs(pairs) 400 401 async def test_server_sockets(self): 402 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 403 404 await self._send_successful_unary_unary(pairs[0]) 405 await self._send_failed_unary_unary(pairs[0]) 406 407 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 408 self.assertEqual(resp.data.calls_started, 2) 409 self.assertEqual(resp.data.calls_succeeded, 1) 410 self.assertEqual(resp.data.calls_failed, 1) 411 412 gss_resp = await self._channelz_stub.GetServerSockets( 413 channelz_pb2.GetServerSocketsRequest(server_id=resp.ref.server_id, 414 start_socket_id=0)) 415 # If the RPC call failed, it will raise a grpc.RpcError 416 # So, if there is no exception raised, considered pass 417 await _destroy_channel_server_pairs(pairs) 418 419 async def test_server_listen_sockets(self): 420 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 421 422 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 423 self.assertEqual(len(resp.listen_socket), 1) 424 425 gs_resp = await self._channelz_stub.GetSocket( 426 channelz_pb2.GetSocketRequest( 427 socket_id=resp.listen_socket[0].socket_id)) 428 # If the RPC call failed, it will raise a grpc.RpcError 429 # So, if there is no exception raised, considered pass 430 await _destroy_channel_server_pairs(pairs) 431 432 async def test_invalid_query_get_server(self): 433 with self.assertRaises(aio.AioRpcError) as exception_context: 434 await self._channelz_stub.GetServer( 435 channelz_pb2.GetServerRequest(server_id=_LARGE_UNASSIGNED_ID)) 436 self.assertEqual(grpc.StatusCode.NOT_FOUND, 437 exception_context.exception.code()) 438 439 async def test_invalid_query_get_channel(self): 440 with self.assertRaises(aio.AioRpcError) as exception_context: 441 await self._channelz_stub.GetChannel( 442 channelz_pb2.GetChannelRequest(channel_id=_LARGE_UNASSIGNED_ID)) 443 self.assertEqual(grpc.StatusCode.NOT_FOUND, 444 exception_context.exception.code()) 445 446 async def test_invalid_query_get_subchannel(self): 447 with self.assertRaises(aio.AioRpcError) as exception_context: 448 await self._channelz_stub.GetSubchannel( 449 channelz_pb2.GetSubchannelRequest( 450 subchannel_id=_LARGE_UNASSIGNED_ID)) 451 self.assertEqual(grpc.StatusCode.NOT_FOUND, 452 exception_context.exception.code()) 453 454 async def test_invalid_query_get_socket(self): 455 with self.assertRaises(aio.AioRpcError) as exception_context: 456 await self._channelz_stub.GetSocket( 457 channelz_pb2.GetSocketRequest(socket_id=_LARGE_UNASSIGNED_ID)) 458 self.assertEqual(grpc.StatusCode.NOT_FOUND, 459 exception_context.exception.code()) 460 461 async def test_invalid_query_get_server_sockets(self): 462 with self.assertRaises(aio.AioRpcError) as exception_context: 463 await self._channelz_stub.GetServerSockets( 464 channelz_pb2.GetServerSocketsRequest( 465 server_id=_LARGE_UNASSIGNED_ID, 466 start_socket_id=0, 467 )) 468 self.assertEqual(grpc.StatusCode.NOT_FOUND, 469 exception_context.exception.code()) 470 471 472if __name__ == '__main__': 473 logging.basicConfig(level=logging.DEBUG) 474 unittest.main(verbosity=2) 475