# Lint as: python2, python3 # Copyright 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Facade to access the display-related functionality.""" from __future__ import absolute_import from __future__ import division from __future__ import print_function import logging import multiprocessing import numpy import os import re import shutil import time import json from autotest_lib.client.bin import utils from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import utils as common_utils from autotest_lib.client.common_lib.cros import retry from autotest_lib.client.cros import constants from autotest_lib.client.cros.graphics import graphics_utils from autotest_lib.client.cros.multimedia import facade_resource from autotest_lib.client.cros.multimedia import image_generator from autotest_lib.client.cros.power import sys_power from six.moves import range from telemetry.internal.browser import web_contents class TimeoutException(Exception): """Timeout Exception class.""" pass _FLAKY_CALL_RETRY_TIMEOUT_SEC = 60 _FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC = 2 _retry_display_call = retry.retry( (KeyError, error.CmdError), timeout_min=_FLAKY_CALL_RETRY_TIMEOUT_SEC / 60.0, delay_sec=_FLAKY_DISPLAY_CALL_RETRY_DELAY_SEC) class DisplayFacadeNative(object): """Facade to access the display-related functionality. The methods inside this class only accept Python native types. """ CALIBRATION_IMAGE_PATH = '/tmp/calibration.png' MINIMUM_REFRESH_RATE_EXPECTED = 25.0 DELAY_TIME = 3 MAX_TYPEC_PORT = 6 def __init__(self, resource): """Initializes a DisplayFacadeNative. @param resource: A FacadeResource object. """ self._resource = resource self._image_generator = image_generator.ImageGenerator() @facade_resource.retry_chrome_call def get_display_info(self): """Gets the display info from Chrome.system.display API. @return array of dict for display info. """ extension = self._resource.get_extension( constants.DISPLAY_TEST_EXTENSION) extension.ExecuteJavaScript('window.__display_info = null;') extension.ExecuteJavaScript( "chrome.system.display.getInfo(function(info) {" "window.__display_info = info;})") utils.wait_for_value(lambda: ( extension.EvaluateJavaScript("window.__display_info") != None), expected_value=True) return extension.EvaluateJavaScript("window.__display_info") @facade_resource.retry_chrome_call def get_window_info(self): """Gets the current window info from Chrome.system.window API. @return a dict for the information of the current window. """ extension = self._resource.get_extension() extension.ExecuteJavaScript('window.__window_info = null;') extension.ExecuteJavaScript( "chrome.windows.getCurrent(function(info) {" "window.__window_info = info;})") utils.wait_for_value(lambda: ( extension.EvaluateJavaScript("window.__window_info") != None), expected_value=True) return extension.EvaluateJavaScript("window.__window_info") @facade_resource.retry_chrome_call def create_window(self, url='chrome://newtab'): """Creates a new window from chrome.windows.create API. @param url: Optional URL for the new window. @return Identifier for the new window. @raise TimeoutException if it fails. """ extension = self._resource.get_extension() extension.ExecuteJavaScript( """ var __new_window_id = null; chrome.windows.create( {url: '%s'}, function(win) { __new_window_id = win.id}); """ % (url) ) extension.WaitForJavaScriptCondition( "__new_window_id !== null", timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT) return extension.EvaluateJavaScript("__new_window_id") @facade_resource.retry_chrome_call def update_window(self, window_id, state=None, bounds=None): """Updates an existing window using the chrome.windows.update API. @param window_id: Identifier for the window to update. @param state: Optional string to set the state such as 'normal', 'maximized', or 'fullscreen'. @param bounds: Optional dictionary with keys top, left, width, and height to reposition the window. @return True if success. @raise TimeoutException if it fails. """ extension = self._resource.get_extension() params = {} if state: params['state'] = state if bounds: params['top'] = bounds['top'] params['left'] = bounds['left'] params['width'] = bounds['width'] params['height'] = bounds['height'] if not params: logging.info('Nothing to update for window_id={}'.format(window_id)) return True extension.ExecuteJavaScript( """ var __status = 'Running'; chrome.windows.update(%d, %s, function(win) { __status = 'Done'}); """ % (window_id, json.dumps(params)) ) extension.WaitForJavaScriptCondition( "__status == 'Done'", timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT) return True def _get_display_by_id(self, display_id): """Gets a display by ID. @param display_id: id of the display. @return: A dict of various display info. """ for display in self.get_display_info(): if display['id'] == display_id: return display raise RuntimeError('Cannot find display ' + display_id) def get_display_modes(self, display_id): """Gets all the display modes for the specified display. @param display_id: id of the display to get modes from. @return: A list of DisplayMode dicts. """ display = self._get_display_by_id(display_id) return display['modes'] def get_display_rotation(self, display_id): """Gets the display rotation for the specified display. @param display_id: id of the display to get modes from. @return: Degree of rotation. """ display = self._get_display_by_id(display_id) return display['rotation'] def get_display_notifications(self): """Gets the display notifications @return: Returns a list of display related notifications only. """ display_notifications = [] for notification in self._resource.get_visible_notifications(): if notification['id'] == 'chrome://settings/display': display_notifications.append(notification) return display_notifications def set_display_rotation(self, display_id, rotation, delay_before_rotation=0, delay_after_rotation=0): """Sets the display rotation for the specified display. @param display_id: id of the display to get modes from. @param rotation: degree of rotation @param delay_before_rotation: time in second for delay before rotation @param delay_after_rotation: time in second for delay after rotation """ time.sleep(delay_before_rotation) extension = self._resource.get_extension( constants.DISPLAY_TEST_EXTENSION) extension.ExecuteJavaScript( """ window.__set_display_rotation_has_error = null; chrome.system.display.setDisplayProperties('%(id)s', {"rotation": %(rotation)d}, () => { if (chrome.runtime.lastError) { console.error('Failed to set display rotation', chrome.runtime.lastError); window.__set_display_rotation_has_error = "failure"; } else { window.__set_display_rotation_has_error = "success"; } }); """ % {'id': display_id, 'rotation': rotation} ) utils.wait_for_value(lambda: ( extension.EvaluateJavaScript( 'window.__set_display_rotation_has_error') != None), expected_value=True) time.sleep(delay_after_rotation) result = extension.EvaluateJavaScript( 'window.__set_display_rotation_has_error') if result != 'success': raise RuntimeError('Failed to set display rotation: %r' % result) def get_available_resolutions(self, display_id): """Gets the resolutions from the specified display. @return a list of (width, height) tuples. """ display = self._get_display_by_id(display_id) modes = display['modes'] if 'widthInNativePixels' not in modes[0]: raise RuntimeError('Cannot find widthInNativePixels attribute') if display['isInternal']: logging.info("Getting resolutions of internal display") return list(set([(mode['width'], mode['height']) for mode in modes])) return list(set([(mode['widthInNativePixels'], mode['heightInNativePixels']) for mode in modes])) def has_internal_display(self): """Returns whether the device has an internal display. @return whether the device has an internal display """ return len([d for d in self.get_display_info() if d['isInternal']]) > 0 def get_internal_display_id(self): """Gets the internal display id. @return the id of the internal display. """ for display in self.get_display_info(): if display['isInternal']: return display['id'] raise RuntimeError('Cannot find internal display') def get_first_external_display_id(self): """Gets the first external display id. @return the id of the first external display; -1 if not found. """ # Get the first external and enabled display for display in self.get_display_info(): if display['isEnabled'] and not display['isInternal']: return display['id'] return -1 def set_resolution(self, display_id, width, height, timeout=3): """Sets the resolution of the specified display. @param display_id: id of the display to set resolution for. @param width: width of the resolution @param height: height of the resolution @param timeout: maximal time in seconds waiting for the new resolution to settle in. @raise TimeoutException when the operation is timed out. """ extension = self._resource.get_extension( constants.DISPLAY_TEST_EXTENSION) extension.ExecuteJavaScript( """ window.__set_resolution_progress = null; chrome.system.display.getInfo((info_array) => { var mode; for (var info of info_array) { if (info['id'] == '%(id)s') { for (var m of info['modes']) { if (m['width'] == %(width)d && m['height'] == %(height)d) { mode = m; break; } } break; } } if (mode === undefined) { console.error('Failed to select the resolution ' + '%(width)dx%(height)d'); window.__set_resolution_progress = "mode not found"; return; } chrome.system.display.setDisplayProperties('%(id)s', {'displayMode': mode}, () => { if (chrome.runtime.lastError) { window.__set_resolution_progress = "failed: " + chrome.runtime.lastError.message; } else { window.__set_resolution_progress = "succeeded"; } } ); }); """ % {'id': display_id, 'width': width, 'height': height} ) utils.wait_for_value(lambda: ( extension.EvaluateJavaScript( 'window.__set_resolution_progress') != None), expected_value=True) result = extension.EvaluateJavaScript( 'window.__set_resolution_progress') if result != 'succeeded': raise RuntimeError('Failed to set resolution: %r' % result) @_retry_display_call def get_external_resolution(self): """Gets the resolution of the external screen. @return The resolution tuple (width, height) """ return graphics_utils.get_external_resolution() def get_internal_resolution(self): """Gets the resolution of the internal screen. @return The resolution tuple (width, height) or None if internal screen is not available """ for display in self.get_display_info(): if display['isInternal']: bounds = display['bounds'] return (bounds['width'], bounds['height']) return None def set_content_protection(self, state): """Sets the content protection of the external screen. @param state: One of the states 'Undesired', 'Desired', or 'Enabled' """ connector = self.get_external_connector_name() graphics_utils.set_content_protection(connector, state) def get_content_protection(self): """Gets the state of the content protection. @param output: The output name as a string. @return: A string of the state, like 'Undesired', 'Desired', or 'Enabled'. False if not supported. """ connector = self.get_external_connector_name() return graphics_utils.get_content_protection(connector) def get_external_crtc_id(self): """Gets the external crtc. @return The id of the external crtc.""" return graphics_utils.get_external_crtc_id() def get_internal_crtc_id(self): """Gets the internal crtc. @retrun The id of the internal crtc.""" return graphics_utils.get_internal_crtc_id() def take_internal_screenshot(self, path): """Takes internal screenshot. @param path: path to image file. """ self.take_screenshot_crtc(path, self.get_internal_crtc_id()) def take_external_screenshot(self, path): """Takes external screenshot. @param path: path to image file. """ self.take_screenshot_crtc(path, self.get_external_crtc_id()) def take_screenshot_crtc(self, path, id): """Captures the DUT screenshot, use id for selecting screen. @param path: path to image file. @param id: The id of the crtc to screenshot. """ graphics_utils.take_screenshot_crop(path, crtc_id=id) return True def save_calibration_image(self, path): """Save the calibration image to the given path. @param path: path to image file. """ shutil.copy(self.CALIBRATION_IMAGE_PATH, path) return True def take_tab_screenshot(self, output_path, url_pattern=None): """Takes a screenshot of the tab specified by the given url pattern. @param output_path: A path of the output file. @param url_pattern: A string of url pattern used to search for tabs. Default is to look for .svg image. """ if url_pattern is None: # If no URL pattern is provided, defaults to capture the first # tab that shows SVG image. url_pattern = '.svg' tabs = self._resource.get_tabs() for i in range(0, len(tabs)): if url_pattern in tabs[i].url: data = tabs[i].Screenshot(timeout=5) # Flip the colors from BGR to RGB. data = numpy.fliplr(data.reshape(-1, 3)).reshape(data.shape) data.tofile(output_path) break return True def toggle_mirrored(self): """Toggles mirrored.""" graphics_utils.screen_toggle_mirrored() return True def hide_cursor(self): """Hides mouse cursor.""" graphics_utils.hide_cursor() return True def hide_typing_cursor(self): """Hides typing cursor.""" graphics_utils.hide_typing_cursor() return True def is_mirrored_enabled(self): """Checks the mirrored state. @return True if mirrored mode is enabled. """ return bool(self.get_display_info()[0]['mirroringSourceId']) def set_mirrored(self, is_mirrored): """Sets mirrored mode. @param is_mirrored: True or False to indicate mirrored state. @return True if success, False otherwise. """ if self.is_mirrored_enabled() == is_mirrored: return True retries = 4 while retries > 0: self.toggle_mirrored() result = utils.wait_for_value(self.is_mirrored_enabled, expected_value=is_mirrored, timeout_sec=3) if result == is_mirrored: return True retries -= 1 return False def is_display_primary(self, internal=True): """Checks if internal screen is primary display. @param internal: is internal/external screen primary status requested @return boolean True if internal display is primary. """ for info in self.get_display_info(): if info['isInternal'] == internal and info['isPrimary']: return True return False def suspend_resume(self, suspend_time=10): """Suspends the DUT for a given time in second. @param suspend_time: Suspend time in second. """ sys_power.do_suspend(suspend_time) return True def suspend_resume_bg(self, suspend_time=10): """Suspends the DUT for a given time in second in the background. @param suspend_time: Suspend time in second. """ process = multiprocessing.Process(target=self.suspend_resume, args=(suspend_time,)) process.start() return True @_retry_display_call def get_external_connector_name(self): """Gets the name of the external output connector. @return The external output connector name as a string, if any. Otherwise, return False. """ return graphics_utils.get_external_connector_name() def get_internal_connector_name(self): """Gets the name of the internal output connector. @return The internal output connector name as a string, if any. Otherwise, return False. """ return graphics_utils.get_internal_connector_name() def wait_external_display_connected(self, display): """Waits for the specified external display to be connected. @param display: The display name as a string, like 'HDMI1', or False if no external display is expected. @return: True if display is connected; False otherwise. """ result = utils.wait_for_value(self.get_external_connector_name, expected_value=display) return result == display @facade_resource.retry_chrome_call def move_to_display(self, display_id): """Moves the current window to the indicated display. @param display_id: The id of the indicated display. @return True if success. @raise TimeoutException if it fails. """ display_info = self._get_display_by_id(display_id) if not display_info['isEnabled']: raise RuntimeError('Cannot find the indicated display') target_bounds = display_info['bounds'] extension = self._resource.get_extension() # If the area of bounds is empty (here we achieve this by setting # width and height to zero), the window_sizer will automatically # determine an area which is visible and fits on the screen. # For more details, see chrome/browser/ui/window_sizer.cc # Without setting state to 'normal', if the current state is # 'minimized', 'maximized' or 'fullscreen', the setting of # 'left', 'top', 'width' and 'height' will be ignored. # For more details, see chrome/browser/extensions/api/tabs/tabs_api.cc extension.ExecuteJavaScript( """ var __status = 'Running'; chrome.windows.update( chrome.windows.WINDOW_ID_CURRENT, {left: %d, top: %d, width: 0, height: 0, state: 'normal'}, function(info) { if (info.left == %d && info.top == %d && info.state == 'normal') __status = 'Done'; }); """ % (target_bounds['left'], target_bounds['top'], target_bounds['left'], target_bounds['top']) ) extension.WaitForJavaScriptCondition( "__status == 'Done'", timeout=web_contents.DEFAULT_WEB_CONTENTS_TIMEOUT) return True def is_fullscreen_enabled(self): """Checks the fullscreen state. @return True if fullscreen mode is enabled. """ return self.get_window_info()['state'] == 'fullscreen' def set_fullscreen(self, is_fullscreen): """Sets the current window to full screen. @param is_fullscreen: True or False to indicate fullscreen state. @return True if success, False otherwise. """ extension = self._resource.get_extension() if not extension: raise RuntimeError('Autotest extension not found') if is_fullscreen: window_state = "fullscreen" else: window_state = "normal" extension.ExecuteJavaScript( """ var __status = 'Running'; chrome.windows.update( chrome.windows.WINDOW_ID_CURRENT, {state: '%s'}, function() { __status = 'Done'; }); """ % window_state) utils.wait_for_value(lambda: ( extension.EvaluateJavaScript('__status') == 'Done'), expected_value=True) return self.is_fullscreen_enabled() == is_fullscreen def load_url(self, url): """Loads the given url in a new tab. The new tab will be active. @param url: The url to load as a string. @return a str, the tab descriptor of the opened tab. """ return self._resource.load_url(url) def load_calibration_image(self, resolution): """Opens a new tab and loads a full screen calibration image from the HTTP server. @param resolution: A tuple (width, height) of resolution. @return a str, the tab descriptor of the opened tab. """ path = self.CALIBRATION_IMAGE_PATH self._image_generator.generate_image(resolution[0], resolution[1], path) os.chmod(path, 0o644) tab_descriptor = self.load_url('file://%s' % path) return tab_descriptor def load_color_sequence(self, tab_descriptor, color_sequence): """Displays a series of colors on full screen on the tab. tab_descriptor is returned by any open tab API of display facade. e.g., tab_descriptor = load_url('about:blank') load_color_sequence(tab_descriptor, color) @param tab_descriptor: Indicate which tab to test. @param color_sequence: An integer list for switching colors. @return A list of the timestamp for each switch. """ tab = self._resource.get_tab_by_descriptor(tab_descriptor) color_sequence_for_java_script = ( 'var color_sequence = [' + ','.join("'#%06X'" % x for x in color_sequence) + '];') # Paints are synchronized to the fresh rate of the screen by # window.requestAnimationFrame. tab.ExecuteJavaScript(color_sequence_for_java_script + """ function render(timestamp) { window.timestamp_list.push(timestamp); if (window.count < color_sequence.length) { document.body.style.backgroundColor = color_sequence[count]; window.count++; window.requestAnimationFrame(render); } } window.count = 0; window.timestamp_list = []; window.requestAnimationFrame(render); """) # Waiting time is decided by following concerns: # 1. MINIMUM_REFRESH_RATE_EXPECTED: the minimum refresh rate # we expect it to be. Real refresh rate is related to # not only hardware devices but also drivers and browsers. # Most graphics devices support at least 60fps for a single # monitor, and under mirror mode, since the both frames # buffers need to be updated for an input frame, the refresh # rate will decrease by half, so here we set it to be a # little less than 30 (= 60/2) to make it more tolerant. # 2. DELAY_TIME: extra wait time for timeout. tab.WaitForJavaScriptCondition( 'window.count == color_sequence.length', timeout=( (len(color_sequence) / self.MINIMUM_REFRESH_RATE_EXPECTED) + self.DELAY_TIME)) return tab.EvaluateJavaScript("window.timestamp_list") def close_tab(self, tab_descriptor): """Disables fullscreen and closes the tab of the given tab descriptor. tab_descriptor is returned by any open tab API of display facade. e.g., 1. tab_descriptor = load_url(url) close_tab(tab_descriptor) 2. tab_descriptor = load_calibration_image(resolution) close_tab(tab_descriptor) @param tab_descriptor: Indicate which tab to be closed. """ if tab_descriptor: # set_fullscreen(False) is necessary here because currently there # is a bug in tabs.Close(). If the current state is fullscreen and # we call close_tab() without setting state back to normal, it will # cancel fullscreen mode without changing system configuration, and # so that the next time someone calls set_fullscreen(True), the # function will find that current state is already 'fullscreen' # (though it is not) and do nothing, which will break all the # following tests. self.set_fullscreen(False) self._resource.close_tab(tab_descriptor) else: logging.error('close_tab: not a valid tab_descriptor') return True def reset_connector_if_applicable(self, connector_type): """Resets Type-C video connector from host end if applicable. It's the workaround sequence since sometimes Type-C dongle becomes corrupted and needs to be re-plugged. @param connector_type: A string, like "VGA", "DVI", "HDMI", or "DP". """ if connector_type != 'HDMI' and connector_type != 'DP': return # Decide if we need to add --name=cros_pd usbpd_command = 'ectool --name=cros_pd usbpd' try: common_utils.run('%s 0' % usbpd_command) except error.CmdError: usbpd_command = 'ectool usbpd' port = 0 while port < self.MAX_TYPEC_PORT: # We use usbpd to get Role information and then power cycle the # SRC one. command = '%s %d' % (usbpd_command, port) try: output = common_utils.run(command).stdout if re.compile('Role.*SRC').search(output): logging.info('power-cycle Type-C port %d', port) common_utils.run('%s sink' % command) common_utils.run('%s auto' % command) port += 1 except error.CmdError: break