#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
'''Runs the given job in an isolated docker container.

Also streams stdout/err onto the firebase realtime DB.
'''

import fcntl
import logging
import os
import queue
import signal
import socket
import subprocess
import sys
import threading
import time

from datetime import datetime, timedelta
from oauth2client.client import GoogleCredentials
from config import DB, SANDBOX_IMG
from common_utils import init_logging, req, ConcurrentModificationError, SCOPES

CUR_DIR = os.path.dirname(__file__)
SCOPES.append('https://www.googleapis.com/auth/firebase.database')
SCOPES.append('https://www.googleapis.com/auth/userinfo.email')


def read_nonblock(fd):
  fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
  res = ''
  while True:
    try:
      buf = os.read(fd.fileno(), 8192)
      if not buf:
        break
      # There are two reasons for the errors='ignore' here:
      # 1: By reading the pipe in chunks of N bytes, we can end up truncating
      #    a valid multi-byte character and cause an "unexpected end of data".
      #    This means that we will skip valid unicode chars if they happen to
      #    span across two read() chunks.
      # 2: The job output might just emit some invalid unicode in stdout. We
      #    don't want to crash when that happens.
      # See b/194053229 for more context.
      res += buf.decode('utf-8', errors='ignore')
    except OSError:
      break
  return res


def log_thread(job_id, q):
  '''Uploads stdout/stderr batches from the queue to the firebase DB.

  Each line is logged as an individual entry in the DB, as follows:
  MMMMMM-NNNN log line, where M: hex-encoded timestamp, N: monotonic counter.
  '''
  uri = '%s/logs/%s.json' % (DB, job_id)
  req('DELETE', uri)
  while True:
    batch = q.get()
    if batch is None:
      break  # EOF
    req('PATCH', uri, body=batch)
  logging.debug('Uploader thread terminated')
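
# A worked example of the key scheme above (hypothetical log lines): three
# lines read in the same polling iteration ~2.5s (0x9c4 ms) after the job
# started are uploaded as a single PATCH batch of the form
#   {'0009c4-0000': 'first line',
#    '0009c4-0001': 'second line',
#    '0009c4-0002': 'third line'}
# so the lexicographic order of the keys matches the original output order.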


def main(argv):
  init_logging()
  if len(argv) != 2:
    print('Usage: %s job_id' % argv[0])
    return 1

  job_id = argv[1]
  res = 42

  # The container name will be worker-N-sandbox.
  container = socket.gethostname() + '-sandbox'

  # Remove stale jobs, if any.
  subprocess.call(['sudo', 'docker', 'rm', '-f', container])

  q = queue.Queue()

  # Unlike in native programs, signal handlers in Python aren't really async
  # but are queued on the main thread. Hence we need to keep the main thread
  # responsive to react to signals. This is to handle timeouts and graceful
  # termination of the worker container, which dispatches a SIGTERM on stop.
  def sig_handler(sig, _):
    logging.warning('Job runner got signal %s, terminating job %s', sig, job_id)
    subprocess.call(['sudo', 'docker', 'kill', container])
    os._exit(1)  # sys.exit throws a SystemExit exception, _exit really exits.

  signal.signal(signal.SIGTERM, sig_handler)

  log_thd = threading.Thread(target=log_thread, args=(job_id, q))
  log_thd.start()

  # SYS_PTRACE is required for gtest death tests and LSan.
  cmd = [
      'sudo', 'docker', 'run', '--name', container, '--hostname', container,
      '--cap-add', 'SYS_PTRACE', '--rm', '--env',
      'PERFETTO_TEST_JOB=%s' % job_id, '--tmpfs', '/tmp:exec'
  ]

  # Propagate environment variables coming from the job config.
  for kv in [kv for kv in os.environ.items() if kv[0].startswith('PERFETTO_')]:
    cmd += ['--env', '%s=%s' % kv]

  # We use the tmpfs mount created by gce-startup-script.sh, if present. The
  # problem is that Docker doesn't allow both overriding the tmpfs-size and
  # preventing the "-o noexec". In turn, the default tmpfs-size depends on the
  # host's physical memory size.
  if os.getenv('SANDBOX_TMP'):
    cmd += ['-v', '%s:/ci/ramdisk' % os.getenv('SANDBOX_TMP')]
  else:
    cmd += ['--tmpfs', '/ci/ramdisk:exec']

  # Rationale for the conditional branches below: when running in the real GCE
  # environment, gce-startup-script.sh mounts these directories in the right
  # locations, so that they are shared between all workers.
  # When running the worker container outside of GCE (i.e. for local testing)
  # we leave these empty. The VOLUME directive in the Dockerfile will cause
  # docker to automatically mount a scratch volume for those.
  # This is so that the CI containers can be tested without having to do the
  # work that gce-startup-script.sh does.
  if os.getenv('SHARED_WORKER_CACHE'):
    cmd += ['--volume=%s:/ci/cache' % os.getenv('SHARED_WORKER_CACHE')]

  artifacts_dir = None
  if os.getenv('ARTIFACTS_DIR'):
    artifacts_dir = os.path.join(os.getenv('ARTIFACTS_DIR'), job_id)
    subprocess.call(['sudo', 'rm', '-rf', artifacts_dir])
    os.mkdir(artifacts_dir)
    cmd += ['--volume=%s:/ci/artifacts' % artifacts_dir]

  cmd += os.getenv('SANDBOX_NETWORK_ARGS', '').split()
  cmd += [SANDBOX_IMG]
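
  # For illustration only (hypothetical hostname and job id; SANDBOX_IMG is
  # imported from config), the assembled command line looks roughly like:
  #   sudo docker run --name worker-1-sandbox --hostname worker-1-sandbox \
  #     --cap-add SYS_PTRACE --rm --env PERFETTO_TEST_JOB=<job_id> \
  #     --tmpfs /tmp:exec --tmpfs /ci/ramdisk:exec <SANDBOX_IMG>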

  logging.info('Starting %s', ' '.join(cmd))
  proc = subprocess.Popen(
      cmd,
      stdin=open(os.devnull),
      stdout=subprocess.PIPE,
      stderr=subprocess.STDOUT,
      bufsize=65536)
  stdout = ''
  tstart = time.time()
  while True:
    ms_elapsed = int((time.time() - tstart) * 1000)
    stdout += read_nonblock(proc.stdout)

    # stdout/err pipes are not atomic w.r.t. '\n'. Extract whole lines into
    # |lines| and keep the last partial line (-1) in the |stdout| buffer,
    # e.g. 'foo\nbar\nba' -> lines: ['foo', 'bar'], stdout: 'ba'.
    lines = stdout.split('\n')
    stdout = lines[-1]
    lines = lines[:-1]

    # Each line has a key of the form <time-from-start>-<counter>.
    # |counter| is relative to the batch and is only used to disambiguate lines
    # fetched at the same time, preserving the ordering.
    batch = {}
    for counter, line in enumerate(lines):
      batch['%06x-%04x' % (ms_elapsed, counter)] = line
    if batch:
      q.put(batch)
    if proc.poll() is not None:
      res = proc.returncode
      logging.info('Job subprocess terminated with code %s', res)
      break

    # Large sleeps favour batching in the log uploader.
    # Small sleeps favour responsiveness of the signal handler.
    time.sleep(1)

  q.put(None)  # EOF marker
  log_thd.join()

  if artifacts_dir:
    artifacts_uploader = os.path.join(CUR_DIR, 'artifacts_uploader.py')
    cmd = ['setsid', artifacts_uploader, '--job-id=%s' % job_id, '--rm']
    subprocess.call(cmd)

  return res


if __name__ == '__main__':
  sys.exit(main(sys.argv))
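
# Example invocation for local testing (hypothetical job id and script name;
# real job ids are assigned by the CI frontend):
#   ./run_job.py 20190712-1234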