# Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """A test verifying Address Space Layout Randomization Uses system calls to get important pids and then gets information about the pids in /proc//maps. Restarts the tested processes and reads information about them again. If ASLR is enabled, memory mappings should change. """ from autotest_lib.client.bin import test from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error from autotest_lib.client.cros import upstart import logging import time import pprint import re def _pidof(exe_name): """Returns the PID of the first process with the given name.""" pid = utils.system_output('pidof %s' % exe_name, ignore_status=True).strip() if len(pid.split()) > 1: pid = pid.split()[0] return pid class Process(object): """Holds information about a process. Stores basic information about a process. This class is a base for UpstartProcess and SystemdProcess declared below. Attributes: _name: String name of process. _service_name: Name of the service corresponding to the process. _parent: String name of process's parent. Defaults to None. """ _START_POLL_INTERVAL_SECONDS = 1 _START_TIMEOUT = 30 def __init__(self, name, service_name, parent=None): self._name = name self._service_name = service_name self._parent = parent def get_name(self): return self._name def get_pid(self): """Gets pid of process, waiting for it if not found. Raises: error.TestFail: corresponding process is not found. """ retries = 0 ps_results = "" while retries < self._START_TIMEOUT: if self._parent is None: ps_results = _pidof(self._name) else: ppid = _pidof(self._parent) get_pid_command = ('ps -C %s -o pid,ppid | grep " %s$"' ' | awk \'{print $1}\'') % (self._name, ppid) ps_results = utils.system_output(get_pid_command).strip() if ps_results != "": return ps_results # The process could not be found. We then sleep, hoping the # process is just slow to initially start. time.sleep(self._START_POLL_INTERVAL_SECONDS) retries += 1 # We never saw the process, so abort with details on who was missing. raise error.TestFail('Never saw a pid for "%s"' % (self._name)) class UpstartProcess(Process): """Represents an Upstart service.""" def exists(self): """Checks if the service is present in Upstart configuration.""" return upstart.has_service(self._service_name) def restart(self): """Restarts the process via initctl.""" utils.system('initctl restart %s' % self._service_name) class SystemdProcess(Process): """Represents an systemd service.""" def exists(self): """Checks if the service is present in systemd configuration.""" cmd = 'systemctl show -p LoadState %s.service' % self._service_name output = utils.system_output(cmd, ignore_status=True).strip() return output == 'LoadState=loaded' def restart(self): """Restarts the process via systemctl.""" utils.system('systemctl restart %s' % self._service_name) class Mapping(object): """Holds information about a process's address mapping. Stores information about one memory mapping for a process. Attributes: _name: String name of process/memory occupying the location. _start: String containing memory address range start. """ def __init__(self, name, start): self._start = start self._name = name def set_start(self, new_value): self._start = new_value def get_start(self): return self._start def __repr__(self): return "" % (self._name, self._start) class security_ASLR(test.test): """Runs ASLR tests See top document comments for more information. Attributes: version: Current version of the test. """ version = 1 _TEST_ITERATION_COUNT = 5 _ASAN_SYMBOL = "__asan_init" # 'update_engine' should at least be present on all boards. _PROCESS_LIST = [UpstartProcess('chrome', 'ui', parent='session_manager'), UpstartProcess('debugd', 'debugd'), UpstartProcess('update_engine', 'update-engine'), SystemdProcess('update_engine', 'update-engine')] def get_processes_to_test(self): """Gets processes to test for main function. Called by run_once to get processes for this program to test. Filters binaries that actually exist on the system. This has to be a method because it constructs process objects. Returns: A list of process objects to be tested (see below for definition of process class). """ return [p for p in self._PROCESS_LIST if p.exists()] def running_on_asan(self): """Returns whether we're running on ASan.""" # -q, --quiet * Only output 'bad' things # -F, --format * Use specified format for output # -g, --gmatch * Use regex rather than string compare (with -s) # -s, --symbol * Find a specified symbol scanelf_command = "scanelf -qF'%s#F'" scanelf_command += " -gs %s `which debugd`" % self._ASAN_SYMBOL symbol = utils.system_output(scanelf_command) logging.debug("running_on_asan(): symbol: '%s', _ASAN_SYMBOL: '%s'", symbol, self._ASAN_SYMBOL) return symbol != "" def test_randomization(self, process): """Tests ASLR of a single process. This is the main test function for the program. It creates data structures out of useful information from sampling /proc//maps after restarting the process and then compares address starting locations of all executable, stack, and heap memory from each iteration. @param process: a process object representing the process to be tested. Returns: A dict containing a Boolean for whether or not the test passed and a list of string messages about passing/failing cases. """ test_result = dict([('pass', True), ('results', []), ('cases', dict())]) name = process.get_name() mappings = list() pid = -1 for i in range(self._TEST_ITERATION_COUNT): new_pid = process.get_pid() if pid == new_pid: raise error.TestFail( 'Service "%s" retained PID %d after restart.' % (name, pid)) pid = new_pid mappings.append(self.map(pid)) process.restart() logging.debug('Complete mappings dump for process %s:\n%s', name, pprint.pformat(mappings, 4)) initial_map = mappings[0] for i, mapping in enumerate(mappings[1:]): logging.debug('Iteration %d', i) for key in mapping.iterkeys(): # Set default case result to fail, pass when an address change # occurs. if not test_result['cases'].has_key(key): test_result['cases'][key] = dict([('pass', False), ('number', 0), ('total', self._TEST_ITERATION_COUNT)]) was_same = (initial_map.has_key(key) and initial_map[key].get_start() == mapping[key].get_start()) if was_same: logging.debug("Bad: %s address didn't change", key) else: logging.debug('Good: %s address changed', key) test_result['cases'][key]['number'] += 1 test_result['cases'][key]['pass'] = True for case, result in test_result['cases'].iteritems(): if result['pass']: test_result['results'].append( '[PASS] Address for %s ' 'successfully changed' % case) else: test_result['results'].append('[FAIL] Address for %s had ' 'deterministic value: %s' % (case, mappings[0][case].get_start())) test_result['pass'] = test_result['pass'] and result['pass'] return test_result def map(self, pid): """Creates data structure from table in /proc//maps. Gets all data from /proc//maps, parses each entry, and saves entries corresponding to executable, stack, or heap memory into a dictionary. @param pid: a string containing the pid to be tested. Returns: A dict mapping names to mapping objects (see above for mapping definition). """ memory_map = dict() maps_file = open("/proc/%s/maps" % pid) for maps_line in maps_file: result = self.parse_result(maps_line) if result is None: continue name = result['name'] start = result['start'] perms = result['perms'] is_memory = name == '[heap]' or name == '[stack]' is_useful = re.search('x', perms) is not None or is_memory if not is_useful: continue if not name in memory_map: memory_map[name] = Mapping(name, start) elif memory_map[name].get_start() < start: memory_map[name].set_start(start) return memory_map def parse_result(self, result): """Builds dictionary from columns of a line of /proc//maps Uses regular expressions to determine column separations. Puts column data into a dict mapping column names to their string values. @param result: one line of /proc//maps as a string, for any . Returns: None if the regular expression wasn't matched. Otherwise: A dict of string column names mapped to their string values. For example: {'start': '9e981700000', 'end': '9e981800000', 'perms': 'rwxp', 'something': '00000000', 'major': '00', 'minor': '00', 'inode': '00'} """ # Build regex to parse one line of proc maps table. memory = r'(?P\w+)-(?P\w+)' perms = r'(?P(r|-)(w|-)(x|-)(s|p))' something = r'(?P\w+)' devices = r'(?P\w+):(?P\w+)' inode = r'(?P[0-9]+)' name = r'(?P([a-zA-Z0-9/]+|\[heap\]|\[stack\]))' regex = r'%s +%s +%s +%s +%s +%s' % (memory, perms, something, devices, inode, name) found_match = re.match(regex, result) if found_match is None: return None parsed_result = found_match.groupdict() return parsed_result def run_once(self): """Main function. Called when test is run. Gets processes to test and calls test on them. Raises: error.TestFail if any processes' memory mapping addresses are the same after restarting. """ if self.running_on_asan(): logging.warning("security_ASLR is not available on ASan.") return processes = self.get_processes_to_test() # If we don't find any of the processes we wanted to test, we fail. if len(processes) == 0: proc_names = ", ".join([p.get_name() for p in self._PROCESS_LIST]) raise error.TestFail( 'Could not find any of "%s" processes to test' % proc_names) aslr_enabled = True full_results = dict() for process in processes: test_results = self.test_randomization(process) full_results[process.get_name()] = test_results['results'] if not test_results['pass']: aslr_enabled = False logging.debug('SUMMARY:') for process_name, results in full_results.iteritems(): logging.debug('Results for %s:', process_name) for result in results: logging.debug(result) if not aslr_enabled: raise error.TestFail('One or more processes had deterministic ' 'memory mappings')