1# Copyright 2020 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc_channelz.v1.channelz.""" 15 16import asyncio 17import logging 18import unittest 19 20import grpc 21from grpc.experimental import aio 22from grpc_channelz.v1 import channelz 23from grpc_channelz.v1 import channelz_pb2 24from grpc_channelz.v1 import channelz_pb2_grpc 25 26from tests.unit.framework.common import test_constants 27from tests_aio.unit._test_base import AioTestBase 28 29_SUCCESSFUL_UNARY_UNARY = "/test/SuccessfulUnaryUnary" 30_FAILED_UNARY_UNARY = "/test/FailedUnaryUnary" 31_SUCCESSFUL_STREAM_STREAM = "/test/SuccessfulStreamStream" 32 33_REQUEST = b"\x00\x00\x00" 34_RESPONSE = b"\x01\x01\x01" 35 36_DISABLE_REUSE_PORT = (("grpc.so_reuseport", 0),) 37_ENABLE_CHANNELZ = (("grpc.enable_channelz", 1),) 38_DISABLE_CHANNELZ = (("grpc.enable_channelz", 0),) 39 40_LARGE_UNASSIGNED_ID = 10000 41 42 43async def _successful_unary_unary(request, servicer_context): 44 return _RESPONSE 45 46 47async def _failed_unary_unary(request, servicer_context): 48 servicer_context.set_code(grpc.StatusCode.INTERNAL) 49 servicer_context.set_details("Channelz Test Intended Failure") 50 51 52async def _successful_stream_stream(request_iterator, servicer_context): 53 async for _ in request_iterator: 54 yield _RESPONSE 55 56 57class _GenericHandler(grpc.GenericRpcHandler): 58 def service(self, handler_call_details): 59 if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: 60 return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) 61 elif handler_call_details.method == _FAILED_UNARY_UNARY: 62 return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) 63 elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: 64 return grpc.stream_stream_rpc_method_handler( 65 _successful_stream_stream 66 ) 67 else: 68 return None 69 70 71class _ChannelServerPair: 72 def __init__(self): 73 self.address = "" 74 self.server = None 75 self.channel = None 76 self.server_ref_id = None 77 self.channel_ref_id = None 78 79 async def start(self): 80 # Server will enable channelz service 81 self.server = aio.server(options=_DISABLE_REUSE_PORT + _ENABLE_CHANNELZ) 82 port = self.server.add_insecure_port("[::]:0") 83 self.address = "localhost:%d" % port 84 self.server.add_generic_rpc_handlers((_GenericHandler(),)) 85 await self.server.start() 86 87 # Channel will enable channelz service... 88 self.channel = aio.insecure_channel( 89 self.address, options=_ENABLE_CHANNELZ 90 ) 91 92 async def bind_channelz(self, channelz_stub): 93 resp = await channelz_stub.GetTopChannels( 94 channelz_pb2.GetTopChannelsRequest(start_channel_id=0) 95 ) 96 for channel in resp.channel: 97 if channel.data.target == "dns:///" + self.address: 98 self.channel_ref_id = channel.ref.channel_id 99 100 resp = await channelz_stub.GetServers( 101 channelz_pb2.GetServersRequest(start_server_id=0) 102 ) 103 self.server_ref_id = resp.server[-1].ref.server_id 104 105 async def stop(self): 106 await self.channel.close() 107 await self.server.stop(None) 108 109 110async def _create_channel_server_pairs(n, channelz_stub=None): 111 """Create channel-server pairs.""" 112 pairs = [_ChannelServerPair() for i in range(n)] 113 for pair in pairs: 114 await pair.start() 115 if channelz_stub: 116 await pair.bind_channelz(channelz_stub) 117 return pairs 118 119 120async def _destroy_channel_server_pairs(pairs): 121 for pair in pairs: 122 await pair.stop() 123 124 125class ChannelzServicerTest(AioTestBase): 126 async def setUp(self): 127 # This server is for Channelz info fetching only 128 # It self should not enable Channelz 129 self._server = aio.server( 130 options=_DISABLE_REUSE_PORT + _DISABLE_CHANNELZ 131 ) 132 port = self._server.add_insecure_port("[::]:0") 133 channelz.add_channelz_servicer(self._server) 134 await self._server.start() 135 136 # This channel is used to fetch Channelz info only 137 # Channelz should not be enabled 138 self._channel = aio.insecure_channel( 139 "localhost:%d" % port, options=_DISABLE_CHANNELZ 140 ) 141 self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) 142 143 async def tearDown(self): 144 await self._channel.close() 145 await self._server.stop(None) 146 147 async def _get_server_by_ref_id(self, ref_id): 148 """Server id may not be consecutive""" 149 resp = await self._channelz_stub.GetServers( 150 channelz_pb2.GetServersRequest(start_server_id=ref_id) 151 ) 152 self.assertEqual(ref_id, resp.server[0].ref.server_id) 153 return resp.server[0] 154 155 async def _send_successful_unary_unary(self, pair): 156 call = pair.channel.unary_unary(_SUCCESSFUL_UNARY_UNARY)(_REQUEST) 157 self.assertEqual(grpc.StatusCode.OK, await call.code()) 158 159 async def _send_failed_unary_unary(self, pair): 160 try: 161 await pair.channel.unary_unary(_FAILED_UNARY_UNARY)(_REQUEST) 162 except grpc.RpcError: 163 return 164 else: 165 self.fail("This call supposed to fail") 166 167 async def _send_successful_stream_stream(self, pair): 168 call = pair.channel.stream_stream(_SUCCESSFUL_STREAM_STREAM)( 169 iter([_REQUEST] * test_constants.STREAM_LENGTH) 170 ) 171 cnt = 0 172 async for _ in call: 173 cnt += 1 174 self.assertEqual(cnt, test_constants.STREAM_LENGTH) 175 176 async def test_get_top_channels_high_start_id(self): 177 pairs = await _create_channel_server_pairs(1) 178 179 resp = await self._channelz_stub.GetTopChannels( 180 channelz_pb2.GetTopChannelsRequest( 181 start_channel_id=_LARGE_UNASSIGNED_ID 182 ) 183 ) 184 self.assertEqual(len(resp.channel), 0) 185 self.assertEqual(resp.end, True) 186 187 await _destroy_channel_server_pairs(pairs) 188 189 async def test_successful_request(self): 190 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 191 192 await self._send_successful_unary_unary(pairs[0]) 193 resp = await self._channelz_stub.GetChannel( 194 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id) 195 ) 196 197 self.assertEqual(resp.channel.data.calls_started, 1) 198 self.assertEqual(resp.channel.data.calls_succeeded, 1) 199 self.assertEqual(resp.channel.data.calls_failed, 0) 200 201 await _destroy_channel_server_pairs(pairs) 202 203 async def test_failed_request(self): 204 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 205 206 await self._send_failed_unary_unary(pairs[0]) 207 resp = await self._channelz_stub.GetChannel( 208 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id) 209 ) 210 self.assertEqual(resp.channel.data.calls_started, 1) 211 self.assertEqual(resp.channel.data.calls_succeeded, 0) 212 self.assertEqual(resp.channel.data.calls_failed, 1) 213 214 await _destroy_channel_server_pairs(pairs) 215 216 async def test_many_requests(self): 217 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 218 219 k_success = 7 220 k_failed = 9 221 for i in range(k_success): 222 await self._send_successful_unary_unary(pairs[0]) 223 for i in range(k_failed): 224 await self._send_failed_unary_unary(pairs[0]) 225 resp = await self._channelz_stub.GetChannel( 226 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id) 227 ) 228 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 229 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 230 self.assertEqual(resp.channel.data.calls_failed, k_failed) 231 232 await _destroy_channel_server_pairs(pairs) 233 234 async def test_many_requests_many_channel(self): 235 k_channels = 4 236 pairs = await _create_channel_server_pairs( 237 k_channels, self._channelz_stub 238 ) 239 k_success = 11 240 k_failed = 13 241 for i in range(k_success): 242 await self._send_successful_unary_unary(pairs[0]) 243 await self._send_successful_unary_unary(pairs[2]) 244 for i in range(k_failed): 245 await self._send_failed_unary_unary(pairs[1]) 246 await self._send_failed_unary_unary(pairs[2]) 247 248 # The first channel saw only successes 249 resp = await self._channelz_stub.GetChannel( 250 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id) 251 ) 252 self.assertEqual(resp.channel.data.calls_started, k_success) 253 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 254 self.assertEqual(resp.channel.data.calls_failed, 0) 255 256 # The second channel saw only failures 257 resp = await self._channelz_stub.GetChannel( 258 channelz_pb2.GetChannelRequest(channel_id=pairs[1].channel_ref_id) 259 ) 260 self.assertEqual(resp.channel.data.calls_started, k_failed) 261 self.assertEqual(resp.channel.data.calls_succeeded, 0) 262 self.assertEqual(resp.channel.data.calls_failed, k_failed) 263 264 # The third channel saw both successes and failures 265 resp = await self._channelz_stub.GetChannel( 266 channelz_pb2.GetChannelRequest(channel_id=pairs[2].channel_ref_id) 267 ) 268 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 269 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 270 self.assertEqual(resp.channel.data.calls_failed, k_failed) 271 272 # The fourth channel saw nothing 273 resp = await self._channelz_stub.GetChannel( 274 channelz_pb2.GetChannelRequest(channel_id=pairs[3].channel_ref_id) 275 ) 276 self.assertEqual(resp.channel.data.calls_started, 0) 277 self.assertEqual(resp.channel.data.calls_succeeded, 0) 278 self.assertEqual(resp.channel.data.calls_failed, 0) 279 280 await _destroy_channel_server_pairs(pairs) 281 282 async def test_many_subchannels(self): 283 k_channels = 4 284 pairs = await _create_channel_server_pairs( 285 k_channels, self._channelz_stub 286 ) 287 k_success = 17 288 k_failed = 19 289 for i in range(k_success): 290 await self._send_successful_unary_unary(pairs[0]) 291 await self._send_successful_unary_unary(pairs[2]) 292 for i in range(k_failed): 293 await self._send_failed_unary_unary(pairs[1]) 294 await self._send_failed_unary_unary(pairs[2]) 295 296 for i in range(k_channels): 297 gc_resp = await self._channelz_stub.GetChannel( 298 channelz_pb2.GetChannelRequest( 299 channel_id=pairs[i].channel_ref_id 300 ) 301 ) 302 # If no call performed in the channel, there shouldn't be any subchannel 303 if gc_resp.channel.data.calls_started == 0: 304 self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) 305 continue 306 307 # Otherwise, the subchannel should exist 308 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 309 gsc_resp = await self._channelz_stub.GetSubchannel( 310 channelz_pb2.GetSubchannelRequest( 311 subchannel_id=gc_resp.channel.subchannel_ref[ 312 0 313 ].subchannel_id 314 ) 315 ) 316 self.assertEqual( 317 gc_resp.channel.data.calls_started, 318 gsc_resp.subchannel.data.calls_started, 319 ) 320 self.assertEqual( 321 gc_resp.channel.data.calls_succeeded, 322 gsc_resp.subchannel.data.calls_succeeded, 323 ) 324 self.assertEqual( 325 gc_resp.channel.data.calls_failed, 326 gsc_resp.subchannel.data.calls_failed, 327 ) 328 329 await _destroy_channel_server_pairs(pairs) 330 331 async def test_server_call(self): 332 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 333 334 k_success = 23 335 k_failed = 29 336 for i in range(k_success): 337 await self._send_successful_unary_unary(pairs[0]) 338 for i in range(k_failed): 339 await self._send_failed_unary_unary(pairs[0]) 340 341 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 342 self.assertEqual(resp.data.calls_started, k_success + k_failed) 343 self.assertEqual(resp.data.calls_succeeded, k_success) 344 self.assertEqual(resp.data.calls_failed, k_failed) 345 346 await _destroy_channel_server_pairs(pairs) 347 348 async def test_many_subchannels_and_sockets(self): 349 k_channels = 4 350 pairs = await _create_channel_server_pairs( 351 k_channels, self._channelz_stub 352 ) 353 k_success = 3 354 k_failed = 5 355 for i in range(k_success): 356 await self._send_successful_unary_unary(pairs[0]) 357 await self._send_successful_unary_unary(pairs[2]) 358 for i in range(k_failed): 359 await self._send_failed_unary_unary(pairs[1]) 360 await self._send_failed_unary_unary(pairs[2]) 361 362 for i in range(k_channels): 363 gc_resp = await self._channelz_stub.GetChannel( 364 channelz_pb2.GetChannelRequest( 365 channel_id=pairs[i].channel_ref_id 366 ) 367 ) 368 369 # If no call performed in the channel, there shouldn't be any subchannel 370 if gc_resp.channel.data.calls_started == 0: 371 self.assertEqual(len(gc_resp.channel.subchannel_ref), 0) 372 continue 373 374 # Otherwise, the subchannel should exist 375 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 376 gsc_resp = await self._channelz_stub.GetSubchannel( 377 channelz_pb2.GetSubchannelRequest( 378 subchannel_id=gc_resp.channel.subchannel_ref[ 379 0 380 ].subchannel_id 381 ) 382 ) 383 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 384 385 gs_resp = await self._channelz_stub.GetSocket( 386 channelz_pb2.GetSocketRequest( 387 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id 388 ) 389 ) 390 self.assertEqual( 391 gsc_resp.subchannel.data.calls_started, 392 gs_resp.socket.data.streams_started, 393 ) 394 self.assertEqual(0, gs_resp.socket.data.streams_failed) 395 # Calls started == messages sent, only valid for unary calls 396 self.assertEqual( 397 gsc_resp.subchannel.data.calls_started, 398 gs_resp.socket.data.messages_sent, 399 ) 400 401 await _destroy_channel_server_pairs(pairs) 402 403 async def test_streaming_rpc(self): 404 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 405 # In C++, the argument for _send_successful_stream_stream is message length. 406 # Here the argument is still channel idx, to be consistent with the other two. 407 await self._send_successful_stream_stream(pairs[0]) 408 409 gc_resp = await self._channelz_stub.GetChannel( 410 channelz_pb2.GetChannelRequest(channel_id=pairs[0].channel_ref_id) 411 ) 412 self.assertEqual(gc_resp.channel.data.calls_started, 1) 413 self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) 414 self.assertEqual(gc_resp.channel.data.calls_failed, 0) 415 # Subchannel exists 416 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 417 418 while True: 419 gsc_resp = await self._channelz_stub.GetSubchannel( 420 channelz_pb2.GetSubchannelRequest( 421 subchannel_id=gc_resp.channel.subchannel_ref[ 422 0 423 ].subchannel_id 424 ) 425 ) 426 if ( 427 gsc_resp.subchannel.data.calls_started 428 == gsc_resp.subchannel.data.calls_succeeded 429 + gsc_resp.subchannel.data.calls_failed 430 ): 431 break 432 self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) 433 self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) 434 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) 435 # Socket exists 436 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 437 438 while True: 439 gs_resp = await self._channelz_stub.GetSocket( 440 channelz_pb2.GetSocketRequest( 441 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id 442 ) 443 ) 444 if ( 445 gs_resp.socket.data.streams_started 446 == gs_resp.socket.data.streams_succeeded 447 + gs_resp.socket.data.streams_failed 448 ): 449 break 450 self.assertEqual(gs_resp.socket.data.streams_started, 1) 451 self.assertEqual(gs_resp.socket.data.streams_failed, 0) 452 self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) 453 self.assertEqual( 454 gs_resp.socket.data.messages_sent, test_constants.STREAM_LENGTH 455 ) 456 self.assertEqual( 457 gs_resp.socket.data.messages_received, test_constants.STREAM_LENGTH 458 ) 459 460 await _destroy_channel_server_pairs(pairs) 461 462 async def test_server_sockets(self): 463 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 464 465 await self._send_successful_unary_unary(pairs[0]) 466 await self._send_failed_unary_unary(pairs[0]) 467 468 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 469 self.assertEqual(resp.data.calls_started, 2) 470 self.assertEqual(resp.data.calls_succeeded, 1) 471 self.assertEqual(resp.data.calls_failed, 1) 472 473 gss_resp = await self._channelz_stub.GetServerSockets( 474 channelz_pb2.GetServerSocketsRequest( 475 server_id=resp.ref.server_id, start_socket_id=0 476 ) 477 ) 478 # If the RPC call failed, it will raise a grpc.RpcError 479 # So, if there is no exception raised, considered pass 480 await _destroy_channel_server_pairs(pairs) 481 482 async def test_server_listen_sockets(self): 483 pairs = await _create_channel_server_pairs(1, self._channelz_stub) 484 485 resp = await self._get_server_by_ref_id(pairs[0].server_ref_id) 486 self.assertEqual(len(resp.listen_socket), 1) 487 488 gs_resp = await self._channelz_stub.GetSocket( 489 channelz_pb2.GetSocketRequest( 490 socket_id=resp.listen_socket[0].socket_id 491 ) 492 ) 493 # If the RPC call failed, it will raise a grpc.RpcError 494 # So, if there is no exception raised, considered pass 495 await _destroy_channel_server_pairs(pairs) 496 497 async def test_invalid_query_get_server(self): 498 with self.assertRaises(aio.AioRpcError) as exception_context: 499 await self._channelz_stub.GetServer( 500 channelz_pb2.GetServerRequest(server_id=_LARGE_UNASSIGNED_ID) 501 ) 502 self.assertEqual( 503 grpc.StatusCode.NOT_FOUND, exception_context.exception.code() 504 ) 505 506 async def test_invalid_query_get_channel(self): 507 with self.assertRaises(aio.AioRpcError) as exception_context: 508 await self._channelz_stub.GetChannel( 509 channelz_pb2.GetChannelRequest(channel_id=_LARGE_UNASSIGNED_ID) 510 ) 511 self.assertEqual( 512 grpc.StatusCode.NOT_FOUND, exception_context.exception.code() 513 ) 514 515 async def test_invalid_query_get_subchannel(self): 516 with self.assertRaises(aio.AioRpcError) as exception_context: 517 await self._channelz_stub.GetSubchannel( 518 channelz_pb2.GetSubchannelRequest( 519 subchannel_id=_LARGE_UNASSIGNED_ID 520 ) 521 ) 522 self.assertEqual( 523 grpc.StatusCode.NOT_FOUND, exception_context.exception.code() 524 ) 525 526 async def test_invalid_query_get_socket(self): 527 with self.assertRaises(aio.AioRpcError) as exception_context: 528 await self._channelz_stub.GetSocket( 529 channelz_pb2.GetSocketRequest(socket_id=_LARGE_UNASSIGNED_ID) 530 ) 531 self.assertEqual( 532 grpc.StatusCode.NOT_FOUND, exception_context.exception.code() 533 ) 534 535 async def test_invalid_query_get_server_sockets(self): 536 with self.assertRaises(aio.AioRpcError) as exception_context: 537 await self._channelz_stub.GetServerSockets( 538 channelz_pb2.GetServerSocketsRequest( 539 server_id=_LARGE_UNASSIGNED_ID, 540 start_socket_id=0, 541 ) 542 ) 543 self.assertEqual( 544 grpc.StatusCode.NOT_FOUND, exception_context.exception.code() 545 ) 546 547 548if __name__ == "__main__": 549 logging.basicConfig(level=logging.DEBUG) 550 unittest.main(verbosity=2) 551