• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import dbus, grp, os, pwd, stat
6from dbus.mainloop.glib import DBusGMainLoop
7
8from autotest_lib.client.bin import test
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.common_lib.cros import policy, session_manager
11from autotest_lib.client.cros import cros_ui, cryptohome, ownership
12
13
class login_UserPolicyKeys(test.test):
    """Verifies that, after user policy is pushed, the user policy key winds
       up stored in the right place.

    The test mounts the test user's vault, starts a session, stores a signed
    policy blob through session_manager and then checks that the user policy
    key file exists and is reachable by the 'chronos' user. It also checks
    that the key file is gone after a ui restart and is back once a new
    session has been started.
    """
    version = 1

    def _permission_granted(self, uid, gid, info, user_bit, group_bit,
                            other_bit):
        """Checks one permission bit, picking the class that applies.

        The owner bit is consulted when |uid| owns the file, otherwise the
        group bit when |gid| matches the file's group, otherwise the "other"
        bit. Only the first matching class is consulted.

        @param uid: numeric user id to check.
        @param gid: numeric group id to check.
        @param info: stat result exposing st_uid, st_gid and st_mode.
        @param user_bit: stat.S_I?USR constant for the wanted permission.
        @param group_bit: stat.S_I?GRP constant for the wanted permission.
        @param other_bit: stat.S_I?OTH constant for the wanted permission.

        @return True if the selected permission bit is set in info.st_mode.
        """
        if uid == info.st_uid:
            wanted = user_bit
        elif gid == info.st_gid:
            wanted = group_bit
        else:
            wanted = other_bit
        return bool(info.st_mode & wanted)


    def _can_read(self, uid, gid, info):
        """Returns True if uid or gid can read a file with the info stat.

        @param uid: numeric user id to check.
        @param gid: numeric group id to check.
        @param info: stat result of the file.

        @return True if the applicable read bit is set, False otherwise.
        """
        return self._permission_granted(uid, gid, info, stat.S_IRUSR,
                                        stat.S_IRGRP, stat.S_IROTH)


    def _can_execute(self, uid, gid, info):
        """Returns True if uid or gid can execute a file with the info stat.

        @param uid: numeric user id to check.
        @param gid: numeric group id to check.
        @param info: stat result of the file or directory.

        @return True if the applicable execute bit is set, False otherwise.
        """
        return self._permission_granted(uid, gid, info, stat.S_IXUSR,
                                        stat.S_IXGRP, stat.S_IXOTH)


    def _verify_key_file(self, key_file):
        """Verifies that the key file has been created and is readable.

        The file must be a regular file readable by 'chronos', and every
        directory above it, up to and including the filesystem root, must be
        executable by 'chronos'.

        @param key_file: path of the user policy key file.

        @raises error.TestFail: if the file is missing, is not a regular
                file, is not readable by chronos, or one of its parent
                directories is not executable by chronos.
        """
        if not os.path.isfile(key_file):
            raise error.TestFail('%s does not exist!' % key_file)
        # And is readable by chronos.
        chronos_uid = pwd.getpwnam('chronos').pw_uid
        chronos_gid = grp.getgrnam('chronos').gr_gid
        info = os.stat(key_file)
        if not stat.S_ISREG(info.st_mode):
            raise error.TestFail('%s is not a regular file' % key_file)
        if not self._can_read(chronos_uid, chronos_gid, info):
            raise error.TestFail('chronos can\'t read %s, mode is %s' %
                                 (key_file, oct(info.st_mode)))
        # All the parent directories must be executable by chronos.
        # os.path.dirname() returns its argument unchanged once the root is
        # reached, which is what terminates the walk.
        current = key_file
        parent = os.path.dirname(current)
        while current != parent:
            current = parent
            parent = os.path.dirname(parent)
            info = os.stat(current)
            if not self._can_execute(chronos_uid, chronos_gid, info):
                raise error.TestFail('chronos can\'t execute %s, mode is %s' %
                                     (current, oct(info.st_mode)))


    def _start_session(self):
        """Mounts the test user's vault and starts a session for that user.

        @return the session_manager connection the session was started on.
        """
        self._cryptohome_proxy.mount(ownership.TESTUSER,
                                     ownership.TESTPASS,
                                     create=True)
        sm = session_manager.connect(self._bus_loop)
        sm.StartSession(ownership.TESTUSER, '')
        return sm


    def initialize(self):
        """Installs the policy protobufs and resets the test user's state.

        Sets up the D-Bus main loop and the cryptohome proxy used by the
        rest of the test.
        """
        super(login_UserPolicyKeys, self).initialize()
        policy.install_protobufs(self.autodir, self.job)
        self._bus_loop = DBusGMainLoop(set_as_default=True)
        self._cryptohome_proxy = cryptohome.CryptohomeProxy(
            self._bus_loop, self.autodir, self.job)

        # Clear the user's vault, to make sure the test starts without any
        # policy or key lingering around. At this stage the session isn't
        # started and there's no user signed in.
        ownership.restart_ui_to_clear_ownership_files()
        self._cryptohome_proxy.remove(ownership.TESTUSER)


    def run_once(self):
        """Stores user policy and checks the lifecycle of the policy key.

        @raises error.TestError: if session_manager already returns user
                policy before any has been stored.
        @raises error.TestFail: if the key file exists when it should not,
                if storing the policy fails, or if the key file fails
                verification.
        """
        # Mount the vault, connect to session_manager and start the session.
        sm = self._start_session()

        # No policy stored yet.
        retrieved_policy = sm.RetrievePolicyEx(
                session_manager.make_user_policy_descriptor(ownership.TESTUSER),
                byte_arrays=True)
        if retrieved_policy:
            raise error.TestError('session_manager already has user policy!')

        # And no user key exists.
        key_file = ownership.get_user_policy_key_filename(ownership.TESTUSER)
        if os.path.exists(key_file):
            raise error.TestFail('%s exists before storing user policy!' %
                                 key_file)

        # Now store a policy. This is building a device policy protobuf, but
        # that's fine as far as the session_manager is concerned; it's the
        # outer PolicyFetchResponse that contains the public_key.
        public_key = ownership.known_pubkey()
        private_key = ownership.known_privkey()
        policy_data = policy.build_policy_data()
        policy_response = policy.generate_policy(private_key,
                                                 public_key,
                                                 policy_data)
        try:
            sm.StorePolicyEx(
                session_manager.make_user_policy_descriptor(ownership.TESTUSER),
                dbus.ByteArray(policy_response))
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Failed to store user policy', e)

        # The policy key should have been created now.
        self._verify_key_file(key_file)

        # Restart the ui; the key should be deleted.
        self._cryptohome_proxy.unmount(ownership.TESTUSER)
        cros_ui.restart()
        if os.path.exists(key_file):
            raise error.TestFail('%s exists after restarting ui!' %
                                 key_file)

        # Starting a new session will restore the key that was previously
        # stored. Reconnect to the session_manager, since the restart killed it.
        self._start_session()
        self._verify_key_file(key_file)


    def cleanup(self):
        """Restarts the ui and removes the test user's vault."""
        cros_ui.restart()
        self._cryptohome_proxy.remove(ownership.TESTUSER)
        super(login_UserPolicyKeys, self).cleanup()
139