# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import re

from collections import namedtuple

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

# One shared memory segment: owning user, octal permission string, and a
# sorted tuple of executable paths of the processes attached to it.
ShmRecord = namedtuple('ShmRecord', ['owner', 'perms', 'attached'])
# One semaphore array: owning user and octal permission string.
SemaphoreRecord = namedtuple('SemaphoreRecord', ['owner', 'perms'])


class security_SysVIPC(test.test):
    """Detect emergence of new attack surfaces in SysV IPC.

    The test compares the shared memory segments, semaphore arrays and
    message queues reported by 'ipcs' against the baselines below. Anything
    observed that is not in a baseline fails the test; baseline entries
    that are not observed are only logged.
    """
    version = 1
    expected_shm = set([ShmRecord(owner='cras', perms='640',
                                  attached=('/usr/bin/cras',))])
    expected_sem = set([SemaphoreRecord(owner='root', perms='600')])

    def dump_ipcs_to_results(self):
        """Writes a copy of the 'ipcs' output to the autotest results dir."""
        utils.system_output('ipcs > "%s/ipcs-output.txt"' % self.resultsdir)


    def find_attached(self, shmid):
        """Find programs attached to a given shared memory segment.

        Returns full paths to each program identified, as a sorted tuple
        without duplicates.

        Args:
          @param shmid: the id as shown in ipcs and related utilities.
        """
        # This finds /proc/*/exe entries where maps shows they have
        # attached to the specified shm segment. The leading space anchors
        # the id to the start of the inode column, so that e.g. shmid 5
        # does not also match a segment whose id merely ends in 5.
        cmd = 'grep " %s */SYSV" /proc/*/maps | sed "s/maps.*/exe/g"' % shmid
        # Then we just need to readlink each of the links. Even though
        # we ultimately convert to a sorted tuple, we use a set to avoid
        # accumulating duplicates as we go along.
        exes = set()
        for link in utils.system_output(cmd).splitlines():
            try:
                exes.add(os.readlink(link))
            except OSError as e:
                # The process may have exited between the grep and the
                # readlink. Skip it rather than abort the whole test; a
                # record with fewer attached programs still will not match
                # the baseline, so nothing is silently accepted.
                logging.warning('Could not resolve %s: %s', link, e)
        return tuple(sorted(exes))


    def _ipcs_rows(self, ipcs_flag):
        """Return the whitespace-split data rows of 'ipcs <ipcs_flag>'.

        Only rows starting with '0' (the hexadecimal key column) are kept,
        which drops the headers and blank lines that ipcs prints.

        Args:
          @param ipcs_flag: the resource selector passed to ipcs, e.g. '-m'.
        """
        cmd = 'ipcs %s | grep ^0' % ipcs_flag
        # grep exits non-zero when there are no rows, which is a perfectly
        # valid state, hence ignore_status.
        output = utils.system_output(cmd, ignore_status=True)
        return [re.split(r'\s+', line) for line in output.splitlines()]


    def _has_unexpected(self, label, expected, observed):
        """Log differences between two record sets.

        Returns True if observed contains records that are not expected.
        Records that are expected but not observed are logged only.

        Args:
          @param label: name used in the log messages, e.g. 'shm'.
          @param expected: set of baseline records.
          @param observed: set of records seen on the system.
        """
        missing = expected.difference(observed)
        extra = observed.difference(expected)
        if missing:
            logging.error('Expected %s(s) not found:' % label)
            logging.error(missing)
        if extra:
            logging.error('Unexpected %s(s) found:' % label)
            logging.error(extra)
        return bool(extra)


    def observe_shm(self):
        """Return a set of ShmRecords representing current system shm usage."""
        seen = set()
        for fields in self._ipcs_rows('-m'):
            # Columns used here: fields[1] shmid, fields[2] owner,
            # fields[3] perms.
            attached = self.find_attached(fields[1])
            seen.add(ShmRecord(owner=fields[2], perms=fields[3],
                               attached=attached))
        return seen


    def observe_sems(self):
        """Return a set of SemaphoreRecords representing current usage."""
        seen = set()
        for fields in self._ipcs_rows('-s'):
            seen.add(SemaphoreRecord(owner=fields[2], perms=fields[3]))
        return seen


    def run_once(self):
        """Main entry point to run the security_SysVIPC autotest.

        Raises error.TestFail if any unexpected shared memory segment,
        semaphore or message queue is found.
        """
        test_fail = False
        self.dump_ipcs_to_results()

        # Check Shared Memory.
        if self._has_unexpected('shm', self.expected_shm,
                                self.observe_shm()):
            test_fail = True

        # Check Semaphores.
        if self._has_unexpected('semaphore', self.expected_sem,
                                self.observe_sems()):
            test_fail = True

        # Also check Message Queues. Since we currently expect
        # none, we can avoid over-engineering this check.
        queues = utils.system_output('ipcs -q | grep ^0', ignore_status=True)
        if queues:
            test_fail = True
            logging.error('Unexpected message queues found:')
            logging.error(queues)

        if test_fail:
            raise error.TestFail('SysV IPCs did not match expectations')