# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


import json
import os
import time

import selenium

from autotest_lib.client.bin import utils
from extension_pages import e2e_test_utils
from extension_pages import options


class TestUtils(object):
    """Contains all the helper functions for Chrome mirroring automation."""

    # Fixed pause (seconds) inserted after UI actions before the next step.
    short_wait_secs = 3
    # Upper bound (seconds) that _wait_for_result polls before giving up.
    step_timeout_secs = 60
    # Column names assigned, in order, to the values of the aggregate 'cpu'
    # line of /proc/stat.
    cpu_fields = ['user', 'nice', 'system', 'idle', 'iowait', 'irq',
                  'softirq', 'steal', 'guest', 'guest_nice']
    # Columns that are counted as "not busy" when computing CPU usage.
    cpu_idle_fields = ['idle', 'iowait']

    def __init__(self):
        """Constructor"""


    def set_mirroring_options(self, driver, extension_id, settings):
        """Apply all the settings given by the user to the option page.

        @param driver: The chromedriver instance of the test
        @param extension_id: The id of the Cast extension
        @param settings: A dict of option name -> value; every entry is
                         passed to OptionsPage.set_value
        """
        options_page = options.OptionsPage(driver, extension_id)
        options_page.open_hidden_options_menu()
        time.sleep(self.short_wait_secs)
        for key, value in settings.items():
            options_page.set_value(key, value)


    def start_v2_mirroring_test_utils(
            self, driver, extension_id, receiver_ip, url, fullscreen,
            udp_proxy_server=None, network_profile=None):
        """Use test util page to start mirroring session on specific device.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param receiver_ip: The ip of the Eureka dongle to launch the activity
        @param url: The URL to navigate to
        @param fullscreen: click the fullscreen button or not
        @param udp_proxy_server: the address of udp proxy server,
            it should be a http address, http://<ip>:<port>
        @param network_profile: the network profile,
            it should be one of wifi, bad and evil.
        @return True if the function finishes
        @raises IndexError if no new window shows up after clicking the
            mirror button
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        time.sleep(self.short_wait_secs)
        # Remember the windows that exist before mirroring starts so the
        # newly opened one can be identified afterwards.
        tab_handles = driver.window_handles
        e2e_test_utils_page.receiver_ip_or_name_v2_text_box().set_value(
                receiver_ip)
        e2e_test_utils_page.url_to_open_v2_text_box().set_value(url)
        if udp_proxy_server:
            e2e_test_utils_page.udp_proxy_server_text_box().set_value(
                    udp_proxy_server)
        if network_profile:
            e2e_test_utils_page.network_profile_text_box().set_value(
                    network_profile)
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.open_then_mirror_v2_button().click()
        time.sleep(self.short_wait_secs)
        all_handles = driver.window_handles
        # The handle that was not present before the click is the mirrored
        # tab; pop() raises IndexError when no such handle exists.
        video_handle = [x for x in all_handles if x not in tab_handles].pop()
        driver.switch_to_window(video_handle)
        self.navigate_to_test_url(driver, url, fullscreen)
        return True


    def stop_v2_mirroring_test_utils(self, driver, extension_id):
        """Use test util page to stop a mirroring session on a specific device.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(driver,
                                                              extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.stop_v2_mirroring_button().click()


    def start_v2_mirroring_sdk(self, driver, device_ip, url, extension_id):
        """Use SDK to start a mirroring session on a specific device.

        @param driver: The chromedriver instance
        @param device_ip: The IP of the Eureka device
        @param url: The URL to navigate to
        @param extension_id: The id of the Cast extension
        @return True if the function finishes
        @raise RuntimeError for timeouts
        """
        self.set_auto_testing_ip(driver, extension_id, device_ip)
        # Open the test page without requesting full screen.
        self.navigate_to_test_url(driver, url, False)
        time.sleep(self.short_wait_secs)
        driver.execute_script('loadScript()')
        # Each step is considered done once the page's isSuccessful value
        # becomes truthy.
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when initiating mirroring... ...')
        driver.execute_script('startV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when triggering mirroring... ...')
        return True


    def stop_v2_mirroring_sdk(self, driver, activity_id=None):
        """Use SDK to stop the mirroring activity in Chrome.

        @param driver: The chromedriver instance
        @param activity_id: The id of the mirroring activity. Not used by
                            this method; kept so existing callers keep working
        @raise RuntimeError for timeouts
        """
        driver.execute_script('stopV2Mirroring()')
        self._wait_for_result(
                lambda: driver.execute_script('return isSuccessful'),
                'Timeout when stopping mirroring... ...')


    def set_auto_testing_ip(self, driver, extension_id, device_ip):
        """Set the auto testing IP on the extension page.

        Stores device_ip under the "AutoTestingIp" localStorage key by
        running a script through the test utils page object.

        @param driver: The chromedriver instance
        @param extension_id: The id of the Cast extension
        @param device_ip: The IP of the device to test against
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.execute_script(
                'localStorage["AutoTestingIp"] = "%s";' % device_ip)


    def upload_v2_mirroring_logs(self, driver, extension_id):
        """Upload v2 mirroring logs for the latest mirroring session.

        @param driver: The chromedriver instance of the browser
        @param extension_id: The extension ID of the Cast extension
        @return The report id in crash staging server.
        @raises RuntimeError if an error occurred during uploading
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        e2e_test_utils_page.upload_v2_mirroring_logs_button().click()
        # Poll the scroll box until it holds a non-empty value.
        report_id = self._wait_for_result(
                e2e_test_utils_page.v2_mirroring_logs_scroll_box().get_value,
                'Failed to get v2 mirroring logs')
        # A non-empty value can still be the page's own failure text.
        if 'Failed to upload logs' in report_id:
            raise RuntimeError('Failed to get v2 mirroring logs')
        return report_id


    def get_chrome_version(self, driver):
        """Return the Chrome version that is being used for running test.

        @param driver: The chromedriver instance
        @return The text following 'Chrome/' in navigator.appVersion, or None
                if no whitespace-separated token contains 'Chrome/'
        """
        get_chrome_version_js = 'return window.navigator.appVersion;'
        app_version = driver.execute_script(get_chrome_version_js)
        for item in app_version.split():
            if 'Chrome/' in item:
                return item.split('/')[1]
        return None


    def get_chrome_revision(self, driver):
        """Return Chrome revision number that is being used for running test.

        Navigates the driver to chrome://version, so the current page of the
        active tab is replaced.

        @param driver: The chromedriver instance
        @return The Chrome revision number
        """
        get_chrome_revision_js = ('return document.getElementById("version").'
                                  'getElementsByTagName("span")[2].innerHTML;')
        driver.get('chrome://version')
        return driver.execute_script(get_chrome_revision_js)


    def get_extension_id_from_flag(self, extra_flags):
        """Gets the extension ID based on the whitelisted extension id flag.

        @param extra_flags: A string which contains all the extra chrome flags
        @return The ID of the extension. Return None if nothing is found.
        """
        for flag in extra_flags.split():
            if 'whitelisted-extension-id=' in flag:
                # Split only on the first '=' so the whole value is returned.
                return flag.split('=', 1)[1]
        return None


    def navigate_to_test_url(self, driver, url, fullscreen):
        """Navigate to a given URL. Click fullscreen button if needed.

        @param driver: The chromedriver instance
        @param url: The URL of the site to navigate to
        @param fullscreen: True and the video will play in full screen mode.
                           Otherwise, set to False
        """
        driver.get(url)
        driver.refresh()
        if fullscreen:
            self.request_full_screen(driver)


    def request_full_screen(self, driver):
        """Request full screen.

        Clicks the element with id 'fsbutton'. A missing button is reported
        on stdout and otherwise ignored.

        @param driver: The chromedriver instance
        """
        time.sleep(self.short_wait_secs)
        try:
            driver.find_element_by_id('fsbutton').click()
        except selenium.common.exceptions.NoSuchElementException as error_message:
            # Single-argument print() behaves the same on Python 2 and 3.
            print('Full screen button is not found. ' + str(error_message))


    def set_focus_tab(self, driver, tab_handle):
        """Set the focus on a tab.

        @param driver: The chromedriver instance
        @param tab_handle: The chrome driver handle of the tab
        """
        driver.switch_to_window(tab_handle)
        # NOTE(review): the screenshot is discarded; presumably it is taken
        # only to force the tab to the foreground -- confirm.
        driver.get_screenshot_as_base64()


    def block_setup_dialog(self, driver, extension_id):
        """Set the flag that blocks the setup auto-launch on install.

        Opens the test utils page and sets the
        "blockChromekeySetupAutoLaunchOnInstall" localStorage key to "true".

        @param driver: A chromedriver instance that has the extension loaded.
        @param extension_id: Id of the extension to use.
        """
        e2e_test_utils_page = e2e_test_utils.E2ETestUtilsPage(
                driver, extension_id)
        e2e_test_utils_page.go_to_page()
        time.sleep(self.short_wait_secs)
        driver.execute_script(
            'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"')


    def close_popup_tabs(self, driver):
        """Close any popup windows the extension might open by default.

        Since we're going to handle the extension ourselves all we need is
        the main browser window with a single tab. Every window other than
        the currently active one is closed on a best-effort basis, then focus
        is returned to the original window.

        @param driver: Chromedriver instance.
        @raises Exception If switching back to the original tab fails.
        """
        # TODO: There are several, albeit hacky ways, to handle this popup
        # that might need to change with different versions of the extension
        # until the core issue is resolved. See crbug.com/338399.
        current_tab_handle = driver.current_window_handle
        for handle in driver.window_handles:
            if handle == current_tab_handle:
                continue
            try:
                time.sleep(self.short_wait_secs)
                driver.switch_to_window(handle)
                driver.close()
            except Exception as error:
                # Closing popups is best effort: report and move on. Catching
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                print('Failed to close popup tab. ' + str(error))
        driver.switch_to_window(current_tab_handle)


    def output_dict_to_file(self, dictionary, file_name,
                            path=None, sort_keys=False):
        """Output a dictionary into a file.

        When the target file already exists and dictionary is non-empty, the
        existing JSON content is loaded and merged with dictionary (values
        from dictionary win on key clashes). When dictionary is empty, the
        file is overwritten with the empty dictionary.

        @param dictionary: The dictionary to be output as JSON
        @param file_name: The name of the file that is being output
        @param path: The path of the file. The default is None, which means
                     the directory containing this module
        @param sort_keys: Sort dictionary by keys when output. False by default
        """
        if path is None:
            path = os.path.abspath(os.path.dirname(__file__))
        json_file = os.path.join(path, file_name)
        # if json file exists, read the existing one and append to it
        if os.path.isfile(json_file) and dictionary:
            with open(json_file, 'r') as existing_json_data:
                merged = dict(json.load(existing_json_data))
            # dict.update works on both Python 2 and 3, unlike concatenating
            # the results of items(); it also leaves the caller's dict alone.
            merged.update(dictionary)
            dictionary = merged
        output_json = json.dumps(dictionary, sort_keys=sort_keys)
        with open(json_file, 'w') as file_handler:
            file_handler.write(output_json)


    def compute_cpu_utilization(self, cpu_dict):
        """Generate the upper/lower bound and the average CPU consumption.

        The bounds are the samples found at 10% and 90% of the way through
        the sorted list of values.

        @param cpu_dict: The dictionary that contains CPU usage every sec.
        @returns A dict with the keys 'lower_bound', 'upper_bound' and
                 'average', each formatted as a string with two decimals.
        @raises IndexError if cpu_dict is empty.
        """
        cpu_usage = sorted(cpu_dict.values())
        sample_count = len(cpu_usage)
        cpu_bound = {}
        cpu_bound['lower_bound'] = (
                '%.2f' % cpu_usage[int(sample_count * 10.0 / 100.0)])
        cpu_bound['upper_bound'] = (
                '%.2f' % cpu_usage[int(sample_count * 90.0 / 100.0)])
        cpu_bound['average'] = '%.2f' % (sum(cpu_usage) / float(sample_count))
        return cpu_bound


    def cpu_usage_interval(self, duration, interval=1):
        """Get the CPU usage over a period of time based on interval.

        @param duration: The duration of getting the CPU usage.
        @param interval: The interval to check the CPU usage. Default is 1 sec.
        @return A dict that maps the start offset of each interval to the CPU
                usage (in percent) measured over that interval.
        """
        current_time = 0
        cpu_usage = {}
        while current_time < duration:
            pre_times = self._get_system_times()
            time.sleep(interval)
            post_times = self._get_system_times()
            cpu_usage[current_time] = self._get_avg_cpu_usage(
                    pre_times, post_times)
            current_time += interval
        return cpu_usage


    def _get_avg_cpu_usage(self, pre_times, post_times):
        """Calculate the average CPU usage of two different periods of time.

        @param pre_times: The CPU usage information of the start point.
        @param post_times: The CPU usage information of the end point.
        @return Average CPU usage over a time period, as a percentage.
                0.0 if the counters did not advance between the two samples.
        """
        diff_times = {}
        for field in self.cpu_fields:
            # A field missing from a sample (fewer columns in /proc/stat than
            # names in cpu_fields) is treated as 0 instead of raising.
            diff_times[field] = (
                    post_times.get(field, 0) - pre_times.get(field, 0))

        idle_time = sum(diff_times[field] for field in self.cpu_idle_fields)
        total_time = sum(diff_times[field] for field in self.cpu_fields)
        if total_time <= 0:
            # Identical samples: nothing elapsed, avoid dividing by zero.
            return 0.0
        return float(total_time - idle_time) / total_time * 100.0


    def _get_system_times(self):
        """Get the CPU information from the system times.

        @return A dict mapping the names in cpu_fields, in order, to the
                integer values of the aggregate 'cpu' line of /proc/stat.
        @raises RuntimeError if /proc/stat has no aggregate 'cpu' line.
        """
        proc_stat = utils.read_file('/proc/stat')
        for line in proc_stat.split('\n'):
            # The trailing space skips the per-core 'cpu0', 'cpu1', ... lines.
            if line.startswith('cpu '):
                # split() without arguments tolerates repeated whitespace.
                times = [int(jiffies) for jiffies in line[4:].split()]
                return dict(zip(self.cpu_fields, times))
        raise RuntimeError('No aggregate cpu line found in /proc/stat')


    def _wait_for_result(self, get_result, error_message):
        """Wait for the result.

        Polls get_result every step_timeout_secs / 10 seconds until it
        returns a truthy value or step_timeout_secs has elapsed.

        @param get_result: the function to get result.
        @param error_message: the error message in the exception
                              if it is failed to get result.
        @return The result.
        @raises RuntimeError if it is failed to get result within
                             self.step_timeout_secs.
        """
        start = time.time()
        # Keep the value that was actually checked, so the returned result is
        # the same one that passed the truthiness test.
        result = get_result()
        while not result and (time.time() - start) < self.step_timeout_secs:
            time.sleep(self.step_timeout_secs / 10.0)
            result = get_result()
        if not result:
            raise RuntimeError(error_message)
        return result