1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Helpers to run docker instances as jobs.""" 15 16from __future__ import print_function 17 18import json 19import os 20import subprocess 21import sys 22import tempfile 23import time 24import uuid 25 26sys.path.append(os.path.dirname(os.path.abspath(__file__))) 27import jobset 28 29_DEVNULL = open(os.devnull, "w") 30 31 32def random_name(base_name): 33 """Randomizes given base name.""" 34 return "%s_%s" % (base_name, uuid.uuid4()) 35 36 37def docker_kill(cid): 38 """Kills a docker container. Returns True if successful.""" 39 return ( 40 subprocess.call( 41 ["docker", "kill", str(cid)], 42 stdin=subprocess.PIPE, 43 stdout=_DEVNULL, 44 stderr=subprocess.STDOUT, 45 ) 46 == 0 47 ) 48 49 50def docker_mapped_port(cid, port, timeout_seconds=15): 51 """Get port mapped to internal given internal port for given container.""" 52 started = time.time() 53 while time.time() - started < timeout_seconds: 54 try: 55 output = subprocess.check_output( 56 "docker port %s %s" % (cid, port), stderr=_DEVNULL, shell=True 57 ).decode() 58 return int(output.split(":", 2)[1]) 59 except subprocess.CalledProcessError as e: 60 pass 61 raise Exception( 62 "Failed to get exposed port %s for container %s." % (port, cid) 63 ) 64 65 66def docker_ip_address(cid, timeout_seconds=15): 67 """Get port mapped to internal given internal port for given container.""" 68 started = time.time() 69 while time.time() - started < timeout_seconds: 70 cmd = "docker inspect %s" % cid 71 try: 72 output = subprocess.check_output( 73 cmd, stderr=_DEVNULL, shell=True 74 ).decode() 75 json_info = json.loads(output) 76 assert len(json_info) == 1 77 out = json_info[0]["NetworkSettings"]["IPAddress"] 78 if not out: 79 continue 80 return out 81 except subprocess.CalledProcessError as e: 82 pass 83 raise Exception( 84 "Non-retryable error: Failed to get ip address of container %s." % cid 85 ) 86 87 88def wait_for_healthy(cid, shortname, timeout_seconds): 89 """Wait timeout_seconds for the container to become healthy""" 90 started = time.time() 91 while time.time() - started < timeout_seconds: 92 try: 93 output = subprocess.check_output( 94 [ 95 "docker", 96 "inspect", 97 '--format="{{.State.Health.Status}}"', 98 cid, 99 ], 100 stderr=_DEVNULL, 101 ).decode() 102 if output.strip("\n") == "healthy": 103 return 104 except subprocess.CalledProcessError as e: 105 pass 106 time.sleep(1) 107 raise Exception( 108 "Timed out waiting for %s (%s) to pass health check" % (shortname, cid) 109 ) 110 111 112def finish_jobs(jobs, suppress_failure=True): 113 """Kills given docker containers and waits for corresponding jobs to finish""" 114 for job in jobs: 115 job.kill(suppress_failure=suppress_failure) 116 117 while any(job.is_running() for job in jobs): 118 time.sleep(1) 119 120 121def image_exists(image): 122 """Returns True if given docker image exists.""" 123 return ( 124 subprocess.call( 125 ["docker", "inspect", image], 126 stdin=subprocess.PIPE, 127 stdout=_DEVNULL, 128 stderr=subprocess.STDOUT, 129 ) 130 == 0 131 ) 132 133 134def remove_image(image, skip_nonexistent=False, max_retries=10): 135 """Attempts to remove docker image with retries.""" 136 if skip_nonexistent and not image_exists(image): 137 return True 138 for attempt in range(0, max_retries): 139 if ( 140 subprocess.call( 141 ["docker", "rmi", "-f", image], 142 stdin=subprocess.PIPE, 143 stdout=_DEVNULL, 144 stderr=subprocess.STDOUT, 145 ) 146 == 0 147 ): 148 return True 149 time.sleep(2) 150 print("Failed to remove docker image %s" % image) 151 return False 152 153 154class DockerJob: 155 """Encapsulates a job""" 156 157 def __init__(self, spec): 158 self._spec = spec 159 self._job = jobset.Job( 160 spec, newline_on_success=True, travis=True, add_env={} 161 ) 162 self._container_name = spec.container_name 163 164 def mapped_port(self, port): 165 return docker_mapped_port(self._container_name, port) 166 167 def ip_address(self): 168 return docker_ip_address(self._container_name) 169 170 def wait_for_healthy(self, timeout_seconds): 171 wait_for_healthy( 172 self._container_name, self._spec.shortname, timeout_seconds 173 ) 174 175 def kill(self, suppress_failure=False): 176 """Sends kill signal to the container.""" 177 if suppress_failure: 178 self._job.suppress_failure_message() 179 return docker_kill(self._container_name) 180 181 def is_running(self): 182 """Polls a job and returns True if given job is still running.""" 183 return self._job.state() == jobset._RUNNING 184