1# Lint as: python2, python3 2# Copyright 2016 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6import json, logging, os, re, tempfile, time, zipfile 7 8from six.moves import urllib 9 10from autotest_lib.client.bin import test 11from autotest_lib.client.common_lib import error, file_utils 12from autotest_lib.client.common_lib.cros import chromedriver 13 14 15UPDATE_CHECK_URL = ('https://clients2.google.com/service/update2/' 16 'crx?x=id%%3D%s%%26v%%3D0%%26uc&prodversion=32.0.0.0') 17EXTENSION_ID_BETA = 'dliochdbjfkdbacpmhlcpmleaejidimm' 18CHROME_EXTENSION_BASE = 'chrome-extension://' 19EXTENSION_OPTIONS_PAGE = 'options.html' 20RECEIVER_LOG = 'receiver_debug.log' 21EXTENSION_TEST_UTILS_PAGE = 'e2e_test_utils.html' 22TEST_URL = 'http://www.vimeo.com' 23GET_LOG_URL = 'http://%s:8008/setup/get_log_report' 24KEY = ('MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC+hlN5FB+tjCsBszmBIvIcD/djLLQm2z' 25 'ZfFygP4U4/o++ZM91EWtgII10LisoS47qT2TIOg4Un4+G57elZ9PjEIhcJfANqkYrD3t9d' 26 'pEzMNr936TLB2u683B5qmbB68Nq1Eel7KVc+F0BqhBondDqhvDvGPEV0vBsbErJFlNH7SQ' 27 'IDAQAB') 28WAIT_SECONDS = 3 29MIRRORING_DURATION_SECONDS = 20 30 31 32class network_CastTDLS(test.test): 33 """Test loading the sonic extension through chromedriver.""" 34 version = 1 35 36 def _navigate_url(self, driver, url): 37 """Go to options page if current page is not options page. 38 39 @param driver: The chromedriver instance. 40 @param url: The URL to navigate to. 41 """ 42 if driver.current_url != url: 43 driver.get(url) 44 driver.refresh() 45 46 def _set_focus_tab(self, driver, tab_handle): 47 """Set the focus on a tab. 48 49 @param driver: The chromedriver instance. 50 @param tab_handle: The chrome driver handle of the tab. 51 """ 52 driver.switch_to_window(tab_handle) 53 driver.get_screenshot_as_base64() 54 55 def _block_setup_dialog(self, driver, extension_id): 56 """Tab cast through the extension. 57 58 @param driver: A chromedriver instance that has the extension loaded. 59 @param extension_id: Id of the extension to use. 60 """ 61 test_utils_page = '%s%s/%s' % ( 62 CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE) 63 self._navigate_url(driver, test_utils_page) 64 time.sleep(WAIT_SECONDS) 65 driver.execute_script( 66 'localStorage["blockChromekeySetupAutoLaunchOnInstall"] = "true"') 67 68 def _close_popup_tabs(self, driver): 69 """Close any popup windows the extension might open by default. 70 71 Since we're going to handle the extension ourselves all we need is 72 the main browser window with a single tab. The safest way to handle 73 the popup however, is to close the currently active tab, so we don't 74 mess with chromedrivers ui debugger. 75 76 @param driver: Chromedriver instance. 77 @raises Exception If you close the tab associated with the ui debugger. 78 """ 79 current_tab_handle = driver.current_window_handle 80 for handle in driver.window_handles: 81 if handle != current_tab_handle: 82 try: 83 time.sleep(WAIT_SECONDS) 84 driver.switch_to_window(handle) 85 driver.close() 86 except: 87 pass 88 driver.switch_to_window(current_tab_handle) 89 90 def _download_extension_crx(self, output_file): 91 """Download Cast (Beta) extension from Chrome Web Store to a file. 92 93 @param output_file: The output file of the extension. 94 """ 95 update_check_url = UPDATE_CHECK_URL % EXTENSION_ID_BETA 96 response = urllib.request.urlopen(update_check_url).read() 97 logging.info('Response: %s', response) 98 pattern = r'codebase="(.*crx)"' 99 regex = re.compile(pattern) 100 match = regex.search(response) 101 extension_download_url = match.groups()[0] 102 logging.info('Extension download link: %s', extension_download_url) 103 file_utils.download_file(extension_download_url, output_file) 104 105 def _unzip_file(self, zip_file, output_folder): 106 """Unzip a zip file to a folder. 107 108 @param zip_file: Path of the zip file. 109 @param output_folder: Output file path. 110 """ 111 zip_ref = zipfile.ZipFile(zip_file, 'r') 112 zip_ref.extractall(output_folder) 113 zip_ref.close() 114 115 def _modify_manifest(self, extension_folder): 116 """Update the Cast extension manifest file by adding a key to it. 117 118 @param extension_folder: Path to the unzipped extension folder. 119 """ 120 manifest_file = os.path.join(extension_folder, 'manifest.json') 121 with open(manifest_file) as f: 122 manifest_dict = json.load(f) 123 manifest_dict['key'] = KEY 124 with open(manifest_file, 'wb') as f: 125 json.dump(manifest_dict, f) 126 127 def _turn_on_tdls(self, driver, extension_id): 128 """Turn on TDLS in the extension option page. 129 130 @param driver: The chromedriver instance of the test. 131 @param extension_id: The id of the Cast extension. 132 """ 133 turn_on_tdls_url = ('%s%s/%s''?disableTDLS=false') % ( 134 CHROME_EXTENSION_BASE, extension_id, EXTENSION_OPTIONS_PAGE) 135 self._navigate_url(driver, turn_on_tdls_url) 136 time.sleep(WAIT_SECONDS) 137 138 def _start_mirroring(self, driver, extension_id, device_ip, url): 139 """Use test util page to start mirroring session on specific device. 140 141 @param driver: The chromedriver instance. 142 @param extension_id: The id of the Cast extension. 143 @param device_ip: The IP of receiver device to launch mirroring. 144 @param url: The URL to mirror. 145 """ 146 test_utils_page = '%s%s/%s' % ( 147 CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE) 148 self._navigate_url(driver, test_utils_page) 149 time.sleep(WAIT_SECONDS) 150 tab_handles = driver.window_handles 151 driver.find_element_by_id('receiverIpAddressV2').send_keys(device_ip) 152 driver.find_element_by_id('urlToOpenV2').send_keys(url) 153 time.sleep(WAIT_SECONDS) 154 driver.find_element_by_id('mirrorUrlV2').click() 155 time.sleep(WAIT_SECONDS) 156 all_handles = driver.window_handles 157 test_handle = [x for x in all_handles if x not in tab_handles].pop() 158 driver.switch_to_window(test_handle) 159 driver.refresh() 160 161 def _stop_mirroring(self, driver, extension_id): 162 """Use test util page to stop a mirroring session on a specific device. 163 164 @param driver: The chromedriver instance. 165 @param extension_id: The id of the Cast extension. 166 """ 167 test_utils_page = '%s%s/%s' % ( 168 CHROME_EXTENSION_BASE, extension_id, EXTENSION_TEST_UTILS_PAGE) 169 self._navigate_url(driver, test_utils_page) 170 time.sleep(WAIT_SECONDS) 171 driver.find_element_by_id('stopV2Mirroring').click() 172 173 def _check_tdls_status(self, device_ip): 174 """Check the TDLS status of a mirroring session using receiver's log. 175 176 @param device_ip: IP of receiver device used in the mirroring session. 177 @raises error.TestFail If there is log about TDLS status. 178 @raises error.TestFail If TDLS status is invalid. 179 @raises error.TestFail TDLS is not being used in the mirroring session. 180 """ 181 response = urllib.request.urlopen(GET_LOG_URL % device_ip).read() 182 logging.info('Receiver log is under: %s', self.debugdir) 183 with open(os.path.join(self.debugdir, RECEIVER_LOG), 'wb') as f: 184 f.write(response) 185 tdls_status_list = re.findall( 186 r'.*TDLS status.*last TDLS status:.*', response) 187 if not len(tdls_status_list): 188 raise error.TestFail('There is no TDLS log on %s.', device_ip) 189 pattern = r'last TDLS status: (.*)' 190 regex = re.compile(pattern) 191 match = regex.search(tdls_status_list[-1]) 192 if not len(match.groups()): 193 raise error.TestFail('Invalid TDLS status.') 194 if match.groups()[0].lower() != 'enabled': 195 raise error.TestFail( 196 'TDLS is not initiated in the last mirroring session.') 197 198 def initialize(self): 199 """Initialize the test. 200 201 @param extension_dir: Directory of a custom extension. 202 If one isn't supplied, the latest ToT extension is 203 downloaded and loaded into chromedriver. 204 @param live: Use a live url if True. Start a test server 205 and server a hello world page if False. 206 """ 207 super(network_CastTDLS, self).initialize() 208 self._tmp_folder = tempfile.mkdtemp() 209 210 211 def cleanup(self): 212 """Clean up the test environment, e.g., stop local http server.""" 213 super(network_CastTDLS, self).cleanup() 214 215 216 def run_once(self, device_ip): 217 """Run the test code. 218 219 @param device_ip: Device IP to use in the test. 220 """ 221 logging.info('Download Cast extension ... ...') 222 crx_file = tempfile.NamedTemporaryFile( 223 dir=self._tmp_folder, suffix='.crx').name 224 unzip_crx_folder = tempfile.mkdtemp(dir=self._tmp_folder) 225 self._download_extension_crx(crx_file) 226 self._unzip_file(crx_file, unzip_crx_folder) 227 self._modify_manifest(unzip_crx_folder) 228 kwargs = { 'extension_paths': [unzip_crx_folder] } 229 with chromedriver.chromedriver(**kwargs) as chromedriver_instance: 230 driver = chromedriver_instance.driver 231 extension_id = chromedriver_instance.get_extension( 232 unzip_crx_folder).extension_id 233 # Close popup dialog if there is any 234 self._close_popup_tabs(driver) 235 self._block_setup_dialog(driver, extension_id) 236 time.sleep(WAIT_SECONDS) 237 logging.info('Enable TDLS in extension: %s', extension_id) 238 self._turn_on_tdls(driver, extension_id) 239 extension_tab_handle = driver.current_window_handle 240 logging.info('Start mirroring on device: %s', device_ip) 241 self._start_mirroring( 242 driver, extension_id, device_ip, TEST_URL) 243 time.sleep(MIRRORING_DURATION_SECONDS) 244 self._set_focus_tab(driver, extension_tab_handle) 245 driver.switch_to_window(extension_tab_handle) 246 logging.info('Stop mirroring on device: %s', device_ip) 247 self._stop_mirroring(driver, extension_id) 248 time.sleep(WAIT_SECONDS * 3) 249 logging.info('Verify TDLS status in the mirroring session ... ... ') 250 self._check_tdls_status(device_ip) 251