• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The gRPC Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""A simple test to ensure that the Python wrapper can get xDS config."""
15
16from concurrent.futures import ThreadPoolExecutor
17import logging
18import os
19import queue
20import sys
21import time
22import unittest
23
24from envoy.service.status.v3 import csds_pb2
25from envoy.service.status.v3 import csds_pb2_grpc
26from google.protobuf import json_format
27import grpc
28import grpc_csds
29
30_DUMMY_XDS_ADDRESS = "xds:///foo.bar"
31_DUMMY_BOOTSTRAP_FILE = """
32{
33  \"xds_servers\": [
34    {
35      \"server_uri\": \"fake:///xds_server\",
36      \"channel_creds\": [
37        {
38          \"type\": \"fake\"
39        }
40      ],
41      \"server_features\": [\"xds_v3\"]
42    }
43  ],
44  \"node\": {
45    \"id\": \"python_test_csds\",
46    \"cluster\": \"test\",
47    \"metadata\": {
48      \"foo\": \"bar\"
49    },
50    \"locality\": {
51      \"region\": \"corp\",
52      \"zone\": \"svl\",
53      \"sub_zone\": \"mp3\"
54    }
55  }
56}\
57"""
58
59
60@unittest.skipIf(
61    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
62)
63class TestCsds(unittest.TestCase):
64    def setUp(self):
65        os.environ["GRPC_XDS_BOOTSTRAP_CONFIG"] = _DUMMY_BOOTSTRAP_FILE
66        self._server = grpc.server(ThreadPoolExecutor())
67        port = self._server.add_insecure_port("localhost:0")
68        grpc_csds.add_csds_servicer(self._server)
69        self._server.start()
70
71        self._channel = grpc.insecure_channel("localhost:%s" % port)
72        self._stub = csds_pb2_grpc.ClientStatusDiscoveryServiceStub(
73            self._channel
74        )
75
76    def tearDown(self):
77        self._channel.close()
78        self._server.stop(0)
79        os.environ.pop("GRPC_XDS_BOOTSTRAP_CONFIG", None)
80
81    def get_xds_config_dump(self):
82        return self._stub.FetchClientStatus(csds_pb2.ClientStatusRequest())
83
84    def test_no_lds_found(self):
85        dummy_channel = grpc.insecure_channel(_DUMMY_XDS_ADDRESS)
86
87        # Force the XdsClient to initialize and request a resource
88        with self.assertRaises(grpc.RpcError) as rpc_error:
89            dummy_channel.unary_unary(
90                "",
91                _registered_method=True,
92            )(b"", wait_for_ready=False, timeout=1)
93        self.assertEqual(
94            grpc.StatusCode.DEADLINE_EXCEEDED, rpc_error.exception.code()
95        )
96
97        # The resource request will fail with DOES_NOT_EXIST (after 15s)
98        while True:
99            resp = self.get_xds_config_dump()
100            # Check node is setup in the CSDS response
101            self.assertEqual(1, len(resp.config))
102            self.assertEqual("python_test_csds", resp.config[0].node.id)
103            self.assertEqual("test", resp.config[0].node.cluster)
104            config = json_format.MessageToDict(resp)
105            ok = False
106            try:
107                for xds_config in config["config"][0].get("xdsConfig", []):
108                    if "listenerConfig" in xds_config:
109                        listener = xds_config["listenerConfig"][
110                            "dynamicListeners"
111                        ][0]
112                        if listener["clientStatus"] == "REQUESTED":
113                            ok = True
114                            break
115                for generic_xds_config in config["config"][0].get(
116                    "genericXdsConfigs", []
117                ):
118                    if "Listener" in generic_xds_config["typeUrl"]:
119                        if generic_xds_config["clientStatus"] == "REQUESTED":
120                            ok = True
121                            break
122            except KeyError as e:
123                logging.debug("Invalid config: %s\n%s: %s", config, type(e), e)
124            if ok:
125                break
126            time.sleep(1)
127        dummy_channel.close()
128
129
130@unittest.skipIf(
131    sys.version_info[0] < 3, "ProtoBuf descriptor has moved on from Python2"
132)
133class TestCsdsStream(TestCsds):
134    def get_xds_config_dump(self):
135        if not hasattr(self, "request_queue"):
136            request_queue = queue.Queue()
137            response_iterator = self._stub.StreamClientStatus(
138                iter(request_queue.get, None)
139            )
140        request_queue.put(csds_pb2.ClientStatusRequest())
141        return next(response_iterator)
142
143
144if __name__ == "__main__":
145    logging.basicConfig(level=logging.DEBUG)
146    unittest.main(verbosity=2)
147