1# Copyright 2020 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14import logging 15import uuid 16 17from absl import flags 18from absl.testing import absltest 19 20from framework import xds_k8s_testcase 21 22logger = logging.getLogger(__name__) 23flags.adopt_module_key_flags(xds_k8s_testcase) 24 25# Type aliases 26_XdsTestServer = xds_k8s_testcase.XdsTestServer 27_XdsTestClient = xds_k8s_testcase.XdsTestClient 28_SecurityMode = xds_k8s_testcase.SecurityXdsKubernetesTestCase.SecurityMode 29 30 31class SecurityTest(xds_k8s_testcase.SecurityXdsKubernetesTestCase): 32 33 def test_mtls(self): 34 """mTLS test. 35 36 Both client and server configured to use TLS and mTLS. 37 """ 38 self.setupTrafficDirectorGrpc() 39 self.setupSecurityPolicies(server_tls=True, 40 server_mtls=True, 41 client_tls=True, 42 client_mtls=True) 43 44 test_server: _XdsTestServer = self.startSecureTestServer() 45 self.setupServerBackends() 46 test_client: _XdsTestClient = self.startSecureTestClient(test_server) 47 48 self.assertTestAppSecurity(_SecurityMode.MTLS, test_client, test_server) 49 self.assertSuccessfulRpcs(test_client) 50 logger.info('[SUCCESS] mTLS security mode confirmed.') 51 52 def test_tls(self): 53 """TLS test. 54 55 Both client and server configured to use TLS and not use mTLS. 56 """ 57 self.setupTrafficDirectorGrpc() 58 self.setupSecurityPolicies(server_tls=True, 59 server_mtls=False, 60 client_tls=True, 61 client_mtls=False) 62 63 test_server: _XdsTestServer = self.startSecureTestServer() 64 self.setupServerBackends() 65 test_client: _XdsTestClient = self.startSecureTestClient(test_server) 66 67 self.assertTestAppSecurity(_SecurityMode.TLS, test_client, test_server) 68 self.assertSuccessfulRpcs(test_client) 69 logger.info('[SUCCESS] TLS security mode confirmed.') 70 71 def test_plaintext_fallback(self): 72 """Plain-text fallback test. 73 74 Control plane provides no security config so both client and server 75 fallback to plaintext based on fallback-credentials. 76 """ 77 self.setupTrafficDirectorGrpc() 78 self.setupSecurityPolicies(server_tls=False, 79 server_mtls=False, 80 client_tls=False, 81 client_mtls=False) 82 83 test_server: _XdsTestServer = self.startSecureTestServer() 84 self.setupServerBackends() 85 test_client: _XdsTestClient = self.startSecureTestClient(test_server) 86 87 self.assertTestAppSecurity(_SecurityMode.PLAINTEXT, test_client, 88 test_server) 89 self.assertSuccessfulRpcs(test_client) 90 logger.info('[SUCCESS] Plaintext security mode confirmed.') 91 92 def test_mtls_error(self): 93 """Negative test: mTLS Error. 94 95 Server expects client mTLS cert, but client configured only for TLS. 96 97 Note: because this is a negative test we need to make sure the mTLS 98 failure happens after receiving the correct configuration at the 99 client. To ensure that we will perform the following steps in that 100 sequence: 101 102 - Creation of a backendService, and attaching the backend (NEG) 103 - Creation of the Server mTLS Policy, and attaching to the ECS 104 - Creation of the Client TLS Policy, and attaching to the backendService 105 - Creation of the urlMap, targetProxy, and forwardingRule 106 107 With this sequence we are sure that when the client receives the 108 endpoints of the backendService the security-config would also have 109 been received as confirmed by the TD team. 110 """ 111 # Create backend service 112 self.td.setup_backend_for_grpc( 113 health_check_port=self.server_maintenance_port) 114 115 # Start server and attach its NEGs to the backend service, but 116 # until they become healthy. 117 test_server: _XdsTestServer = self.startSecureTestServer() 118 self.setupServerBackends(wait_for_healthy_status=False) 119 120 # Setup policies and attach them. 121 self.setupSecurityPolicies(server_tls=True, 122 server_mtls=True, 123 client_tls=True, 124 client_mtls=False) 125 126 # Create the routing rule map. 127 self.td.setup_routing_rule_map_for_grpc(self.server_xds_host, 128 self.server_xds_port) 129 # Now that TD setup is complete, Backend Service can be populated 130 # with healthy backends (NEGs). 131 self.td.wait_for_backends_healthy_status() 132 133 # Start the client, but don't wait for it to report a healthy channel. 134 test_client: _XdsTestClient = self.startSecureTestClient( 135 test_server, wait_for_active_server_channel=False) 136 137 self.assertClientCannotReachServerRepeatedly(test_client) 138 logger.info( 139 "[SUCCESS] Client's connectivity state is consistent with a mTLS " 140 "error caused by not presenting mTLS certificate to the server.") 141 142 def test_server_authz_error(self): 143 """Negative test: AuthZ error. 144 145 Client does not authorize server because of mismatched SAN name. 146 The order of operations is the same as in `test_mtls_error`. 147 """ 148 # Create backend service 149 self.td.setup_backend_for_grpc( 150 health_check_port=self.server_maintenance_port) 151 152 # Start server and attach its NEGs to the backend service, but 153 # until they become healthy. 154 test_server: _XdsTestServer = self.startSecureTestServer() 155 self.setupServerBackends(wait_for_healthy_status=False) 156 157 # Regular TLS setup, but with client policy configured using 158 # intentionality incorrect server_namespace. 159 self.td.setup_server_security(server_namespace=self.server_namespace, 160 server_name=self.server_name, 161 server_port=self.server_port, 162 tls=True, 163 mtls=False) 164 incorrect_namespace = f'incorrect-namespace-{uuid.uuid4().hex}' 165 self.td.setup_client_security(server_namespace=incorrect_namespace, 166 server_name=self.server_name, 167 tls=True, 168 mtls=False) 169 170 # Create the routing rule map. 171 self.td.setup_routing_rule_map_for_grpc(self.server_xds_host, 172 self.server_xds_port) 173 # Now that TD setup is complete, Backend Service can be populated 174 # with healthy backends (NEGs). 175 self.td.wait_for_backends_healthy_status() 176 177 # Start the client, but don't wait for it to report a healthy channel. 178 test_client: _XdsTestClient = self.startSecureTestClient( 179 test_server, wait_for_active_server_channel=False) 180 181 self.assertClientCannotReachServerRepeatedly(test_client) 182 logger.info("[SUCCESS] Client's connectivity state is consistent with " 183 "AuthZ error caused by server presenting incorrect SAN.") 184 185 186if __name__ == '__main__': 187 absltest.main() 188