1# Copyright 2018 The gRPC Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Tests of grpc_channelz.v1.channelz.""" 15 16import unittest 17 18from concurrent import futures 19 20import grpc 21 22from grpc_channelz.v1 import channelz 23from grpc_channelz.v1 import channelz_pb2 24from grpc_channelz.v1 import channelz_pb2_grpc 25 26from tests.unit import test_common 27from tests.unit.framework.common import test_constants 28 29_SUCCESSFUL_UNARY_UNARY = '/test/SuccessfulUnaryUnary' 30_FAILED_UNARY_UNARY = '/test/FailedUnaryUnary' 31_SUCCESSFUL_STREAM_STREAM = '/test/SuccessfulStreamStream' 32 33_REQUEST = b'\x00\x00\x00' 34_RESPONSE = b'\x01\x01\x01' 35 36_DISABLE_REUSE_PORT = (('grpc.so_reuseport', 0),) 37_ENABLE_CHANNELZ = (('grpc.enable_channelz', 1),) 38_DISABLE_CHANNELZ = (('grpc.enable_channelz', 0),) 39 40 41def _successful_unary_unary(request, servicer_context): 42 return _RESPONSE 43 44 45def _failed_unary_unary(request, servicer_context): 46 servicer_context.set_code(grpc.StatusCode.INTERNAL) 47 servicer_context.set_details("Channelz Test Intended Failure") 48 49 50def _successful_stream_stream(request_iterator, servicer_context): 51 for _ in request_iterator: 52 yield _RESPONSE 53 54 55class _GenericHandler(grpc.GenericRpcHandler): 56 57 def service(self, handler_call_details): 58 if handler_call_details.method == _SUCCESSFUL_UNARY_UNARY: 59 return grpc.unary_unary_rpc_method_handler(_successful_unary_unary) 60 elif handler_call_details.method == _FAILED_UNARY_UNARY: 61 return grpc.unary_unary_rpc_method_handler(_failed_unary_unary) 62 elif handler_call_details.method == _SUCCESSFUL_STREAM_STREAM: 63 return grpc.stream_stream_rpc_method_handler( 64 _successful_stream_stream) 65 else: 66 return None 67 68 69class _ChannelServerPair(object): 70 71 def __init__(self): 72 # Server will enable channelz service 73 self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=3), 74 options=_DISABLE_REUSE_PORT + 75 _ENABLE_CHANNELZ) 76 port = self.server.add_insecure_port('[::]:0') 77 self.server.add_generic_rpc_handlers((_GenericHandler(),)) 78 self.server.start() 79 80 # Channel will enable channelz service... 81 self.channel = grpc.insecure_channel('localhost:%d' % port, 82 _ENABLE_CHANNELZ) 83 84 85def _generate_channel_server_pairs(n): 86 return [_ChannelServerPair() for i in range(n)] 87 88 89def _close_channel_server_pairs(pairs): 90 for pair in pairs: 91 pair.server.stop(None) 92 pair.channel.close() 93 94 95class ChannelzServicerTest(unittest.TestCase): 96 97 def _send_successful_unary_unary(self, idx): 98 _, r = self._pairs[idx].channel.unary_unary( 99 _SUCCESSFUL_UNARY_UNARY).with_call(_REQUEST) 100 self.assertEqual(r.code(), grpc.StatusCode.OK) 101 102 def _send_failed_unary_unary(self, idx): 103 try: 104 self._pairs[idx].channel.unary_unary(_FAILED_UNARY_UNARY).with_call( 105 _REQUEST) 106 except grpc.RpcError: 107 return 108 else: 109 self.fail("This call supposed to fail") 110 111 def _send_successful_stream_stream(self, idx): 112 response_iterator = self._pairs[idx].channel.stream_stream( 113 _SUCCESSFUL_STREAM_STREAM).__call__( 114 iter([_REQUEST] * test_constants.STREAM_LENGTH)) 115 cnt = 0 116 for _ in response_iterator: 117 cnt += 1 118 self.assertEqual(cnt, test_constants.STREAM_LENGTH) 119 120 def _get_channel_id(self, idx): 121 """Channel id may not be consecutive""" 122 resp = self._channelz_stub.GetTopChannels( 123 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 124 self.assertGreater(len(resp.channel), idx) 125 return resp.channel[idx].ref.channel_id 126 127 def setUp(self): 128 self._pairs = [] 129 # This server is for Channelz info fetching only 130 # It self should not enable Channelz 131 self._server = grpc.server(futures.ThreadPoolExecutor(max_workers=3), 132 options=_DISABLE_REUSE_PORT + 133 _DISABLE_CHANNELZ) 134 port = self._server.add_insecure_port('[::]:0') 135 channelz.add_channelz_servicer(self._server) 136 self._server.start() 137 138 # This channel is used to fetch Channelz info only 139 # Channelz should not be enabled 140 self._channel = grpc.insecure_channel('localhost:%d' % port, 141 _DISABLE_CHANNELZ) 142 self._channelz_stub = channelz_pb2_grpc.ChannelzStub(self._channel) 143 144 def tearDown(self): 145 self._server.stop(None) 146 self._channel.close() 147 _close_channel_server_pairs(self._pairs) 148 149 def test_get_top_channels_basic(self): 150 self._pairs = _generate_channel_server_pairs(1) 151 resp = self._channelz_stub.GetTopChannels( 152 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 153 self.assertEqual(len(resp.channel), 1) 154 self.assertEqual(resp.end, True) 155 156 def test_get_top_channels_high_start_id(self): 157 self._pairs = _generate_channel_server_pairs(1) 158 resp = self._channelz_stub.GetTopChannels( 159 channelz_pb2.GetTopChannelsRequest(start_channel_id=10000)) 160 self.assertEqual(len(resp.channel), 0) 161 self.assertEqual(resp.end, True) 162 163 def test_successful_request(self): 164 self._pairs = _generate_channel_server_pairs(1) 165 self._send_successful_unary_unary(0) 166 resp = self._channelz_stub.GetChannel( 167 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) 168 self.assertEqual(resp.channel.data.calls_started, 1) 169 self.assertEqual(resp.channel.data.calls_succeeded, 1) 170 self.assertEqual(resp.channel.data.calls_failed, 0) 171 172 def test_failed_request(self): 173 self._pairs = _generate_channel_server_pairs(1) 174 self._send_failed_unary_unary(0) 175 resp = self._channelz_stub.GetChannel( 176 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) 177 self.assertEqual(resp.channel.data.calls_started, 1) 178 self.assertEqual(resp.channel.data.calls_succeeded, 0) 179 self.assertEqual(resp.channel.data.calls_failed, 1) 180 181 def test_many_requests(self): 182 self._pairs = _generate_channel_server_pairs(1) 183 k_success = 7 184 k_failed = 9 185 for i in range(k_success): 186 self._send_successful_unary_unary(0) 187 for i in range(k_failed): 188 self._send_failed_unary_unary(0) 189 resp = self._channelz_stub.GetChannel( 190 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) 191 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 192 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 193 self.assertEqual(resp.channel.data.calls_failed, k_failed) 194 195 def test_many_channel(self): 196 k_channels = 4 197 self._pairs = _generate_channel_server_pairs(k_channels) 198 resp = self._channelz_stub.GetTopChannels( 199 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 200 self.assertEqual(len(resp.channel), k_channels) 201 202 def test_many_requests_many_channel(self): 203 k_channels = 4 204 self._pairs = _generate_channel_server_pairs(k_channels) 205 k_success = 11 206 k_failed = 13 207 for i in range(k_success): 208 self._send_successful_unary_unary(0) 209 self._send_successful_unary_unary(2) 210 for i in range(k_failed): 211 self._send_failed_unary_unary(1) 212 self._send_failed_unary_unary(2) 213 214 # The first channel saw only successes 215 resp = self._channelz_stub.GetChannel( 216 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) 217 self.assertEqual(resp.channel.data.calls_started, k_success) 218 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 219 self.assertEqual(resp.channel.data.calls_failed, 0) 220 221 # The second channel saw only failures 222 resp = self._channelz_stub.GetChannel( 223 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(1))) 224 self.assertEqual(resp.channel.data.calls_started, k_failed) 225 self.assertEqual(resp.channel.data.calls_succeeded, 0) 226 self.assertEqual(resp.channel.data.calls_failed, k_failed) 227 228 # The third channel saw both successes and failures 229 resp = self._channelz_stub.GetChannel( 230 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(2))) 231 self.assertEqual(resp.channel.data.calls_started, k_success + k_failed) 232 self.assertEqual(resp.channel.data.calls_succeeded, k_success) 233 self.assertEqual(resp.channel.data.calls_failed, k_failed) 234 235 # The fourth channel saw nothing 236 resp = self._channelz_stub.GetChannel( 237 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(3))) 238 self.assertEqual(resp.channel.data.calls_started, 0) 239 self.assertEqual(resp.channel.data.calls_succeeded, 0) 240 self.assertEqual(resp.channel.data.calls_failed, 0) 241 242 def test_many_subchannels(self): 243 k_channels = 4 244 self._pairs = _generate_channel_server_pairs(k_channels) 245 k_success = 17 246 k_failed = 19 247 for i in range(k_success): 248 self._send_successful_unary_unary(0) 249 self._send_successful_unary_unary(2) 250 for i in range(k_failed): 251 self._send_failed_unary_unary(1) 252 self._send_failed_unary_unary(2) 253 254 gtc_resp = self._channelz_stub.GetTopChannels( 255 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 256 self.assertEqual(len(gtc_resp.channel), k_channels) 257 for i in range(k_channels): 258 # If no call performed in the channel, there shouldn't be any subchannel 259 if gtc_resp.channel[i].data.calls_started == 0: 260 self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) 261 continue 262 263 # Otherwise, the subchannel should exist 264 self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) 265 gsc_resp = self._channelz_stub.GetSubchannel( 266 channelz_pb2.GetSubchannelRequest( 267 subchannel_id=gtc_resp.channel[i].subchannel_ref[0]. 268 subchannel_id)) 269 self.assertEqual(gtc_resp.channel[i].data.calls_started, 270 gsc_resp.subchannel.data.calls_started) 271 self.assertEqual(gtc_resp.channel[i].data.calls_succeeded, 272 gsc_resp.subchannel.data.calls_succeeded) 273 self.assertEqual(gtc_resp.channel[i].data.calls_failed, 274 gsc_resp.subchannel.data.calls_failed) 275 276 def test_server_basic(self): 277 self._pairs = _generate_channel_server_pairs(1) 278 resp = self._channelz_stub.GetServers( 279 channelz_pb2.GetServersRequest(start_server_id=0)) 280 self.assertEqual(len(resp.server), 1) 281 282 def test_get_one_server(self): 283 self._pairs = _generate_channel_server_pairs(1) 284 gss_resp = self._channelz_stub.GetServers( 285 channelz_pb2.GetServersRequest(start_server_id=0)) 286 self.assertEqual(len(gss_resp.server), 1) 287 gs_resp = self._channelz_stub.GetServer( 288 channelz_pb2.GetServerRequest( 289 server_id=gss_resp.server[0].ref.server_id)) 290 self.assertEqual(gss_resp.server[0].ref.server_id, 291 gs_resp.server.ref.server_id) 292 293 def test_server_call(self): 294 self._pairs = _generate_channel_server_pairs(1) 295 k_success = 23 296 k_failed = 29 297 for i in range(k_success): 298 self._send_successful_unary_unary(0) 299 for i in range(k_failed): 300 self._send_failed_unary_unary(0) 301 302 resp = self._channelz_stub.GetServers( 303 channelz_pb2.GetServersRequest(start_server_id=0)) 304 self.assertEqual(len(resp.server), 1) 305 self.assertEqual(resp.server[0].data.calls_started, 306 k_success + k_failed) 307 self.assertEqual(resp.server[0].data.calls_succeeded, k_success) 308 self.assertEqual(resp.server[0].data.calls_failed, k_failed) 309 310 def test_many_subchannels_and_sockets(self): 311 k_channels = 4 312 self._pairs = _generate_channel_server_pairs(k_channels) 313 k_success = 3 314 k_failed = 5 315 for i in range(k_success): 316 self._send_successful_unary_unary(0) 317 self._send_successful_unary_unary(2) 318 for i in range(k_failed): 319 self._send_failed_unary_unary(1) 320 self._send_failed_unary_unary(2) 321 322 gtc_resp = self._channelz_stub.GetTopChannels( 323 channelz_pb2.GetTopChannelsRequest(start_channel_id=0)) 324 self.assertEqual(len(gtc_resp.channel), k_channels) 325 for i in range(k_channels): 326 # If no call performed in the channel, there shouldn't be any subchannel 327 if gtc_resp.channel[i].data.calls_started == 0: 328 self.assertEqual(len(gtc_resp.channel[i].subchannel_ref), 0) 329 continue 330 331 # Otherwise, the subchannel should exist 332 self.assertGreater(len(gtc_resp.channel[i].subchannel_ref), 0) 333 gsc_resp = self._channelz_stub.GetSubchannel( 334 channelz_pb2.GetSubchannelRequest( 335 subchannel_id=gtc_resp.channel[i].subchannel_ref[0]. 336 subchannel_id)) 337 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 338 339 gs_resp = self._channelz_stub.GetSocket( 340 channelz_pb2.GetSocketRequest( 341 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) 342 self.assertEqual(gsc_resp.subchannel.data.calls_started, 343 gs_resp.socket.data.streams_started) 344 self.assertEqual(gsc_resp.subchannel.data.calls_started, 345 gs_resp.socket.data.streams_succeeded) 346 # Calls started == messages sent, only valid for unary calls 347 self.assertEqual(gsc_resp.subchannel.data.calls_started, 348 gs_resp.socket.data.messages_sent) 349 # Only receive responses when the RPC was successful 350 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 351 gs_resp.socket.data.messages_received) 352 353 def test_streaming_rpc(self): 354 self._pairs = _generate_channel_server_pairs(1) 355 # In C++, the argument for _send_successful_stream_stream is message length. 356 # Here the argument is still channel idx, to be consistent with the other two. 357 self._send_successful_stream_stream(0) 358 359 gc_resp = self._channelz_stub.GetChannel( 360 channelz_pb2.GetChannelRequest(channel_id=self._get_channel_id(0))) 361 self.assertEqual(gc_resp.channel.data.calls_started, 1) 362 self.assertEqual(gc_resp.channel.data.calls_succeeded, 1) 363 self.assertEqual(gc_resp.channel.data.calls_failed, 0) 364 # Subchannel exists 365 self.assertGreater(len(gc_resp.channel.subchannel_ref), 0) 366 367 gsc_resp = self._channelz_stub.GetSubchannel( 368 channelz_pb2.GetSubchannelRequest( 369 subchannel_id=gc_resp.channel.subchannel_ref[0].subchannel_id)) 370 self.assertEqual(gsc_resp.subchannel.data.calls_started, 1) 371 self.assertEqual(gsc_resp.subchannel.data.calls_succeeded, 1) 372 self.assertEqual(gsc_resp.subchannel.data.calls_failed, 0) 373 # Socket exists 374 self.assertEqual(len(gsc_resp.subchannel.socket_ref), 1) 375 376 gs_resp = self._channelz_stub.GetSocket( 377 channelz_pb2.GetSocketRequest( 378 socket_id=gsc_resp.subchannel.socket_ref[0].socket_id)) 379 self.assertEqual(gs_resp.socket.data.streams_started, 1) 380 self.assertEqual(gs_resp.socket.data.streams_succeeded, 1) 381 self.assertEqual(gs_resp.socket.data.streams_failed, 0) 382 self.assertEqual(gs_resp.socket.data.messages_sent, 383 test_constants.STREAM_LENGTH) 384 self.assertEqual(gs_resp.socket.data.messages_received, 385 test_constants.STREAM_LENGTH) 386 387 def test_server_sockets(self): 388 self._pairs = _generate_channel_server_pairs(1) 389 self._send_successful_unary_unary(0) 390 self._send_failed_unary_unary(0) 391 392 gs_resp = self._channelz_stub.GetServers( 393 channelz_pb2.GetServersRequest(start_server_id=0)) 394 self.assertEqual(len(gs_resp.server), 1) 395 self.assertEqual(gs_resp.server[0].data.calls_started, 2) 396 self.assertEqual(gs_resp.server[0].data.calls_succeeded, 1) 397 self.assertEqual(gs_resp.server[0].data.calls_failed, 1) 398 399 gss_resp = self._channelz_stub.GetServerSockets( 400 channelz_pb2.GetServerSocketsRequest( 401 server_id=gs_resp.server[0].ref.server_id, start_socket_id=0)) 402 # If the RPC call failed, it will raise a grpc.RpcError 403 # So, if there is no exception raised, considered pass 404 405 def test_server_listen_sockets(self): 406 self._pairs = _generate_channel_server_pairs(1) 407 408 gss_resp = self._channelz_stub.GetServers( 409 channelz_pb2.GetServersRequest(start_server_id=0)) 410 self.assertEqual(len(gss_resp.server), 1) 411 self.assertEqual(len(gss_resp.server[0].listen_socket), 1) 412 413 gs_resp = self._channelz_stub.GetSocket( 414 channelz_pb2.GetSocketRequest( 415 socket_id=gss_resp.server[0].listen_socket[0].socket_id)) 416 # If the RPC call failed, it will raise a grpc.RpcError 417 # So, if there is no exception raised, considered pass 418 419 def test_invalid_query_get_server(self): 420 try: 421 self._channelz_stub.GetServer( 422 channelz_pb2.GetServerRequest(server_id=10000)) 423 except BaseException as e: 424 self.assertIn('StatusCode.NOT_FOUND', str(e)) 425 else: 426 self.fail('Invalid query not detected') 427 428 def test_invalid_query_get_channel(self): 429 try: 430 self._channelz_stub.GetChannel( 431 channelz_pb2.GetChannelRequest(channel_id=10000)) 432 except BaseException as e: 433 self.assertIn('StatusCode.NOT_FOUND', str(e)) 434 else: 435 self.fail('Invalid query not detected') 436 437 def test_invalid_query_get_subchannel(self): 438 try: 439 self._channelz_stub.GetSubchannel( 440 channelz_pb2.GetSubchannelRequest(subchannel_id=10000)) 441 except BaseException as e: 442 self.assertIn('StatusCode.NOT_FOUND', str(e)) 443 else: 444 self.fail('Invalid query not detected') 445 446 def test_invalid_query_get_socket(self): 447 try: 448 self._channelz_stub.GetSocket( 449 channelz_pb2.GetSocketRequest(socket_id=10000)) 450 except BaseException as e: 451 self.assertIn('StatusCode.NOT_FOUND', str(e)) 452 else: 453 self.fail('Invalid query not detected') 454 455 def test_invalid_query_get_server_sockets(self): 456 try: 457 self._channelz_stub.GetServerSockets( 458 channelz_pb2.GetServerSocketsRequest( 459 server_id=10000, 460 start_socket_id=0, 461 )) 462 except BaseException as e: 463 self.assertIn('StatusCode.NOT_FOUND', str(e)) 464 else: 465 self.fail('Invalid query not detected') 466 467 468if __name__ == '__main__': 469 unittest.main(verbosity=2) 470