• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2020 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import logging
15import uuid
16
17from absl import flags
18from absl.testing import absltest
19
20from framework import xds_k8s_testcase
21
22logger = logging.getLogger(__name__)
23flags.adopt_module_key_flags(xds_k8s_testcase)
24
25# Type aliases
26_XdsTestServer = xds_k8s_testcase.XdsTestServer
27_XdsTestClient = xds_k8s_testcase.XdsTestClient
28_SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode
29
30
31class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase):
32
33    def test_mtls(self):
34        """mTLS test.
35
36        Both client and server configured to use TLS and mTLS.
37        """
38        self.setupTrafficDirectorGrpc()
39        self.setupSecurityPolicies(server_tls=True,
40                                   server_mtls=True,
41                                   client_tls=True,
42                                   client_mtls=True)
43
44        test_server: _XdsTestServer = self.startSecureTestServer()
45        self.setupServerBackends()
46        test_client: _XdsTestClient = self.startSecureTestClient(test_server)
47
48        self.assertTestAppSecurity(_SecurityMode.MTLS, test_client, test_server)
49        self.assertSuccessfulRpcs(test_client)
50        logger.info('[SUCCESS] mTLS security mode confirmed.')
51
52    def test_tls(self):
53        """TLS test.
54
55        Both client and server configured to use TLS and not use mTLS.
56        """
57        self.setupTrafficDirectorGrpc()
58        self.setupSecurityPolicies(server_tls=True,
59                                   server_mtls=False,
60                                   client_tls=True,
61                                   client_mtls=False)
62
63        test_server: _XdsTestServer = self.startSecureTestServer()
64        self.setupServerBackends()
65        test_client: _XdsTestClient = self.startSecureTestClient(test_server)
66
67        self.assertTestAppSecurity(_SecurityMode.TLS, test_client, test_server)
68        self.assertSuccessfulRpcs(test_client)
69        logger.info('[SUCCESS] TLS security mode confirmed.')
70
71    def test_plaintext_fallback(self):
72        """Plain-text fallback test.
73
74        Control plane provides no security config so both client and server
75        fallback to plaintext based on fallback-credentials.
76        """
77        self.setupTrafficDirectorGrpc()
78        self.setupSecurityPolicies(server_tls=False,
79                                   server_mtls=False,
80                                   client_tls=False,
81                                   client_mtls=False)
82
83        test_server: _XdsTestServer = self.startSecureTestServer()
84        self.setupServerBackends()
85        test_client: _XdsTestClient = self.startSecureTestClient(test_server)
86
87        self.assertTestAppSecurity(_SecurityMode.PLAINTEXT, test_client,
88                                   test_server)
89        self.assertSuccessfulRpcs(test_client)
90        logger.info('[SUCCESS] Plaintext security mode confirmed.')
91
92    def test_mtls_error(self):
93        """Negative test: mTLS Error.
94
95        Server expects client mTLS cert, but client configured only for TLS.
96
97        Note: because this is a negative test we need to make sure the mTLS
98        failure happens after receiving the correct configuration at the
99        client. To ensure that we will perform the following steps in that
100        sequence:
101
102        - Creation of a backendService, and attaching the backend (NEG)
103        - Creation of the Server mTLS Policy, and attaching to the ECS
104        - Creation of the Client TLS Policy, and attaching to the backendService
105        - Creation of the urlMap, targetProxy, and forwardingRule
106
107        With this sequence we are sure that when the client receives the
108        endpoints of the backendService the security-config would also have
109        been received as confirmed by the TD team.
110        """
111        # Create backend service
112        self.td.setup_backend_for_grpc(
113            health_check_port=self.server_maintenance_port)
114
115        # Start server and attach its NEGs to the backend service, but
116        # until they become healthy.
117        test_server: _XdsTestServer = self.startSecureTestServer()
118        self.setupServerBackends(wait_for_healthy_status=False)
119
120        # Setup policies and attach them.
121        self.setupSecurityPolicies(server_tls=True,
122                                   server_mtls=True,
123                                   client_tls=True,
124                                   client_mtls=False)
125
126        # Create the routing rule map.
127        self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
128                                                self.server_xds_port)
129        # Now that TD setup is complete, Backend Service can be populated
130        # with healthy backends (NEGs).
131        self.td.wait_for_backends_healthy_status()
132
133        # Start the client, but don't wait for it to report a healthy channel.
134        test_client: _XdsTestClient = self.startSecureTestClient(
135            test_server, wait_for_active_server_channel=False)
136
137        self.assertClientCannotReachServerRepeatedly(test_client)
138        logger.info(
139            "[SUCCESS] Client's connectivity state is consistent with a mTLS "
140            "error caused by not presenting mTLS certificate to the server.")
141
142    def test_server_authz_error(self):
143        """Negative test: AuthZ error.
144
145        Client does not authorize server because of mismatched SAN name.
146        The order of operations is the same as in `test_mtls_error`.
147        """
148        # Create backend service
149        self.td.setup_backend_for_grpc(
150            health_check_port=self.server_maintenance_port)
151
152        # Start server and attach its NEGs to the backend service, but
153        # until they become healthy.
154        test_server: _XdsTestServer = self.startSecureTestServer()
155        self.setupServerBackends(wait_for_healthy_status=False)
156
157        # Regular TLS setup, but with client policy configured using
158        # intentionality incorrect server_namespace.
159        self.td.setup_server_security(server_namespace=self.server_namespace,
160                                      server_name=self.server_name,
161                                      server_port=self.server_port,
162                                      tls=True,
163                                      mtls=False)
164        incorrect_namespace = f'incorrect-namespace-{uuid.uuid4().hex}'
165        self.td.setup_client_security(server_namespace=incorrect_namespace,
166                                      server_name=self.server_name,
167                                      tls=True,
168                                      mtls=False)
169
170        # Create the routing rule map.
171        self.td.setup_routing_rule_map_for_grpc(self.server_xds_host,
172                                                self.server_xds_port)
173        # Now that TD setup is complete, Backend Service can be populated
174        # with healthy backends (NEGs).
175        self.td.wait_for_backends_healthy_status()
176
177        # Start the client, but don't wait for it to report a healthy channel.
178        test_client: _XdsTestClient = self.startSecureTestClient(
179            test_server, wait_for_active_server_channel=False)
180
181        self.assertClientCannotReachServerRepeatedly(test_client)
182        logger.info("[SUCCESS] Client's connectivity state is consistent with "
183                    "AuthZ error caused by server presenting incorrect SAN.")
184
185
186if __name__ == '__main__':
187    absltest.main()
188