• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Helpers to run docker instances as jobs."""
15
16from __future__ import print_function
17
18import json
19import os
20import subprocess
21import sys
22import tempfile
23import time
24import uuid
25
26sys.path.append(os.path.dirname(os.path.abspath(__file__)))
27import jobset
28
29_DEVNULL = open(os.devnull, "w")
30
31
32def random_name(base_name):
33    """Randomizes given base name."""
34    return "%s_%s" % (base_name, uuid.uuid4())
35
36
37def docker_kill(cid):
38    """Kills a docker container. Returns True if successful."""
39    return (
40        subprocess.call(
41            ["docker", "kill", str(cid)],
42            stdin=subprocess.PIPE,
43            stdout=_DEVNULL,
44            stderr=subprocess.STDOUT,
45        )
46        == 0
47    )
48
49
50def docker_mapped_port(cid, port, timeout_seconds=15):
51    """Get port mapped to internal given internal port for given container."""
52    started = time.time()
53    while time.time() - started < timeout_seconds:
54        try:
55            output = subprocess.check_output(
56                "docker port %s %s" % (cid, port), stderr=_DEVNULL, shell=True
57            ).decode()
58            return int(output.split(":", 2)[1])
59        except subprocess.CalledProcessError as e:
60            pass
61    raise Exception(
62        "Failed to get exposed port %s for container %s." % (port, cid)
63    )
64
65
66def docker_ip_address(cid, timeout_seconds=15):
67    """Get port mapped to internal given internal port for given container."""
68    started = time.time()
69    while time.time() - started < timeout_seconds:
70        cmd = "docker inspect %s" % cid
71        try:
72            output = subprocess.check_output(
73                cmd, stderr=_DEVNULL, shell=True
74            ).decode()
75            json_info = json.loads(output)
76            assert len(json_info) == 1
77            out = json_info[0]["NetworkSettings"]["IPAddress"]
78            if not out:
79                continue
80            return out
81        except subprocess.CalledProcessError as e:
82            pass
83    raise Exception(
84        "Non-retryable error: Failed to get ip address of container %s." % cid
85    )
86
87
88def wait_for_healthy(cid, shortname, timeout_seconds):
89    """Wait timeout_seconds for the container to become healthy"""
90    started = time.time()
91    while time.time() - started < timeout_seconds:
92        try:
93            output = subprocess.check_output(
94                [
95                    "docker",
96                    "inspect",
97                    '--format="{{.State.Health.Status}}"',
98                    cid,
99                ],
100                stderr=_DEVNULL,
101            ).decode()
102            if output.strip("\n") == "healthy":
103                return
104        except subprocess.CalledProcessError as e:
105            pass
106        time.sleep(1)
107    raise Exception(
108        "Timed out waiting for %s (%s) to pass health check" % (shortname, cid)
109    )
110
111
112def finish_jobs(jobs, suppress_failure=True):
113    """Kills given docker containers and waits for corresponding jobs to finish"""
114    for job in jobs:
115        job.kill(suppress_failure=suppress_failure)
116
117    while any(job.is_running() for job in jobs):
118        time.sleep(1)
119
120
121def image_exists(image):
122    """Returns True if given docker image exists."""
123    return (
124        subprocess.call(
125            ["docker", "inspect", image],
126            stdin=subprocess.PIPE,
127            stdout=_DEVNULL,
128            stderr=subprocess.STDOUT,
129        )
130        == 0
131    )
132
133
134def remove_image(image, skip_nonexistent=False, max_retries=10):
135    """Attempts to remove docker image with retries."""
136    if skip_nonexistent and not image_exists(image):
137        return True
138    for attempt in range(0, max_retries):
139        if (
140            subprocess.call(
141                ["docker", "rmi", "-f", image],
142                stdin=subprocess.PIPE,
143                stdout=_DEVNULL,
144                stderr=subprocess.STDOUT,
145            )
146            == 0
147        ):
148            return True
149        time.sleep(2)
150    print("Failed to remove docker image %s" % image)
151    return False
152
153
154class DockerJob:
155    """Encapsulates a job"""
156
157    def __init__(self, spec):
158        self._spec = spec
159        self._job = jobset.Job(
160            spec, newline_on_success=True, travis=True, add_env={}
161        )
162        self._container_name = spec.container_name
163
164    def mapped_port(self, port):
165        return docker_mapped_port(self._container_name, port)
166
167    def ip_address(self):
168        return docker_ip_address(self._container_name)
169
170    def wait_for_healthy(self, timeout_seconds):
171        wait_for_healthy(
172            self._container_name, self._spec.shortname, timeout_seconds
173        )
174
175    def kill(self, suppress_failure=False):
176        """Sends kill signal to the container."""
177        if suppress_failure:
178            self._job.suppress_failure_message()
179        return docker_kill(self._container_name)
180
181    def is_running(self):
182        """Polls a job and returns True if given job is still running."""
183        return self._job.state() == jobset._RUNNING
184