#!/usr/bin/env python # Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import cgi import json import logging import logging.handlers import os import sys import common from autotest_lib.client.bin import utils from autotest_lib.client.common_lib.cros import chrome, xmlrpc_server from autotest_lib.client.cros import constants class InteractiveXmlRpcDelegate(xmlrpc_server.XmlRpcDelegate): """Exposes methods called remotely to create interactive tests. All instance methods of this object without a preceding '_' are exposed via an XML-RPC server. This is not a stateless handler object, which means that if you store state inside the delegate, that state will remain around for future calls. """ def login(self): """Login to the system and open a tab. The tab opened is used by other methods on this server to interact with the user. @return True. """ self._chrome = chrome.Chrome(init_network_controller=True) self._chrome.browser.platform.SetHTTPServerDirectories( os.path.dirname(sys.argv[0])) self._tab = self._chrome.browser.tabs[0] self._tab.Navigate( self._chrome.browser.platform.http_server.UrlOf('shell.html')) return True def set_output(self, html): """Replace the contents of the tab. @param html: HTML document to replace tab contents with. @return True. """ # JSON does a better job of escaping HTML for JavaScript than we could # with string.replace(). html_escaped = json.dumps(html) # Use JavaScript to append the output and scroll to the bottom of the # open tab. self._tab.ExecuteJavaScript('document.body.innerHTML = %s; ' % html_escaped) self._tab.Activate() self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() return True def append_output(self, html): """Append HTML to the contents of the tab. @param html: HTML to append to the existing tab contents. @return True. """ # JSON does a better job of escaping HTML for JavaScript than we could # with string.replace(). html_escaped = json.dumps(html) # Use JavaScript to append the output and scroll to the bottom of the # open tab. self._tab.ExecuteJavaScript( ('document.body.innerHTML += %s; ' % html_escaped) + 'window.scrollTo(0, document.body.scrollHeight);') self._tab.Activate() self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() return True def append_buttons(self, *args): """Append confirmation buttons to the tab. Each button is given an index, 0 for the first button, 1 for the second, and so on. @param title...: Title of button to append. @return True. """ html = '' index = 0 for title in args: onclick = 'submit_button(%d)' % index html += ('' % ( cgi.escape(title), cgi.escape(onclick))) index += 1 return self.append_output(html) def wait_for_button(self, timeout): """Wait for a button to be clicked. Call append_buttons() before this to add buttons to the document. @param timeout: Maximum time, in seconds, to wait for a click. @return index of button that was clicked. """ # Wait for the button to be clicked. utils.poll_for_condition( condition=lambda: self._tab.EvaluateJavaScript('window.__ready') == 1, desc='User clicked on button.', timeout=timeout) # Fetch the result. result = self._tab.EvaluateJavaScript('window.__result') # Reset for the next button. self._tab.ExecuteJavaScript( 'window.__ready = 0; ' 'window.__result = null;') return result def check_for_button(self): """Check whether a button has been clicked. Call append_buttons() before this to add buttons to the document. @return index of button that was clicked or -1 if no button has been clicked. """ if not self._tab.EvaluateJavaScript('window.__ready'): return -1 # Fetch the result. result = self._tab.EvaluateJavaScript('window.__result') # Reset for the next button. self._tab.ExecuteJavaScript( 'window.__ready = 0; ' 'window.__result = null;') return result def append_list(self, name): """Append a results list to the contents of the tab. @param name: Name to use for making modifications to the list. @return True. """ html = '
' % cgi.escape(name) return self.append_output(html) def append_list_item(self, list_name, item_name, html): """Append an item to a results list. @param list_name: Name of list provided to append_list(). @param item_name: Name to use for making modifications to the item. @param html: HTML to place in the list item. @return True. """ # JSON does a better job of escaping HTML for JavaScript than we could # with string.replace(). item_html = '""' % cgi.escape(item_name) # Use JavaScript to append the output. self._tab.ExecuteJavaScript( 'document.getElementById("%s").innerHTML += %s; ' % ( cgi.escape(list_name), item_html)) self._tab.Activate() self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() return self.replace_list_item(item_name, html) def replace_list_item(self, item_name, html): """Replace an item in a results list. @param item_name: Name of item provided to append_list_item(). @param html: HTML to place in the list item. @return True. """ # JSON does a better job of escaping HTML for JavaScript than we could # with string.replace(). html_escaped = json.dumps(html) # Use JavaScript to append the output. self._tab.ExecuteJavaScript( 'document.getElementById("%s").innerHTML = %s; ' % ( cgi.escape(item_name), html_escaped)) self._tab.Activate() self._tab.WaitForDocumentReadyStateToBeInteractiveOrBetter() return True def close(self): """Close the browser. @return True. """ if hasattr(self, '_chrome'): self._chrome.close(); return True if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) handler = logging.handlers.SysLogHandler(address='/dev/log') formatter = logging.Formatter( 'interactive_xmlrpc_server: [%(levelname)s] %(message)s') handler.setFormatter(formatter) logging.getLogger().addHandler(handler) logging.debug('interactive_xmlrpc_server main...') server = xmlrpc_server.XmlRpcServer( 'localhost', constants.INTERACTIVE_XMLRPC_SERVER_PORT) server.register_delegate(InteractiveXmlRpcDelegate()) server.run()