# Copyright 2021 The gRPC Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A simple test to ensure that the Python wrapper can get xDS config."""

from concurrent.futures import ThreadPoolExecutor
import logging
import os
import queue
import sys
import time
import unittest

from envoy.service.status.v3 import csds_pb2
from envoy.service.status.v3 import csds_pb2_grpc
from google.protobuf import json_format
import grpc
import grpc_csds

_DUMMY_XDS_ADDRESS = "xds:///foo.bar"
_DUMMY_BOOTSTRAP_FILE = """
{
  \"xds_servers\": [
    {
      \"server_uri\": \"fake:///xds_server\",
      \"channel_creds\": [
        {
          \"type\": \"fake\"
        }
      ],
      \"server_features\": [\"xds_v3\"]
    }
  ],
  \"node\": {
    \"id\": \"python_test_csds\",
    \"cluster\": \"test\",
    \"metadata\": {
      \"foo\": \"bar\"
    },
    \"locality\": {
      \"region\": \"corp\",
      \"zone\": \"svl\",
      \"sub_zone\": \"mp3\"
    }
  }
}\
"""

# Upper bound, in seconds, on how long test_no_lds_found polls the CSDS
# service before failing. Without it a regression would hang the test run
# instead of producing a failure.
_STATUS_POLL_TIMEOUT_S = 30
# Pause, in seconds, between two consecutive CSDS polls.
_STATUS_POLL_INTERVAL_S = 1


def _listener_is_requested(config):
    """Return True if the CSDS dump reports a listener in REQUESTED state.

    Args:
        config: The CSDS response converted to a dict with
            ``json_format.MessageToDict``.

    Returns:
        True when either the per-type ``xdsConfig`` entries or the
        ``genericXdsConfigs`` entries contain a listener whose
        ``clientStatus`` is ``"REQUESTED"``; False otherwise, including when
        the dump does not have the expected layout yet.
    """
    try:
        client_config = config["config"][0]
        for xds_config in client_config.get("xdsConfig", []):
            if "listenerConfig" in xds_config:
                listener = xds_config["listenerConfig"]["dynamicListeners"][0]
                if listener["clientStatus"] == "REQUESTED":
                    return True
        for generic_xds_config in client_config.get("genericXdsConfigs", []):
            if "Listener" in generic_xds_config["typeUrl"]:
                if generic_xds_config["clientStatus"] == "REQUESTED":
                    return True
    except (KeyError, IndexError) as e:
        # A partially populated dump (missing key or empty listener list) is
        # not an error for the caller: it simply polls again.
        logging.debug("Invalid config: %s\n%s: %s", config, type(e), e)
    return False


@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class TestCsds(unittest.TestCase):
    """Checks the CSDS servicer through the unary FetchClientStatus RPC."""

    def setUp(self):
        """Start a local server exposing CSDS and connect a stub to it."""
        os.environ["GRPC_XDS_BOOTSTRAP_CONFIG"] = _DUMMY_BOOTSTRAP_FILE
        self._server = grpc.server(ThreadPoolExecutor())
        port = self._server.add_insecure_port("localhost:0")
        grpc_csds.add_csds_servicer(self._server)
        self._server.start()

        self._channel = grpc.insecure_channel("localhost:%s" % port)
        self._stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(
            self._channel
        )

    def tearDown(self):
        """Close the channel, stop the server and drop the bootstrap config."""
        self._channel.close()
        self._server.stop(0)
        os.environ.pop("GRPC_XDS_BOOTSTRAP_CONFIG", None)

    def get_xds_config_dump(self):
        """Return one CSDS response; subclasses override the transport."""
        return self._stub.FetchClientStatus(csds_pb2.ClientStatusRequest())

    def test_no_lds_found(self):
        """The dump shows the bootstrap node and a REQUESTED listener."""
        dummy_channel = grpc.insecure_channel(_DUMMY_XDS_ADDRESS)
        # try/finally so the xDS channel is released even when an assertion
        # below fails, and still before tearDown stops the server.
        try:
            # Force the XdsClient to initialize and request a resource
            with self.assertRaises(grpc.RpcError) as rpc_error:
                dummy_channel.unary_unary(
                    "",
                    _registered_method=True,
                )(b"", wait_for_ready=False, timeout=1)
            self.assertEqual(
                grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.exception.code()
            )

            # Poll until the listener resource is reported as REQUESTED.
            # (Per the original note, the request itself would only fail
            # with DOES_NOT_EXIST after about 15s; this test does not wait
            # for that transition.)
            deadline = time.monotonic() + _STATUS_POLL_TIMEOUT_S
            while True:
                resp = self.get_xds_config_dump()
                # Check node is setup in the CSDS response
                self.assertEqual(1, len(resp.config))
                self.assertEqual("python_test_csds", resp.config[0].node.id)
                self.assertEqual("test", resp.config[0].node.cluster)
                if _listener_is_requested(json_format.MessageToDict(resp)):
                    break
                if time.monotonic() >= deadline:
                    self.fail(
                        "Listener was not reported as REQUESTED within %s"
                        " seconds" % _STATUS_POLL_TIMEOUT_S
                    )
                time.sleep(_STATUS_POLL_INTERVAL_S)
        finally:
            dummy_channel.close()


@unittest.skipIf(
    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
)
class TestCsdsStream(TestCsds):
    """Runs the same checks through the streaming StreamClientStatus RPC."""

    def setUp(self):
        """Set up the base fixture; the stream itself is opened lazily."""
        super(TestCsdsStream, self).setUp()
        self._request_queue = None
        self._response_iterator = None

    def tearDown(self):
        """End the request stream, if one was opened, then tear down."""
        if self._request_queue is not None:
            # None is the sentinel of iter(queue.get, None): it ends the
            # request iterator so its consumer is not left blocked in get().
            self._request_queue.put(None)
        super(TestCsdsStream, self).tearDown()

    def get_xds_config_dump(self):
        """Send one request on the shared stream and return its response."""
        if self._response_iterator is None:
            # Open the stream once and keep it on the instance; every poll
            # reuses it instead of starting another never-terminated RPC.
            self._request_queue = queue.Queue()
            self._response_iterator = self._stub.StreamClientStatus(
                iter(self._request_queue.get, None)
            )
        self._request_queue.put(csds_pb2.ClientStatusRequest())
        return next(self._response_iterator)


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    unittest.main(verbosity=2)