• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Helpers to run docker instances as jobs."""
15
16from __future__ import print_function
17
18import json
19import os
20import subprocess
21import sys
22import tempfile
23import time
24import uuid
25
26sys.path.append(os.path.dirname(os.path.abspath(__file__)))
27import jobset
28
29_DEVNULL = open(os.devnull, 'w')
30
31
32def random_name(base_name):
33    """Randomizes given base name."""
34    return '%s_%s' % (base_name, uuid.uuid4())
35
36
37def docker_kill(cid):
38    """Kills a docker container. Returns True if successful."""
39    return subprocess.call(['docker', 'kill', str(cid)],
40                           stdin=subprocess.PIPE,
41                           stdout=_DEVNULL,
42                           stderr=subprocess.STDOUT) == 0
43
44
45def docker_mapped_port(cid, port, timeout_seconds=15):
46    """Get port mapped to internal given internal port for given container."""
47    started = time.time()
48    while time.time() - started < timeout_seconds:
49        try:
50            output = subprocess.check_output('docker port %s %s' % (cid, port),
51                                             stderr=_DEVNULL,
52                                             shell=True).decode()
53            return int(output.split(':', 2)[1])
54        except subprocess.CalledProcessError as e:
55            pass
56    raise Exception('Failed to get exposed port %s for container %s.' %
57                    (port, cid))
58
59
60def docker_ip_address(cid, timeout_seconds=15):
61    """Get port mapped to internal given internal port for given container."""
62    started = time.time()
63    while time.time() - started < timeout_seconds:
64        cmd = 'docker inspect %s' % cid
65        try:
66            output = subprocess.check_output(cmd, stderr=_DEVNULL,
67                                             shell=True).decode()
68            json_info = json.loads(output)
69            assert len(json_info) == 1
70            out = json_info[0]['NetworkSettings']['IPAddress']
71            if not out:
72                continue
73            return out
74        except subprocess.CalledProcessError as e:
75            pass
76    raise Exception(
77        'Non-retryable error: Failed to get ip address of container %s.' % cid)
78
79
80def wait_for_healthy(cid, shortname, timeout_seconds):
81    """Wait timeout_seconds for the container to become healthy"""
82    started = time.time()
83    while time.time() - started < timeout_seconds:
84        try:
85            output = subprocess.check_output([
86                'docker', 'inspect', '--format="{{.State.Health.Status}}"', cid
87            ],
88                                             stderr=_DEVNULL).decode()
89            if output.strip('\n') == 'healthy':
90                return
91        except subprocess.CalledProcessError as e:
92            pass
93        time.sleep(1)
94    raise Exception('Timed out waiting for %s (%s) to pass health check' %
95                    (shortname, cid))
96
97
98def finish_jobs(jobs, suppress_failure=True):
99    """Kills given docker containers and waits for corresponding jobs to finish"""
100    for job in jobs:
101        job.kill(suppress_failure=suppress_failure)
102
103    while any(job.is_running() for job in jobs):
104        time.sleep(1)
105
106
107def image_exists(image):
108    """Returns True if given docker image exists."""
109    return subprocess.call(['docker', 'inspect', image],
110                           stdin=subprocess.PIPE,
111                           stdout=_DEVNULL,
112                           stderr=subprocess.STDOUT) == 0
113
114
115def remove_image(image, skip_nonexistent=False, max_retries=10):
116    """Attempts to remove docker image with retries."""
117    if skip_nonexistent and not image_exists(image):
118        return True
119    for attempt in range(0, max_retries):
120        if subprocess.call(['docker', 'rmi', '-f', image],
121                           stdin=subprocess.PIPE,
122                           stdout=_DEVNULL,
123                           stderr=subprocess.STDOUT) == 0:
124            return True
125        time.sleep(2)
126    print('Failed to remove docker image %s' % image)
127    return False
128
129
130class DockerJob:
131    """Encapsulates a job"""
132
133    def __init__(self, spec):
134        self._spec = spec
135        self._job = jobset.Job(spec,
136                               newline_on_success=True,
137                               travis=True,
138                               add_env={})
139        self._container_name = spec.container_name
140
141    def mapped_port(self, port):
142        return docker_mapped_port(self._container_name, port)
143
144    def ip_address(self):
145        return docker_ip_address(self._container_name)
146
147    def wait_for_healthy(self, timeout_seconds):
148        wait_for_healthy(self._container_name, self._spec.shortname,
149                         timeout_seconds)
150
151    def kill(self, suppress_failure=False):
152        """Sends kill signal to the container."""
153        if suppress_failure:
154            self._job.suppress_failure_message()
155        return docker_kill(self._container_name)
156
157    def is_running(self):
158        """Polls a job and returns True if given job is still running."""
159        return self._job.state() == jobset._RUNNING
160