1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14"""Helpers to run docker instances as jobs.""" 15 16from __future__ import print_function 17 18import tempfile 19import time 20import uuid 21import os 22import subprocess 23import json 24 25import jobset 26 27_DEVNULL = open(os.devnull, 'w') 28 29 30def random_name(base_name): 31 """Randomizes given base name.""" 32 return '%s_%s' % (base_name, uuid.uuid4()) 33 34 35def docker_kill(cid): 36 """Kills a docker container. Returns True if successful.""" 37 return subprocess.call(['docker', 'kill', str(cid)], 38 stdin=subprocess.PIPE, 39 stdout=_DEVNULL, 40 stderr=subprocess.STDOUT) == 0 41 42 43def docker_mapped_port(cid, port, timeout_seconds=15): 44 """Get port mapped to internal given internal port for given container.""" 45 started = time.time() 46 while time.time() - started < timeout_seconds: 47 try: 48 output = subprocess.check_output('docker port %s %s' % (cid, port), 49 stderr=_DEVNULL, 50 shell=True) 51 return int(output.split(':', 2)[1]) 52 except subprocess.CalledProcessError as e: 53 pass 54 raise Exception('Failed to get exposed port %s for container %s.' % 55 (port, cid)) 56 57 58def docker_ip_address(cid, timeout_seconds=15): 59 """Get port mapped to internal given internal port for given container.""" 60 started = time.time() 61 while time.time() - started < timeout_seconds: 62 cmd = 'docker inspect %s' % cid 63 try: 64 output = subprocess.check_output(cmd, stderr=_DEVNULL, shell=True) 65 json_info = json.loads(output) 66 assert len(json_info) == 1 67 out = json_info[0]['NetworkSettings']['IPAddress'] 68 if not out: 69 continue 70 return out 71 except subprocess.CalledProcessError as e: 72 pass 73 raise Exception( 74 'Non-retryable error: Failed to get ip address of container %s.' % cid) 75 76 77def wait_for_healthy(cid, shortname, timeout_seconds): 78 """Wait timeout_seconds for the container to become healthy""" 79 started = time.time() 80 while time.time() - started < timeout_seconds: 81 try: 82 output = subprocess.check_output([ 83 'docker', 'inspect', '--format="{{.State.Health.Status}}"', cid 84 ], 85 stderr=_DEVNULL) 86 if output.strip('\n') == 'healthy': 87 return 88 except subprocess.CalledProcessError as e: 89 pass 90 time.sleep(1) 91 raise Exception('Timed out waiting for %s (%s) to pass health check' % 92 (shortname, cid)) 93 94 95def finish_jobs(jobs, suppress_failure=True): 96 """Kills given docker containers and waits for corresponding jobs to finish""" 97 for job in jobs: 98 job.kill(suppress_failure=suppress_failure) 99 100 while any(job.is_running() for job in jobs): 101 time.sleep(1) 102 103 104def image_exists(image): 105 """Returns True if given docker image exists.""" 106 return subprocess.call(['docker', 'inspect', image], 107 stdin=subprocess.PIPE, 108 stdout=_DEVNULL, 109 stderr=subprocess.STDOUT) == 0 110 111 112def remove_image(image, skip_nonexistent=False, max_retries=10): 113 """Attempts to remove docker image with retries.""" 114 if skip_nonexistent and not image_exists(image): 115 return True 116 for attempt in range(0, max_retries): 117 if subprocess.call(['docker', 'rmi', '-f', image], 118 stdin=subprocess.PIPE, 119 stdout=_DEVNULL, 120 stderr=subprocess.STDOUT) == 0: 121 return True 122 time.sleep(2) 123 print('Failed to remove docker image %s' % image) 124 return False 125 126 127class DockerJob: 128 """Encapsulates a job""" 129 130 def __init__(self, spec): 131 self._spec = spec 132 self._job = jobset.Job(spec, 133 newline_on_success=True, 134 travis=True, 135 add_env={}) 136 self._container_name = spec.container_name 137 138 def mapped_port(self, port): 139 return docker_mapped_port(self._container_name, port) 140 141 def ip_address(self): 142 return docker_ip_address(self._container_name) 143 144 def wait_for_healthy(self, timeout_seconds): 145 wait_for_healthy(self._container_name, self._spec.shortname, 146 timeout_seconds) 147 148 def kill(self, suppress_failure=False): 149 """Sends kill signal to the container.""" 150 if suppress_failure: 151 self._job.suppress_failure_message() 152 return docker_kill(self._container_name) 153 154 def is_running(self): 155 """Polls a job and returns True if given job is still running.""" 156 return self._job.state() == jobset._RUNNING 157