# Copyright 2015 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Provides fakes for several of Telemetry's internal objects. These allow code like story_runner and Benchmark to be run and tested without compiling or starting a browser. Class names prepended with an underscore are intended to be implementation details, and should not be subclassed; however, some, like _FakeBrowser, have public APIs that may need to be called in tests. """ from telemetry.internal.backends.chrome_inspector import websocket from telemetry.internal.browser import browser_options from telemetry.internal.platform import system_info from telemetry.page import shared_page_state from telemetry.util import image_util from telemetry.testing.internal import fake_gpu_info # Classes and functions which are intended to be part of the public # fakes API. class FakePlatform(object): def __init__(self): self._network_controller = None self._tracing_controller = None @property def is_host_platform(self): raise NotImplementedError @property def network_controller(self): if self._network_controller is None: self._network_controller = _FakeNetworkController() return self._network_controller @property def tracing_controller(self): if self._tracing_controller is None: self._tracing_controller = _FakeTracingController() return self._tracing_controller def CanMonitorThermalThrottling(self): return False def IsThermallyThrottled(self): return False def HasBeenThermallyThrottled(self): return False def GetDeviceTypeName(self): raise NotImplementedError def GetArchName(self): raise NotImplementedError def GetOSName(self): raise NotImplementedError def GetOSVersionName(self): raise NotImplementedError def StopAllLocalServers(self): pass class FakeLinuxPlatform(FakePlatform): def __init__(self): super(FakeLinuxPlatform, self).__init__() self.screenshot_png_data = None self.http_server_directories = [] self.http_server = FakeHTTPServer() @property def is_host_platform(self): return True def GetDeviceTypeName(self): return 'Desktop' def GetArchName(self): return 'x86_64' def GetOSName(self): return 'linux' def GetOSVersionName(self): return 'trusty' def CanTakeScreenshot(self): return bool(self.screenshot_png_data) def TakeScreenshot(self, file_path): if not self.CanTakeScreenshot(): raise NotImplementedError img = image_util.FromBase64Png(self.screenshot_png_data) image_util.WritePngFile(img, file_path) return True def SetHTTPServerDirectories(self, paths): self.http_server_directories.append(paths) class FakeHTTPServer(object): def UrlOf(self, url): del url # unused return 'file:///foo' class FakePossibleBrowser(object): def __init__(self): self._returned_browser = _FakeBrowser(FakeLinuxPlatform()) self.browser_type = 'linux' self.supports_tab_control = False self.is_remote = False @property def returned_browser(self): """The browser object that will be returned through later API calls.""" return self._returned_browser def Create(self, finder_options): del finder_options # unused return self.returned_browser @property def platform(self): """The platform object from the returned browser. To change this or set it up, change the returned browser's platform. """ return self.returned_browser.platform def IsRemote(self): return self.is_remote def SetCredentialsPath(self, _): pass class FakeSharedPageState(shared_page_state.SharedPageState): def __init__(self, test, finder_options, story_set): super(FakeSharedPageState, self).__init__(test, finder_options, story_set) def _GetPossibleBrowser(self, test, finder_options): p = FakePossibleBrowser() self.ConfigurePossibleBrowser(p) return p def ConfigurePossibleBrowser(self, possible_browser): """Override this to configure the PossibleBrowser. Can make changes to the browser's configuration here via e.g.: possible_browser.returned_browser.returned_system_info = ... """ pass def DidRunStory(self, results): # TODO(kbr): add a test which throws an exception from DidRunStory # to verify the fix from https://crrev.com/86984d5fc56ce00e7b37ebe . super(FakeSharedPageState, self).DidRunStory(results) class FakeSystemInfo(system_info.SystemInfo): def __init__(self, model_name='', gpu_dict=None): if gpu_dict == None: gpu_dict = fake_gpu_info.FAKE_GPU_INFO super(FakeSystemInfo, self).__init__(model_name, gpu_dict) class _FakeBrowserFinderOptions(browser_options.BrowserFinderOptions): def __init__(self, *args, **kwargs): browser_options.BrowserFinderOptions.__init__(self, *args, **kwargs) self.fake_possible_browser = FakePossibleBrowser() def CreateBrowserFinderOptions(browser_type=None): """Creates fake browser finder options for discovering a browser.""" return _FakeBrowserFinderOptions(browser_type=browser_type) # Internal classes. Note that end users may still need to both call # and mock out methods of these classes, but they should not be # subclassed. class _FakeBrowser(object): def __init__(self, platform): self._tabs = _FakeTabList(self) self._returned_system_info = FakeSystemInfo() self._platform = platform self._browser_type = 'release' @property def platform(self): return self._platform @platform.setter def platform(self, incoming): """Allows overriding of the fake browser's platform object.""" assert isinstance(incoming, FakePlatform) self._platform = incoming @property def returned_system_info(self): """The object which will be returned from calls to GetSystemInfo.""" return self._returned_system_info @returned_system_info.setter def returned_system_info(self, incoming): """Allows overriding of the returned SystemInfo object. Incoming argument must be an instance of FakeSystemInfo.""" assert isinstance(incoming, FakeSystemInfo) self._returned_system_info = incoming @property def browser_type(self): """The browser_type this browser claims to be ('debug', 'release', etc.)""" return self._browser_type @browser_type.setter def browser_type(self, incoming): """Allows setting of the browser_type.""" self._browser_type = incoming @property def credentials(self): return _FakeCredentials() def Close(self): pass @property def supports_system_info(self): return True def GetSystemInfo(self): return self.returned_system_info @property def supports_tab_control(self): return True @property def tabs(self): return self._tabs class _FakeCredentials(object): def WarnIfMissingCredentials(self, _): pass class _FakeTracingController(object): def __init__(self): self._is_tracing = False def StartTracing(self, tracing_config, timeout=10): self._is_tracing = True del tracing_config del timeout def StopTracing(self): self._is_tracing = False @property def is_tracing_running(self): return self._is_tracing def ClearStateIfNeeded(self): pass class _FakeNetworkController(object): def __init__(self): self.wpr_mode = None self.extra_wpr_args = None self.is_replay_active = False self.is_open = False def Open(self, wpr_mode, extra_wpr_args): self.wpr_mode = wpr_mode self.extra_wpr_args = extra_wpr_args self.is_open = True def Close(self): self.wpr_mode = None self.extra_wpr_args = None self.is_replay_active = False self.is_open = False def StartReplay(self, archive_path, make_javascript_deterministic=False): del make_javascript_deterministic # Unused. assert self.is_open self.is_replay_active = archive_path is not None def StopReplay(self): self.is_replay_active = False class _FakeTab(object): def __init__(self, browser, tab_id): self._browser = browser self._tab_id = str(tab_id) self._collect_garbage_count = 0 self.test_png = None @property def collect_garbage_count(self): return self._collect_garbage_count @property def id(self): return self._tab_id @property def browser(self): return self._browser def WaitForDocumentReadyStateToBeComplete(self, timeout=0): pass def Navigate(self, url, script_to_evaluate_on_commit=None, timeout=0): pass def WaitForDocumentReadyStateToBeInteractiveOrBetter(self, timeout=0): pass def IsAlive(self): return True def CloseConnections(self): pass def CollectGarbage(self): self._collect_garbage_count += 1 def Close(self): pass @property def screenshot_supported(self): return self.test_png is not None def Screenshot(self): assert self.screenshot_supported, 'Screenshot is not supported' return image_util.FromBase64Png(self.test_png) class _FakeTabList(object): _current_tab_id = 0 def __init__(self, browser): self._tabs = [] self._browser = browser def New(self, timeout=300): del timeout # unused type(self)._current_tab_id += 1 t = _FakeTab(self._browser, type(self)._current_tab_id) self._tabs.append(t) return t def __iter__(self): return self._tabs.__iter__() def __len__(self): return len(self._tabs) def __getitem__(self, index): return self._tabs[index] def GetTabById(self, identifier): """The identifier of a tab can be accessed with tab.id.""" for tab in self._tabs: if tab.id == identifier: return tab return None class FakeInspectorWebsocket(object): _NOTIFICATION_EVENT = 1 _NOTIFICATION_CALLBACK = 2 """A fake InspectorWebsocket. A fake that allows tests to send pregenerated data. Normal InspectorWebsockets allow for any number of domain handlers. This fake only allows up to 1 domain handler, and assumes that the domain of the response always matches that of the handler. """ def __init__(self, mock_timer): self._mock_timer = mock_timer self._notifications = [] self._response_handlers = {} self._pending_callbacks = {} self._handler = None def RegisterDomain(self, _, handler): self._handler = handler def AddEvent(self, method, params, time): if self._notifications: assert self._notifications[-1][1] < time, ( 'Current response is scheduled earlier than previous response.') response = {'method': method, 'params': params} self._notifications.append((response, time, self._NOTIFICATION_EVENT)) def AddAsyncResponse(self, method, result, time): if self._notifications: assert self._notifications[-1][1] < time, ( 'Current response is scheduled earlier than previous response.') response = {'method': method, 'result': result} self._notifications.append((response, time, self._NOTIFICATION_CALLBACK)) def AddResponseHandler(self, method, handler): self._response_handlers[method] = handler def SyncRequest(self, request, *args, **kwargs): del args, kwargs # unused handler = self._response_handlers[request['method']] return handler(request) if handler else None def AsyncRequest(self, request, callback): self._pending_callbacks.setdefault(request['method'], []).append(callback) def SendAndIgnoreResponse(self, request): pass def Connect(self, _): pass def DispatchNotifications(self, timeout): current_time = self._mock_timer.time() if not self._notifications: self._mock_timer.SetTime(current_time + timeout + 1) raise websocket.WebSocketTimeoutException() response, time, kind = self._notifications[0] if time - current_time > timeout: self._mock_timer.SetTime(current_time + timeout + 1) raise websocket.WebSocketTimeoutException() self._notifications.pop(0) self._mock_timer.SetTime(time + 1) if kind == self._NOTIFICATION_EVENT: self._handler(response) elif kind == self._NOTIFICATION_CALLBACK: callback = self._pending_callbacks.get(response['method']).pop(0) callback(response) else: raise Exception('Unexpected response type')