# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import gobject, os, sys
from dbus.mainloop.glib import DBusGMainLoop

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chrome, session_manager
from autotest_lib.client.cros import constants, ownership


class login_OwnershipTaken(test.test):
    """Sign in and ensure that ownership of the device is taken."""
    version = 1


    def setup(self):
        """Build the test's sources in place inside self.srcdir."""
        os.chdir(self.srcdir)
        # NOTE(review): presumably this generates the *_pb2 modules that
        # _validate_policy imports from srcdir -- confirm against the Makefile.
        utils.make('OUT_DIR=.')


    def initialize(self):
        """Restart the UI to clear ownership files and verify they are gone.

        @raises error.TestError: if the owner key file or the signed policy
                file still exists after the restart.
        """
        super(login_OwnershipTaken, self).initialize()
        ownership.restart_ui_to_clear_ownership_files()
        if (os.access(constants.OWNER_KEY_FILE, os.F_OK) or
            os.access(constants.SIGNED_POLICY_FILE, os.F_OK)):
            raise error.TestError('Ownership already taken!')


    def _validate_policy(self, retrieved_policy, username):
        """Parse the retrieved policy blob and check its contents.

        The blob is decoded as a PolicyFetchResponse, its policy_data as a
        PolicyData, and that message's policy_value as a
        ChromeDeviceSettingsProto. Each layer is checked with the matching
        ownership.assert_* helper.

        @param retrieved_policy: serialized PolicyFetchResponse, as returned
                by the session manager.
        @param username: name of the user that is expected to own the device.
        """
        # Pull in protobuf definitions. Only extend sys.path once so repeated
        # calls do not pile up duplicate entries.
        if self.srcdir not in sys.path:
            sys.path.append(self.srcdir)
        from chrome_device_policy_pb2 import ChromeDeviceSettingsProto
        from device_management_backend_pb2 import PolicyData
        from device_management_backend_pb2 import PolicyFetchResponse

        response_proto = PolicyFetchResponse()
        response_proto.ParseFromString(retrieved_policy)
        ownership.assert_has_policy_data(response_proto)

        poldata = PolicyData()
        poldata.ParseFromString(response_proto.policy_data)
        ownership.assert_has_device_settings(poldata)
        ownership.assert_username(poldata, username)

        polval = ChromeDeviceSettingsProto()
        polval.ParseFromString(poldata.policy_value)
        # NOTE(review): presumably asserts that new users are allowed and that
        # the owner is the only whitelisted user -- confirm in ownership.py.
        ownership.assert_new_users(polval, True)
        ownership.assert_users_on_whitelist(polval, (username,))


    def run_once(self):
        """Log in through Chrome and validate the resulting device policy.

        Starts listening for the new-key and policy signals before Chrome is
        launched, waits for them, then retrieves the policy from the session
        manager and validates it against the logged-in user.

        @raises error.TestFail: if no policy could be retrieved.
        """
        bus_loop = DBusGMainLoop(set_as_default=True)
        listener = session_manager.OwnershipSignalListener(gobject.MainLoop())
        # Register for the signals before starting Chrome so they cannot be
        # missed.
        listener.listen_for_new_key_and_policy()
        with chrome.Chrome() as cr:
            listener.wait_for_signals(desc='Owner settings written to disk.')

            sm = session_manager.connect(bus_loop)
            retrieved_policy = sm.RetrievePolicy(byte_arrays=True)
            # Treat an empty blob the same as a missing one: neither can hold
            # a valid signed policy.
            if not retrieved_policy:
                raise error.TestFail('Policy not found.')
            self._validate_policy(retrieved_policy, cr.username)