# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import json import os import time import urllib2 import selenium from autotest_lib.client.bin import utils from extension_pages import e2e_test_utils from extension_pages import options class TestUtils(object): """Contains all the helper functions for Chrome mirroring automation.""" short_wait_secs = 3 step_timeout_secs = 60 device_info = 'http://%s:8008/setup/eureka_info' cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq', 'softirq', 'steal', 'guest', 'guest_nice'] cpu_idle_fields = ['idle', 'iowait'] def __init__(self): """Constructor""" def _connect_extension(self, driver, extension_id): """Connects to extension. @param driver: The webdriver instance. @param extension_id: The id of Media Router extension. """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.go_to_page() e2e_test_utils_page.execute_script( 'chrome.runtime.connect("%s")' % extension_id) def start_mirroring_media_router( self, driver, extension_id, receiver_ip, url, fullscreen, udp_proxy_server=None, network_profile=None): """Starts a mirroring session using Media Router extension. @param driver: The webdriver instance. @param extension_id: The id of Media Router extension. @param receiver_ip: The IP of the receiver to launch the activity. @param url: The URL to navigate to. @param fullscreen: Click the fullscreen button or not. @param udp_proxy_server: Address of udp proxy server, in http address format, http://:. @param network_profile: Network profile to use. @return True if the function finishes. """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) tab_handles = driver.window_handles device_name = self._get_device_name(receiver_ip) self._connect_extension(driver, extension_id) e2e_test_utils_page.execute_script( 'chrome.extension.getBackgroundPage().e2eTestService.start()') time.sleep(self.short_wait_secs * 3) # Wait for discovery self._connect_extension(driver, extension_id) if network_profile and udp_proxy_server: e2e_test_utils_page.execute_script( 'chrome.extension.getBackgroundPage().e2eTestService.' 'mirrorTabViaCastStreaming("%s", "%s", "%s", "%s")' % ( device_name, url, udp_proxy_server, network_profile)) else: e2e_test_utils_page.execute_script( 'chrome.extension.getBackgroundPage().e2eTestService.' 'mirrorTabViaCastStreaming("%s", "%s")' % ( device_name, url)) time.sleep(self.short_wait_secs) # Wait for mirroring to start all_handles = driver.window_handles video_handle = [x for x in all_handles if x not in tab_handles].pop() driver.switch_to_window(video_handle) self.navigate_to_test_url(driver, url, fullscreen) return True def stop_mirroring_media_router(self, driver, extension_id): """Stops a mirroring session on a device using Media Router extension. @param driver: The webdriver instance. @param extension_id: The id of Media Router extension. """ e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.go_to_page() self._connect_extension(driver, extension_id) e2e_test_utils_page.execute_script( ('chrome.extension.getBackgroundPage()' '.e2eTestService.stopMirroring()')) # Wait for receiver to go back to home screen time.sleep(self.short_wait_secs) def enable_automatic_send_usage(self, driver): """Enables automatic send usage statistics and crash reports in Chrome. @param driver: The webdriver instance of the browser. """ driver.get('chrome://settings-frame') driver.find_element_by_id('advanced-settings-expander').click() usage_check_box = driver.find_element_by_id('metrics-reporting-enabled') utils.poll_for_condition( usage_check_box.is_displayed, timeout=10, sleep_interval=1, desc='Wait for automatic send usage checkbox.') if not usage_check_box.is_selected(): usage_check_box.click() def set_local_storage_mr_mirroring( self, driver, extension_id, frame_rate=60): """Enables extension fine log in Chrome for debugging. @param driver: The webdriver instance of the browser. @param extension_id: The extension ID of Media Router extension. @param frame_rate: The capture frame rate of Media Router extension. """ scripts = ('localStorage["debug.console"] = true;' 'localStorage["debug.logs"] = "fine";') scripts += ('localStorage["mr.mirror.Settings.Overrides"] =' ' \'{"maxFrameRate":%s}\';') % str(frame_rate) self._execute_script_reload_extension( driver, extension_id, scripts) def upload_mirroring_logs_media_router(self, driver, extension_id): """Uploads MR mirroring logs for the latest mirroring session. @param driver: The webdriver instance of the browser. @param extension_id: The extension ID of Media Router extension. @return The report id in crash staging server or empty if there is nothing. """ report_id = None wait_time = 0 e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.go_to_page() while not report_id and wait_time < 90: report_id = e2e_test_utils_page.execute_script( ('return localStorage["e2eTestService' '.castStreamingMirrorLogId"]')) time.sleep(self.short_wait_secs) wait_time += self.short_wait_secs if report_id: return report_id else: return '' def get_chrome_version(self, driver): """Returns the Chrome version that is being used for running test. @param driver: The chromedriver instance. @return The Chrome version. """ get_chrome_version_js = 'return window.navigator.appVersion;' app_version = driver.execute_script(get_chrome_version_js) for item in app_version.split(): if 'Chrome/' in item: return item.split('/')[1] return None def get_chrome_revision(self, driver): """Returns Chrome revision number that is being used for running test. @param driver: The chromedriver instance. @return The Chrome revision number. """ get_chrome_revision_js = ('return document.getElementById("version").' 'getElementsByTagName("span")[2].innerHTML;') driver.get('chrome://version') return driver.execute_script(get_chrome_revision_js) def get_extension_id_from_flag(self, extra_flags): """Gets the extension ID based on the whitelisted extension id flag. @param extra_flags: A string which contains all the extra chrome flags. @return The ID of the extension. Return None if nothing is found. """ extra_flags_list = extra_flags.split() for flag in extra_flags_list: if 'whitelisted-extension-id=' in flag: return flag.split('=')[1] return None def navigate_to_test_url(self, driver, url, fullscreen): """Navigates to a given URL. Click fullscreen button if needed. @param driver: The chromedriver instance. @param url: The URL of the site to navigate to. @param fullscreen: True and the video will play in full screen mode. Otherwise, set to False. """ driver.get(url) driver.refresh() if fullscreen: self.request_full_screen(driver) def request_full_screen(self, driver): """Requests full screen. @param driver: The chromedriver instance. """ try: driver.find_element_by_id('fsbutton').click() except selenium.common.exceptions.NoSuchElementException as err_msg: print 'Full screen button is not found. ' + str(err_msg) def set_focus_tab(self, driver, tab_handle): """Sets the focus on a tab. @param driver: The chromedriver instance. @param tab_handle: The chrome driver handle of the tab. """ driver.switch_to_window(tab_handle) driver.get_screenshot_as_base64() def output_dict_to_file(self, dictionary, file_name, path=None, sort_keys=False): """Outputs a dictionary into a file. @param dictionary: The dictionary to be output as JSON. @param file_name: The name of the file that is being output. @param path: The path of the file. The default is None. @param sort_keys: Sort dictionary by keys when output. False by default. """ if path is None: path = os.path.abspath(os.path.dirname(__file__)) # if json file exists, read the existing one and append to it json_file = os.path.join(path, file_name) if os.path.isfile(json_file) and dictionary: with open(json_file, 'r') as existing_json_data: json_data = json.load(existing_json_data) dictionary = dict(json_data.items() + dictionary.items()) output_json = json.dumps(dictionary, sort_keys=sort_keys) with open(json_file, 'w') as file_handler: file_handler.write(output_json) def compute_cpu_utilization(self, cpu_dict): """Generates the upper/lower bound and the average CPU consumption. @param cpu_dict: The dictionary that contains CPU usage every sec. @returns A dict that contains upper/lower bound and average cpu usage. """ cpu_bound = {} cpu_usage = sorted(cpu_dict.values()) cpu_bound['lower_bound'] = ( '%.2f' % cpu_usage[int(len(cpu_usage) * 10.0 / 100.0)]) cpu_bound['upper_bound'] = ( '%.2f' % cpu_usage[int(len(cpu_usage) * 90.0 / 100.0)]) cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(len(cpu_usage))) return cpu_bound def cpu_usage_interval(self, duration, interval=1): """Gets the CPU usage over a period of time based on interval. @param duration: The duration of getting the CPU usage. @param interval: The interval to check the CPU usage. Default is 1 sec. @return A dict that contains CPU usage over different time intervals. """ current_time = 0 cpu_usage = {} while current_time < duration: pre_times = self._get_system_times() time.sleep(interval) post_times = self._get_system_times() cpu_usage[current_time] = self._get_avg_cpu_usage( pre_times, post_times) current_time += interval return cpu_usage def _execute_script_reload_extension(self, driver, extension_id, script): """Executes javascript in the extension and reload it afterwards. @param driver: The chromedriver instance. @param extension_id: The id of Media Router extension. @param script: JavaScript to be executed. """ script += 'chrome.runtime.reload();' current_handle = driver.current_window_handle new_tab_handle = self._open_new_tab(driver) e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage( driver, extension_id) e2e_test_utils_page.execute_script(script) driver.switch_to_window(new_tab_handle) def _get_avg_cpu_usage(self, pre_times, post_times): """Calculates the average CPU usage of two different periods of time. @param pre_times: The CPU usage information of the start point. @param post_times: The CPU usage information of the end point. @return Average CPU usage over a time period. """ diff_times = {} for field in self.cpu_fields: diff_times[field] = post_times[field] - pre_times[field] idle_time = sum(diff_times[field] for field in self.cpu_idle_fields) total_time = sum(diff_times[field] for field in self.cpu_fields) return float(total_time - idle_time) / total_time * 100.0 def _get_device_name(self, device_ip): """Gets all the Chromecast information through eureka_info page. @param device_ip: string, the IP address the device. @return Name of the device. """ response = urllib2.urlopen(self.device_info % device_ip).read() return json.loads(response).get('name') def _get_system_times(self): """Gets the CPU information from the system times. @return An list with CPU usage of different processes. """ proc_stat = utils.read_file('/proc/stat') for line in proc_stat.split('\n'): if line.startswith('cpu '): times = line[4:].strip().split(' ') times = [int(jiffies) for jiffies in times] return dict(zip(self.cpu_fields, times)) def _open_new_tab(self, driver): """Opens a new tab in Chrome. @param driver: The webdriver instance of the browser. @return Handle of the new tab. """ current_handles = driver.window_handles driver.execute_script('window.open("chrome://newtab")') utils.poll_for_condition( lambda: len(driver.window_handles) > len(current_handles), timeout=10, sleep_interval=1, desc='Wait for new tab to open.') return list(set(driver.window_handles) - set(current_handles))[0]