1# Copyright 2017 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import errno 6import logging 7import os 8import os.path 9import pyudev 10import stat 11import subprocess 12import tempfile 13import threading 14from autotest_lib.client.bin import test, utils 15from autotest_lib.client.common_lib import error 16from autotest_lib.client.cros import device_jail_utils 17 18 19class security_DeviceJail_Filesystem(test.test): 20 """ 21 Ensures that if the device jail filesystem is present, it restricts 22 the devices that can be seen and instantiates jails. 23 """ 24 version = 1 25 26 SHUTDOWN_TIMEOUT_SEC = 5 27 28 def warmup(self): 29 super(security_DeviceJail_Filesystem, self).warmup() 30 if not os.path.exists(device_jail_utils.JAIL_CONTROL_PATH): 31 raise error.TestNAError('Device jail is not present') 32 33 self._mount = tempfile.mkdtemp(prefix='djfs_test_') 34 logging.debug('Attempting to mount device_jail_fs on %s', self._mount) 35 try: 36 self._subprocess = subprocess.Popen(['device_jail_fs', self._mount]) 37 except OSError as e: 38 if e.errno == errno.ENOENT: 39 raise error.TestNAError('Device jail FS is not present') 40 else: 41 raise error.TestError( 42 'Device jail FS failed to start: %s' % e.strerror) 43 44 45 def _is_jail_device(self, filename): 46 context = pyudev.Context() 47 device = pyudev.Device.from_device_file(context, filename) 48 return device.subsystem == 'device_jail' 49 50 51 def _check_device(self, filename): 52 logging.debug('Checking device %s', filename) 53 # Devices outside of the FS are fine. 54 if not filename.startswith(self._mount): 55 return 56 57 # Remove mount from full path. 58 relative_dev_path = filename[len(self._mount) + 1:] 59 60 # Check if this is a passthrough device. 61 if relative_dev_path in ['null', 'full', 'zero', 'urandom']: 62 return 63 64 # Ensure USB devices are jailed. 65 if relative_dev_path.startswith('bus/usb'): 66 if self._is_jail_device(filename): 67 return 68 else: 69 raise error.TestError('Device should be jailed: %s' % filename) 70 71 # All other devices should be hidden. 72 raise error.TestError('Device should be hidden: %s' % filename) 73 74 75 def run_once(self): 76 for dirpath, _, filenames in os.walk(self._mount): 77 for filename in filenames: 78 real_path = os.path.realpath(os.path.join(dirpath, filename)) 79 try: 80 mode = os.stat(real_path).st_mode 81 except OSError as e: 82 continue 83 84 if stat.S_ISCHR(mode): 85 self._check_device(real_path) 86 87 88 def _clean_shutdown(self): 89 subprocess.check_call(['fusermount', '-u', self._mount]) 90 self._subprocess.wait() 91 92 93 def _hard_shutdown(self): 94 logging.warn('Timeout expired, killing device_jail_fs') 95 self._subprocess.kill() 96 self._subprocess.wait() 97 98 99 def cleanup(self): 100 super(security_DeviceJail_Filesystem, self).cleanup() 101 if hasattr(self, '_subprocess'): 102 logging.info('Waiting %d seconds for device_jail_fs shutdown', 103 self.SHUTDOWN_TIMEOUT_SEC) 104 timeout = threading.Timer(self.SHUTDOWN_TIMEOUT_SEC, 105 self._hard_shutdown) 106 timeout.start() 107 self._clean_shutdown() 108 timeout.cancel() 109 110 try: 111 os.rmdir(self._mount) 112 except OSError as e: 113 raise error.TestError('Failed to remove temp dir: %s' % e.strerror) 114