# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import logging
import pathlib
from typing import Optional

import mako.template
import yaml

from framework.infrastructure import k8s

logger = logging.getLogger(__name__)


class RunnerError(Exception):
    """Error running app"""


class KubernetesBaseRunner:
    """Base class for runners that deploy an app into a Kubernetes namespace.

    Owns the lifecycle of a single namespace (create or reuse on run(),
    delete on cleanup()) and offers helpers that render a mako template into
    a YAML manifest, apply it through the namespace manager, and validate,
    delete or wait on the resulting resources.
    """
    TEMPLATE_DIR_NAME = 'kubernetes-manifests'
    # Resolved relative to the directory containing this module.
    TEMPLATE_DIR_RELATIVE_PATH = f'../../{TEMPLATE_DIR_NAME}'

    def __init__(self,
                 k8s_namespace,
                 namespace_template=None,
                 reuse_namespace=False):
        """Store the namespace manager and namespace handling options.

        Args:
            k8s_namespace: The k8s.KubernetesNamespace manager used for all
                operations on namespaced resources.
            namespace_template: Name of the namespace manifest template;
                'namespace.yaml' is used when omitted or falsy.
            reuse_namespace: When true, run() first tries to pick up an
                existing namespace, and cleanup() does not delete it unless
                forced.
        """
        # Kubernetes namespaced resources manager
        self.k8s_namespace: k8s.KubernetesNamespace = k8s_namespace
        self.reuse_namespace = reuse_namespace
        self.namespace_template = namespace_template or 'namespace.yaml'

        # Mutable state
        self.namespace: Optional[k8s.V1Namespace] = None

    def run(self, **kwargs):
        """Make sure self.namespace is set, reusing or creating the namespace.

        When reuse_namespace is enabled the namespace is looked up first;
        if no namespace was obtained it is created from namespace_template.
        The keyword arguments are accepted but not used here.
        """
        if self.reuse_namespace:
            self.namespace = self._reuse_namespace()
        if not self.namespace:
            self.namespace = self._create_namespace(
                self.namespace_template, namespace_name=self.k8s_namespace.name)

    def cleanup(self, *, force=False):
        """Delete the namespace and reset self.namespace.

        The namespace is deleted when it was set by run() and is not being
        reused, or unconditionally when force is true.
        """
        if (self.namespace and not self.reuse_namespace) or force:
            self._delete_namespace()
            self.namespace = None

    @staticmethod
    def _render_template(template_file, **kwargs):
        """Render the mako template file with kwargs and return the result."""
        template = mako.template.Template(filename=str(template_file))
        return template.render(**kwargs)

    @staticmethod
    def _manifests_from_yaml_file(yaml_file):
        """Yield each YAML document loaded from the file at yaml_file."""
        with open(yaml_file) as f:
            with contextlib.closing(yaml.safe_load_all(f)) as yml:
                for manifest in yml:
                    yield manifest

    @staticmethod
    def _manifests_from_str(document):
        """Yield each YAML document loaded from the string document."""
        with contextlib.closing(yaml.safe_load_all(document)) as yml:
            for manifest in yml:
                yield manifest

    @classmethod
    def _template_file_from_name(cls, template_name):
        """Return the resolved path of template_name in the templates dir."""
        templates_path = (pathlib.Path(__file__).parent /
                          cls.TEMPLATE_DIR_RELATIVE_PATH)
        return templates_path.joinpath(template_name).resolve()

    def _create_from_template(self, template_name, **kwargs):
        """Render a template and apply it, expecting exactly one k8s object.

        Args:
            template_name: File name of the template inside the templates
                directory.
            **kwargs: Variables passed to the template when rendering.

        Returns:
            The single object returned by applying the manifest.

        Raises:
            RunnerError: The rendered template has no document, has an empty
                first document, has a second non-empty document, or applying
                it did not produce exactly one object.
        """
        template_file = self._template_file_from_name(template_name)
        logger.debug("Loading k8s manifest template: %s", template_file)

        yaml_doc = self._render_template(template_file, **kwargs)
        logger.info("Rendered template %s/%s:\n%s", self.TEMPLATE_DIR_NAME,
                    template_name, yaml_doc)

        manifests = self._manifests_from_str(yaml_doc)
        # A default is supplied so that a template rendering to no documents
        # is reported as a RunnerError rather than leaking StopIteration.
        # An empty first document loads as None and is rejected the same way.
        manifest = next(manifests, None)
        if manifest is None:
            raise RunnerError('Exactly one document expected in manifest '
                              f'{template_file}')
        # Error out on multi-document yaml
        if next(manifests, False):
            raise RunnerError('Exactly one document expected in manifest '
                              f'{template_file}')
        k8s_objects = self.k8s_namespace.apply_manifest(manifest)
        if len(k8s_objects) != 1:
            raise RunnerError('Exactly one object must be created from '
                              f'manifest {template_file}')

        logger.info('%s %s created', k8s_objects[0].kind,
                    k8s_objects[0].metadata.name)
        return k8s_objects[0]

    def _reuse_deployment(self, deployment_name) -> k8s.V1Deployment:
        """Return the deployment fetched by name from the namespace manager."""
        deployment = self.k8s_namespace.get_deployment(deployment_name)
        # TODO(sergiitk): check if good or must be recreated
        return deployment

    def _reuse_service(self, service_name) -> k8s.V1Service:
        """Return the service fetched by name from the namespace manager."""
        service = self.k8s_namespace.get_service(service_name)
        # TODO(sergiitk): check if good or must be recreated
        return service

    def _reuse_namespace(self) -> k8s.V1Namespace:
        """Return the result of the namespace manager's get() call."""
        return self.k8s_namespace.get()

    def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace:
        """Create a namespace from the template and validate the result.

        Requires the 'namespace_name' keyword argument; it is compared with
        the name of the created object.

        Raises:
            RunnerError: The created object is not a V1Namespace, or its
                name differs from namespace_name.
        """
        namespace = self._create_from_template(template, **kwargs)
        if not isinstance(namespace, k8s.V1Namespace):
            raise RunnerError('Expected V1Namespace to be created '
                              f'from manifest {template}')
        if namespace.metadata.name != kwargs['namespace_name']:
            raise RunnerError('V1Namespace created with unexpected name: '
                              f'{namespace.metadata.name}')
        logger.debug('V1Namespace %s created at %s',
                     namespace.metadata.self_link,
                     namespace.metadata.creation_timestamp)
        return namespace

    def _create_service_account(self, template,
                                **kwargs) -> k8s.V1ServiceAccount:
        """Create a service account from the template and validate the result.

        Requires the 'service_account_name' keyword argument; it is compared
        with the name of the created object.

        Raises:
            RunnerError: The created object is not a V1ServiceAccount, or
                its name differs from service_account_name.
        """
        resource = self._create_from_template(template, **kwargs)
        if not isinstance(resource, k8s.V1ServiceAccount):
            raise RunnerError('Expected V1ServiceAccount to be created '
                              f'from manifest {template}')
        if resource.metadata.name != kwargs['service_account_name']:
            raise RunnerError('V1ServiceAccount created with unexpected name: '
                              f'{resource.metadata.name}')
        logger.debug('V1ServiceAccount %s created at %s',
                     resource.metadata.self_link,
                     resource.metadata.creation_timestamp)
        return resource

    def _create_deployment(self, template, **kwargs) -> k8s.V1Deployment:
        """Create a deployment from the template and validate the result.

        Requires the 'deployment_name' keyword argument; it is compared with
        the name of the created object.

        Raises:
            RunnerError: The created object is not a V1Deployment, or its
                name differs from deployment_name.
        """
        deployment = self._create_from_template(template, **kwargs)
        if not isinstance(deployment, k8s.V1Deployment):
            raise RunnerError('Expected V1Deployment to be created '
                              f'from manifest {template}')
        if deployment.metadata.name != kwargs['deployment_name']:
            raise RunnerError('V1Deployment created with unexpected name: '
                              f'{deployment.metadata.name}')
        logger.debug('V1Deployment %s created at %s',
                     deployment.metadata.self_link,
                     deployment.metadata.creation_timestamp)
        return deployment

    def _create_service(self, template, **kwargs) -> k8s.V1Service:
        """Create a service from the template and validate the result.

        Requires the 'service_name' keyword argument; it is compared with
        the name of the created object.

        Raises:
            RunnerError: The created object is not a V1Service, or its name
                differs from service_name.
        """
        service = self._create_from_template(template, **kwargs)
        if not isinstance(service, k8s.V1Service):
            raise RunnerError('Expected V1Service to be created '
                              f'from manifest {template}')
        if service.metadata.name != kwargs['service_name']:
            raise RunnerError('V1Service created with unexpected name: '
                              f'{service.metadata.name}')
        logger.debug('V1Service %s created at %s', service.metadata.self_link,
                     service.metadata.creation_timestamp)
        return service

    def _delete_deployment(self, name, wait_for_deletion=True):
        """Delete the deployment by name, optionally waiting for deletion.

        A k8s.ApiException raised by the delete call is logged and
        swallowed; in that case the method returns without waiting.
        """
        logger.info('Deleting deployment %s', name)
        try:
            self.k8s_namespace.delete_deployment(name)
        except k8s.ApiException as e:
            logger.info('Deployment %s deletion failed, error: %s %s', name,
                        e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_deployment_deleted(name)
        logger.debug('Deployment %s deleted', name)

    def _delete_service(self, name, wait_for_deletion=True):
        """Delete the service by name, optionally waiting for deletion.

        A k8s.ApiException raised by the delete call is logged and
        swallowed; in that case the method returns without waiting.
        """
        logger.info('Deleting service %s', name)
        try:
            self.k8s_namespace.delete_service(name)
        except k8s.ApiException as e:
            logger.info('Service %s deletion failed, error: %s %s', name,
                        e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_service_deleted(name)
        logger.debug('Service %s deleted', name)

    def _delete_service_account(self, name, wait_for_deletion=True):
        """Delete the service account by name, optionally waiting for deletion.

        A k8s.ApiException raised by the delete call is logged and
        swallowed; in that case the method returns without waiting.
        """
        logger.info('Deleting service account %s', name)
        try:
            self.k8s_namespace.delete_service_account(name)
        except k8s.ApiException as e:
            logger.info('Service account %s deletion failed, error: %s %s',
                        name, e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_service_account_deleted(name)
        logger.debug('Service account %s deleted', name)

    def _delete_namespace(self, wait_for_deletion=True):
        """Delete the managed namespace, optionally waiting for deletion.

        A k8s.ApiException raised by the delete call is logged and
        swallowed; in that case the method returns without waiting.
        """
        logger.info('Deleting namespace %s', self.k8s_namespace.name)
        try:
            self.k8s_namespace.delete()
        except k8s.ApiException as e:
            logger.info('Namespace %s deletion failed, error: %s %s',
                        self.k8s_namespace.name, e.status, e.reason)
            return

        if wait_for_deletion:
            self.k8s_namespace.wait_for_namespace_deleted()
        logger.debug('Namespace %s deleted', self.k8s_namespace.name)

    def _wait_deployment_with_available_replicas(self, name, count=1, **kwargs):
        """Wait for the deployment to have count available replicas.

        Extra keyword arguments are forwarded to the namespace manager's
        wait call. The deployment is then fetched again to log its
        available replica count.
        """
        logger.info('Waiting for deployment %s to have %s available replica(s)',
                    name, count)
        self.k8s_namespace.wait_for_deployment_available_replicas(
            name, count, **kwargs)
        deployment = self.k8s_namespace.get_deployment(name)
        logger.info('Deployment %s has %i replicas available',
                    deployment.metadata.name,
                    deployment.status.available_replicas)

    def _wait_pod_started(self, name, **kwargs):
        """Wait for the pod to start, then log its name and IP.

        Extra keyword arguments are forwarded to the namespace manager's
        wait call.
        """
        logger.info('Waiting for pod %s to start', name)
        self.k8s_namespace.wait_for_pod_started(name, **kwargs)
        pod = self.k8s_namespace.get_pod(name)
        logger.info('Pod %s ready, IP: %s', pod.metadata.name,
                    pod.status.pod_ip)

    def _wait_service_neg(self, name, service_port, **kwargs):
        """Wait for the service NEG, then log its name and zones.

        Extra keyword arguments are forwarded to the namespace manager's
        wait call; service_port is used only for the lookup that follows.
        """
        logger.info('Waiting for NEG for service %s', name)
        self.k8s_namespace.wait_for_service_neg(name, **kwargs)
        neg_name, neg_zones = self.k8s_namespace.get_service_neg(
            name, service_port)
        logger.info("Service %s: detected NEG=%s in zones=%s", name, neg_name,
                    neg_zones)