# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import logging
import os

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros import chrome
from autotest_lib.client.cros import cryptohome
from autotest_lib.client.cros import httpd
from autotest_lib.client.cros.enterprise import enterprise_fake_dmserver


# Extra Chrome command-line flags, grouped per DM server environment.
# FLAGS_DICT below maps an environment name to one of these lists.
CROSQA_FLAGS = [
    '--gaia-url=https://gaiastaging.corp.google.com',
    '--lso-url=https://gaiastaging.corp.google.com',
    '--google-apis-url=https://www-googleapis-test.sandbox.google.com',
    '--oauth2-client-id=236834563817.apps.googleusercontent.com',
    '--oauth2-client-secret=RsKv5AwFKSzNgE0yjnurkPVI',
    ('--cloud-print-url='
     'https://cloudprint-nightly-ps.sandbox.google.com/cloudprint'),
    '--ignore-urlfetcher-cert-requests']
CROSALPHA_FLAGS = [
    ('--cloud-print-url='
     'https://cloudprint-nightly-ps.sandbox.google.com/cloudprint'),
    '--ignore-urlfetcher-cert-requests']
TESTDMS_FLAGS = [
    '--ignore-urlfetcher-cert-requests',
    '--disable-policy-key-verification']

# Environment name -> extra Chrome flags. The keys of this dict are also
# the set of environment names accepted by the test.
FLAGS_DICT = {
    'prod': [],
    'crosman-qa': CROSQA_FLAGS,
    'crosman-alpha': CROSALPHA_FLAGS,
    'dm-test': TESTDMS_FLAGS,
    'dm-fake': TESTDMS_FLAGS
}

# Environment name -> DM server URL. The 'dm-test' entry carries a '%s'
# placeholder that is filled in with the test DM server name.
DMS_URL_DICT = {
    'prod': 'http://m.google.com/devicemanagement/data/api',
    'crosman-qa':
        'https://crosman-qa.sandbox.google.com/devicemanagement/data/api',
    'crosman-alpha':
        'https://crosman-alpha.sandbox.google.com/devicemanagement/data/api',
    'dm-test': 'http://chromium-dm-test.appspot.com/d/%s',
    'dm-fake': 'http://127.0.0.1:%d/'
}

# Template for the Chrome flag that selects the DM server URL.
DMSERVER = '--device-management-url=%s'

# Username and password for the fake dm server can be anything, since
# they are not used to authenticate against GAIA.
USERNAME = 'fake-user@managedchrome.com'
PASSWORD = 'fakepassword'


class EnterprisePolicyTest(test.test):
    """Base class for Enterprise Policy Tests."""

    WEB_PORT = 8080
    WEB_HOST = 'http://localhost:%d' % WEB_PORT
    CHROME_POLICY_PAGE = 'chrome://policy'

    def setup(self):
        """Change to the test source directory and run utils.make()."""
        os.chdir(self.srcdir)
        utils.make()

    def initialize(self, **kwargs):
        """Forward all keyword arguments to the policy test initializer."""
        self._initialize_enterprise_policy_test(**kwargs)

    def _initialize_enterprise_policy_test(
            self, case, env='dm-fake', dms_name=None,
            username=USERNAME, password=PASSWORD):
        """Initialize test parameters, fake DM Server, and Chrome flags.

        @param case: String name of the test case to run.
        @param env: String environment of DMS and Gaia servers.
        @param dms_name: String name of test DM Server.
        @param username: String user name login credential.
        @param password: String password login credential.

        @raises error.TestError if the parameters fail validation.
        """
        # Initialize resource handles before anything that can raise, so
        # that cleanup() works even after an early failure (for example a
        # validation error below).
        self._web_server = None
        self.cr = None
        self.fake_dm_server = None

        self.case = case
        self.env = env
        self.username = username
        self.password = password
        self.dms_name = dms_name
        self.dms_is_fake = (env == 'dm-fake')
        self._enforce_variable_restrictions()

        # Start AutoTest DM Server if using local fake server.
        if self.dms_is_fake:
            self.fake_dm_server = enterprise_fake_dmserver.FakeDMServer(
                self.srcdir)
            self.fake_dm_server.start(self.tmpdir, self.debugdir)

        # Get enterprise directory of shared resources.
        client_dir = os.path.dirname(os.path.dirname(self.bindir))
        self.enterprise_dir = os.path.join(client_dir, 'cros/enterprise')

        # Log the test context parameters.
        # NOTE(review): the password is logged in clear text. That is
        # harmless for the fake default credentials; confirm it is intended
        # when real credentials are used with a non-fake environment.
        logging.info('Test Context Parameters:')
        logging.info('  Case: %r', self.case)
        logging.info('  Environment: %r', self.env)
        logging.info('  Username: %r', self.username)
        logging.info('  Password: %r', self.password)
        logging.info('  Test DMS Name: %r', self.dms_name)

    def cleanup(self):
        """Stop the fake DM server and web server, and close Chrome.

        Each resource is released only if it was actually created. The
        attributes are read defensively so that cleanup does not raise
        when initialization failed part-way through or never ran.
        """
        # Clean up AutoTest DM Server if using local fake server.
        fake_dm_server = getattr(self, 'fake_dm_server', None)
        if fake_dm_server is not None:
            fake_dm_server.stop()

        # Stop web server if it was started.
        web_server = getattr(self, '_web_server', None)
        if web_server:
            web_server.stop()

        # Close Chrome instance if opened.
        browser = getattr(self, 'cr', None)
        if browser:
            browser.close()

    def start_webserver(self):
        """Set up HTTP Server to serve pages from enterprise directory."""
        self._web_server = httpd.HTTPListener(
            self.WEB_PORT, docroot=self.enterprise_dir)
        self._web_server.run()

    def _enforce_variable_restrictions(self):
        """Validate class-level test context parameters.

        @raises error.TestError if context parameter has an invalid value,
                or a combination of parameters have incompatible values.
        """
        # Verify |env| is a valid environment.
        if self.env not in FLAGS_DICT:
            raise error.TestError('Environment is invalid: %s' % self.env)

        # Verify test |dms_name| is given iff |env| is 'dm-test'.
        if self.env == 'dm-test' and not self.dms_name:
            raise error.TestError('dms_name must be given when using '
                                  'env=dm-test.')
        if self.env != 'dm-test' and self.dms_name:
            raise error.TestError('dms_name must not be given when not using '
                                  'env=dm-test.')

    def setup_case(self, policy_name, policy_value, mandatory_policies=None,
                   suggested_policies=None, policy_name_is_suggested=False,
                   skip_policy_value_verification=False):
        """Set up and confirm the preconditions of a test case.

        If the AutoTest fake DM Server is used, make a JSON policy blob
        and upload it to the fake DM server.

        Launch Chrome and sign in to Chrome OS. Examine the user's
        cryptohome vault, to confirm user is signed in successfully.

        Open the Policies page, and confirm that it shows the specified
        |policy_name| and has the correct |policy_value|.

        The dicts passed in by the caller are copied and never modified.

        @param policy_name: Name of the policy under test.
        @param policy_value: Expected value for the policy under test.
        @param mandatory_policies: optional dict of mandatory policies
                (not policy_name) in name -> value format.
        @param suggested_policies: optional dict of suggested policies
                (not policy_name) in name -> value format.
        @param policy_name_is_suggested: True if policy_name a suggested
                policy.
        @param skip_policy_value_verification: True if setup_case should not
                verify that the correct policy value shows on policy page.

        @raises error.TestError if cryptohome vault is not mounted for user,
                or if |policy_name| and |policy_value| are not shown on the
                Policies page.
        """
        # Work on private copies: a shared default dict, or the caller's
        # own dict, must not accumulate the policy under test across calls.
        mandatory_policies = dict(mandatory_policies or {})
        suggested_policies = dict(suggested_policies or {})

        logging.info('Setting up case: (%s: %s)', policy_name, policy_value)
        logging.info('Mandatory policies: %s', mandatory_policies)
        logging.info('Suggested policies: %s', suggested_policies)

        if self.dms_is_fake:
            if policy_name_is_suggested:
                suggested_policies[policy_name] = policy_value
            else:
                mandatory_policies[policy_name] = policy_value
            self.fake_dm_server.setup_policy(self._make_json_blob(
                mandatory_policies, suggested_policies))

        self._launch_chrome_browser()
        if not cryptohome.is_vault_mounted(user=self.username,
                                           allow_fail=True):
            raise error.TestError('Expected to find a mounted vault for %s.'
                                  % self.username)

        if not skip_policy_value_verification:
            self.verify_policy_value(policy_name, policy_value)

    def _make_json_blob(self, mandatory_policies, suggested_policies):
        """Create JSON policy blob from mandatory and suggested policies.

        For the status of a policy to be shown as "Not set" on the
        chrome://policy page, the policy dictionary must contain no NVP for
        that policy. Policy NVPs whose value is None are therefore left out.

        The input dicts are not modified.

        @param mandatory_policies: dict of mandatory policies -> values.
        @param suggested_policies: dict of suggested policies -> values.

        @returns: JSON policy blob to send to the fake DM server.
        """
        def _prepare(policies):
            """Return a copy without None values, with dicts json-ified."""
            prepared = {}
            for policy, value in (policies or {}).items():
                if value is None:
                    # "Not set" policy: must be absent from the blob.
                    continue
                # The FakeDMServer expects "policy": "{value}" not
                # "policy": {value}, so dicts and lists of dicts are
                # encoded as JSON strings.
                if isinstance(value, dict):
                    value = encode_json_string(value)
                elif (isinstance(value, list) and value
                      and isinstance(value[0], dict)):
                    value = encode_json_string(value)
                prepared[policy] = value
            return prepared

        mandatory = _prepare(mandatory_policies)
        suggested = _prepare(suggested_policies)

        modes_dict = {}
        if mandatory:
            modes_dict['mandatory'] = mandatory
        if suggested:
            modes_dict['suggested'] = suggested

        device_management_dict = {
            'google/chromeos/user': modes_dict,
            'managed_users': ['*'],
            'policy_user': self.username,
            'current_key_index': 0,
            'invalidation_source': 16,
            'invalidation_name': 'test_policy'
        }

        logging.info('Created policy blob: %s', device_management_dict)
        return encode_json_string(device_management_dict)

    def _get_policy_value_shown(self, policy_tab, policy_name):
        """Get the value shown for |policy_name| from the |policy_tab| page.

        Return the policy value for the policy given by |policy_name|, from
        the chrome://policy page given by |policy_tab|.

        CAVEAT: the policy page does not display proper JSON. For example,
        lists are generally shown without the [ ] and cannot be
        distinguished from strings. This function decodes what it can and
        returns the string it found when in doubt.

        @param policy_tab: Tab displaying the Policies page.
        @param policy_name: The name of the policy.

        @returns: The decoded value shown for the policy on the Policies
                page, with the aforementioned caveat. None if the status
                shown for the policy is 'Not set.'.

        @raises error.TestError if no row for |policy_name| is found.
        """
        # The script scans the first policy table, skipping the header row
        # and expanded-value rows, and yields [name, value, status] for the
        # matching row, or '' when no row matches.
        row_values = policy_tab.EvaluateJavaScript('''
            var section = document.getElementsByClassName(
                    "policy-table-section")[0];
            var table = section.getElementsByTagName('table')[0];
            rowValues = '';
            for (var i = 1, row; row = table.rows[i]; i++) {
               if (row.className !== 'expanded-value-container') {
                  var name_div = row.getElementsByClassName('name elide')[0];
                  var name = name_div.textContent;
                  if (name === '%s') {
                     var value_span = row.getElementsByClassName('value')[0];
                     var value = value_span.textContent;
                     var status_div = row.getElementsByClassName(
                            'status elide')[0];
                     var status = status_div.textContent;
                     rowValues = [name, value, status];
                     break;
                  }
               }
            }
            rowValues;
        ''' % policy_name)

        if not row_values:
            # Without this guard the indexing below fails with an opaque
            # IndexError on the empty string.
            raise error.TestError('Policy %s was not found on the Policies '
                                  'page.' % policy_name)

        # NOTE: the comparison below relies on encode() returning a str,
        # which holds on Python 2.
        value_shown = row_values[1].encode('ascii', 'ignore')
        status_shown = row_values[2].encode('ascii', 'ignore')
        logging.debug('Policy %s row: %s', policy_name, row_values)

        if status_shown == 'Not set.':
            return None
        return decode_json_string(value_shown)

    def _get_policy_value_from_new_tab(self, policy_name):
        """Get the policy value for |policy_name| from the Policies page.

        @param policy_name: string of policy name.

        @returns: decoded value of the policy as shown on chrome://policy.
        """
        values = self._get_policy_values_from_new_tab([policy_name])
        return values[policy_name]

    def _get_policy_values_from_new_tab(self, policy_names):
        """Get a given policy value by opening a new tab then closing it.

        @param policy_names: list of strings of policy names.

        @returns: dict of policy name mapped to decoded values of the policy
                as shown on chrome://policy.
        """
        values = {}
        tab = self.navigate_to_url(self.CHROME_POLICY_PAGE)
        for policy_name in policy_names:
            values[policy_name] = self._get_policy_value_shown(
                tab, policy_name)
        tab.Close()
        return values

    def verify_policy_value(self, policy_name, expected_value):
        """Verify that the correct policy values shows in chrome://policy.

        @param policy_name: the policy we are checking.
        @param expected_value: the expected value for policy_name.

        @raises error.TestError if value does not match expected.
        """
        value_shown = self._get_policy_value_from_new_tab(policy_name)
        logging.info('Value decoded from chrome://policy: %s', value_shown)

        # If we expect a list and don't have a list, modify the value_shown.
        # The policy page shows lists without the enclosing [ ], so the
        # list has to be reconstructed from the flat text.
        if isinstance(expected_value, list):
            if isinstance(value_shown, str):
                if '{' in value_shown:  # List of dicts.
                    value_shown = decode_json_string('[%s]' % value_shown)
                elif ',' in value_shown:  # List of strs.
                    value_shown = value_shown.split(',')
                else:  # List with one str.
                    value_shown = [value_shown]
            elif not isinstance(value_shown, list):  # List with one element.
                value_shown = [value_shown]

        if expected_value != value_shown:
            raise error.TestError('chrome://policy shows the incorrect value '
                                  'for %s! Expected %s, got %s.' % (
                                          policy_name, expected_value,
                                          value_shown))

    def _initialize_chrome_extra_flags(self):
        """Initialize flags used to create Chrome instance.

        @returns: list of extra Chrome flags.
        """
        # Construct DM Server URL flags if not using production server.
        env_flag_list = []
        if self.env != 'prod':
            if self.dms_is_fake:
                # Use URL provided by the fake AutoTest DM server.
                dmserver_str = (DMSERVER % self.fake_dm_server.server_url)
            else:
                # Use URL defined in the DMS URL dictionary.
                dmserver_str = (DMSERVER % (DMS_URL_DICT[self.env]))
                if self.env == 'dm-test':
                    # The dm-test URL has a placeholder for the server name.
                    dmserver_str = (dmserver_str % self.dms_name)

            # Merge with other flags needed by non-prod environment.
            env_flag_list = ([dmserver_str] + FLAGS_DICT[self.env])

        return env_flag_list

    def _launch_chrome_browser(self):
        """Launch Chrome browser and sign in."""
        extra_flags = self._initialize_chrome_extra_flags()

        # NOTE(review): the password is logged in clear text here as well;
        # confirm this is acceptable for non-fake credentials.
        logging.info('Chrome Browser Arguments:')
        logging.info('  extra_browser_args: %s', extra_flags)
        logging.info('  username: %s', self.username)
        logging.info('  password: %s', self.password)
        logging.info('  gaia_login: %s', not self.dms_is_fake)

        self.cr = chrome.Chrome(extra_browser_args=extra_flags,
                                username=self.username,
                                password=self.password,
                                gaia_login=not self.dms_is_fake,
                                disable_gaia_services=self.dms_is_fake,
                                autotest_ext=True)

    def navigate_to_url(self, url, tab=None):
        """Navigate tab to the specified |url|. Create new tab if none given.

        @param url: URL of web page to load.
        @param tab: browser tab to load (if any).

        @returns: browser tab loaded with web page.
        """
        logging.info('Navigating to URL: %r', url)
        if not tab:
            tab = self.cr.browser.tabs.New()
            tab.Activate()
        tab.Navigate(url, timeout=8)
        tab.WaitForDocumentReadyStateToBeComplete()
        return tab

    def get_elements_from_page(self, tab, cmd):
        """Get collection of page elements that match the |cmd| filter.

        @param tab: tab containing the page to be scraped.
        @param cmd: JavaScript command to evaluate on the page.

        @returns object containing elements on page that match the cmd.

        @raises: TestFail if matching elements are not found on the page.
        """
        try:
            elements = tab.EvaluateJavaScript(cmd)
        except Exception as err:
            # Any evaluation failure is reported as a test failure, with
            # the original exception included in the message.
            raise error.TestFail('Unable to find matching elements on '
                                 'the test page: %s\n %r' % (tab.url, err))
        return elements

    def run_once(self):
        """The run_once() method is required by all AutoTest tests.

        run_once() is defined herein to automatically determine which test
        case in the test class to run. The test class must have a public
        run_test_case() method defined. Note: The test class may override
        run_once() if it determines which test case to run.
        """
        logging.info('Running test case: %s', self.case)
        self.run_test_case(self.case)


def encode_json_string(object_value):
    """Convert given value to JSON format string.

    @param object_value: object to be converted.

    @returns: string in JSON format.
    """
    return json.dumps(object_value)


def decode_json_string(json_string):
    """Convert given JSON format string to an object.

    If no object is found, return json_string instead. This is to allow
    us to "decode" items off the policy page that aren't real JSON.

    NOTE: this function uses Python 2 names (unicode, dict.iteritems and
    the encoding argument of json.loads).

    @param json_string: the JSON string to be decoded.

    @returns: Python object represented by json_string or json_string.
    """
    def _decode_list(json_list):
        """Return a copy of the list with unicode items made ascii."""
        result = []
        for value in json_list:
            if isinstance(value, unicode):
                value = value.encode('ascii')
            if isinstance(value, list):
                value = _decode_list(value)
            if isinstance(value, dict):
                value = _decode_dict(value)
            result.append(value)
        return result

    def _decode_dict(json_dict):
        """Return a copy of the dict with unicode keys/values made ascii."""
        result = {}
        for key, value in json_dict.iteritems():
            if isinstance(key, unicode):
                key = key.encode('ascii')
            if isinstance(value, unicode):
                value = value.encode('ascii')
            elif isinstance(value, list):
                value = _decode_list(value)
            # Nested dict values need no handling here: object_hook has
            # already been applied to them.
            result[key] = value
        return result

    try:
        # Decode JSON turning all unicode strings into ascii.
        # object_hook will get called on all dicts, so also handle lists.
        result = json.loads(json_string, encoding='ascii',
                            object_hook=_decode_dict)
        if isinstance(result, list):
            result = _decode_list(result)
        return result
    except ValueError as e:
        # Input not valid, e.g. '1, 2, "c"' instead of '[1, 2, "c"]'.
        logging.warning('Could not unload: %s (%s)', json_string, e)
        return json_string