# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import pyudev
import re
import select
import struct
import subprocess
import threading
import time
from autotest_lib.client.common_lib import error


JAIL_CONTROL_PATH = '/dev/jail-control'
JAIL_REQUEST_PATH = '/dev/jail-request'

# From linux/device_jail.h.
REQUEST_ALLOW = 0
REQUEST_ALLOW_WITH_LOCKDOWN = 1
REQUEST_ALLOW_WITH_DETACH = 2
REQUEST_DENY = 3


def _to_str(data):
    """
    Returns data as a native str.

    subprocess and os.read() hand back bytes on Python 3; decoding here
    keeps the regex matching and path comparison below working there.
    On Python 2 the data is already a str and is returned untouched.
    """
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


class OSFile(object):
    """Simple context manager for file descriptors."""
    def __init__(self, path, flag):
        """Opens path with os.open() using the given os.O_* flag(s)."""
        self._fd = os.open(path, flag)

    def close(self):
        """Closes the underlying file descriptor."""
        os.close(self._fd)

    def __enter__(self):
        """Returns the fd so it can be used in with-blocks."""
        return self._fd

    def __exit__(self, exc_type, exc_val, traceback):
        self.close()


class ConcurrentFunc(object):
    """Simple context manager that starts and joins a thread."""
    def __init__(self, target_func, timeout_func):
        """
        @param target_func: callable to run in a separate thread.
        @param timeout_func: callable returning the number of seconds to
                wait for the thread when the with-block exits.
        """
        self._thread = threading.Thread(target=target_func)
        self._timeout_func = timeout_func
        self._target_name = target_func.__name__

    def __enter__(self):
        self._thread.start()

    def __exit__(self, exc_type, exc_val, traceback):
        self._thread.join(self._timeout_func())
        # Only report the timeout if the with-block body did not already
        # fail; otherwise the original exception keeps propagating.
        if self._thread.is_alive() and not exc_val:
            raise error.TestError('Function %s timed out' % self._target_name)


class JailDevice(object):
    """
    Context manager around a jail device created by device_jail_utility.

    Entering creates (or finds) the jail for the given device path; exiting
    removes the jail again, but only if this object created it.
    """
    TIMEOUT_SEC = 3
    PATH_MAX = 4096

    def __init__(self, path_to_jail):
        """
        @param path_to_jail: path of the device a jail is created for.
        """
        self._path_to_jail = path_to_jail
        # Set by __enter__. Initialized here so that __exit__ is safe to
        # call even if __enter__ never completed.
        self._path = None
        self._owns_device = False


    def __enter__(self):
        """
        Creates a jail device for the device located at self._path_to_jail.
        If the jail already exists, don't take ownership of it.

        @returns self, with self._path set to the jail device path.
        @raises error.TestError if device_jail_utility exits with an error
                or its output is not recognized.
        """
        try:
            output = subprocess.check_output(
                    ['device_jail_utility',
                     '--add={0}'.format(self._path_to_jail)],
                    stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            # Keep the utility's own output in the log; the raised message
            # alone does not say why the call failed.
            logging.error('device_jail_utility exited with status %s: %s',
                          e.returncode, _to_str(e.output or ''))
            raise error.TestError('Failed to call device_jail_utility')

        output = _to_str(output)

        match = re.search('created jail at (.*)', output)
        if match:
            self._path = match.group(1)
            self._owns_device = True
            return self

        match = re.search('jail already exists at (.*)', output)
        if match:
            self._path = match.group(1)
            self._owns_device = False
            return self

        raise error.TestError('Failed to create device jail')


    def expect_open(self, verdict):
        """
        Tries to open the jail device. This method mocks out the
        device_jail request server which is normally run by permission_broker.
        This allows us to set the verdict we want to test. Since the open
        call will block until we return the verdict, we have to use a
        separate thread to perform the open call, as well.

        @param verdict: one of the REQUEST_* values to answer the request
                with.
        @returns an OSFile for the opened jail device, or None if the open
                call failed.
        @raises error.TestError if the request device cannot be opened, no
                request arrives in time, the request is for the wrong path,
                or the open call does not finish in time.
        """
        # Python 2 does not support "nonlocal" so this closure can't
        # set the values of identifiers it closes over unless they
        # are in global scope. Work around this by using a list and
        # value-mutation.
        dev_file_wrapper = [None]
        def open_device():
            try:
                dev_file_wrapper[0] = OSFile(self._path, os.O_RDWR)
            except OSError as e:
                # We don't throw an error because this might be intentional,
                # such as when the verdict is REQUEST_DENY.
                logging.info("Failed to open jail device: %s", e.strerror)

        # timeout_sec should be used for the timeouts below.
        # This ensures we don't spend much longer than TIMEOUT_SEC in
        # this method.
        deadline = time.time() + self.TIMEOUT_SEC
        def timeout_sec():
            return max(deadline - time.time(), 0.01)

        # We have to use FDs because polling works with FDs and
        # buffering is silly.
        try:
            req_f = OSFile(JAIL_REQUEST_PATH, os.O_RDWR)
        except OSError as e:
            raise error.TestError(
                    'Failed to open request device: %s' % e.strerror)

        with req_f as req_fd:
            poll_obj = select.poll()
            poll_obj.register(req_fd, select.POLLIN)

            # Starting open_device should ensure we have a request waiting
            # on the request device.
            with ConcurrentFunc(open_device, timeout_sec):
                # poll() takes its timeout in milliseconds.
                ready_fds = poll_obj.poll(timeout_sec() * 1000)
                if not ready_fds:
                    raise error.TestError('Timed out waiting for jail-request')

                # Sanity check the request.
                path = _to_str(os.read(req_fd, self.PATH_MAX))
                logging.info('Received jail-request for path %s', path)
                if path != self._path_to_jail:
                    raise error.TestError('Got request for the wrong path')

                os.write(req_fd, struct.pack('I', verdict))
                logging.info('Responded to jail-request')

        return dev_file_wrapper[0]


    def __exit__(self, exc_type, exc_val, traceback):
        """Removes the jail device, but only if __enter__ created it."""
        if self._owns_device:
            subprocess.call(['device_jail_utility',
                             '--remove={0}'.format(self._path)])


def get_usb_devices():
    """
    Returns the pyudev devices whose device node is under /dev/bus/usb.
    """
    context = pyudev.Context()
    return [device for device in context.list_devices()
            if device.device_node
            and device.device_node.startswith('/dev/bus/usb')]