1# Lint as: python2, python3 2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6from __future__ import absolute_import 7from __future__ import division 8from __future__ import print_function 9 10import logging 11import os 12import re 13import shutil 14from six.moves import urllib 15from six.moves import range 16from six.moves import urllib 17import subprocess 18import tempfile 19import time 20 21from autotest_lib.client.bin import test 22from autotest_lib.client.bin import utils 23from autotest_lib.client.common_lib import error 24from autotest_lib.client.common_lib import file_utils 25from autotest_lib.client.cros.input_playback import input_playback 26 27 28class touch_playback_test_base(test.test): 29 """Base class for touch tests involving playback.""" 30 version = 1 31 32 _INPUTCONTROL = '/opt/google/input/inputcontrol' 33 34 35 @property 36 def _has_touchpad(self): 37 """True if device under test has a touchpad; else False.""" 38 return self.player.has('touchpad') 39 40 41 @property 42 def _has_touchscreen(self): 43 """True if device under test has a touchscreen; else False.""" 44 return self.player.has('touchscreen') 45 46 47 @property 48 def _has_mouse(self): 49 """True if device under test has or emulates a USB mouse; else False.""" 50 return self.player.has('mouse') 51 52 53 def warmup(self, mouse_props=None): 54 """Test setup. 55 56 Instantiate player object to find touch devices, if any. 57 These devices can be used for playback later. 58 Emulate a USB mouse if a property file is provided. 59 Check if the inputcontrol script is avaiable on the disk. 60 61 @param mouse_props: optional property file for a mouse to emulate. 62 Created using 'evemu-describe /dev/input/X'. 63 64 """ 65 self.player = input_playback.InputPlayback() 66 if mouse_props: 67 self.player.emulate(input_type='mouse', property_file=mouse_props) 68 self.player.find_connected_inputs() 69 70 self._autotest_ext = None 71 self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL) 72 self._platform = utils.get_board() 73 if 'cheets' in self._platform: 74 self._platform = self._platform[:-len('-cheets')] 75 76 77 def _find_test_files(self, input_type, gestures): 78 """Determine where the playback gesture files for this test are. 79 80 Expected file format is: <boardname>_<input type>_<hwid>_<gesture name> 81 e.g. samus_touchpad_164.17_scroll_down 82 83 @param input_type: device type, e.g. 'touchpad' 84 @param gestures: list of gesture name strings used in filename 85 86 @returns: None if not all files are found. Dictionary of filepaths if 87 they are found, indexed by gesture names as given. 88 @raises: error.TestError if no device is found or if device should have 89 a hw_id but does not. 90 91 """ 92 if type(gestures) is not list: 93 raise error.TestError('find_test_files() takes a LIST, not a ' 94 '%s!' % type(gestures)) 95 96 if not self.player.has(input_type): 97 raise error.TestError('Device does not have a %s!' % input_type) 98 99 if input_type in ['touchpad', 'touchscreen', 'stylus']: 100 hw_id = self.player.devices[input_type].hw_id 101 if not hw_id: 102 raise error.TestError('No valid hw_id for %s!' % input_type) 103 filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id) 104 105 else: 106 device_name = self.player.devices[input_type].name 107 filename_fmt = '%s_%s' % (device_name, input_type) 108 109 filepaths = {} 110 for gesture in gestures: 111 filename = '%s_%s' % (filename_fmt, gesture) 112 filepath = self._download_remote_test_file(filename, input_type) 113 if not filepath: 114 logging.info('Did not find files for this device!') 115 return None 116 117 filepaths[gesture] = filepath 118 119 return filepaths 120 121 122 def _find_test_files_from_directions(self, input_type, fmt_str, directions): 123 """Find gesture files given a list of directions and name format. 124 125 @param input_type: device type, e.g. 'touchpad' 126 @param fmt_str: format string for filename, e.g. 'scroll-%s' 127 @param directions: list of directions for fmt_string 128 129 @returns: None if not all files are found. Dictionary of filepaths if 130 they are found, indexed by directions as given. 131 @raises: error.TestError if no hw_id is found. 132 133 """ 134 gestures = [fmt_str % d for d in directions] 135 temp_filepaths = self._find_test_files(input_type, gestures) 136 137 filepaths = {} 138 if temp_filepaths: 139 filepaths = {d: temp_filepaths[fmt_str % d] for d in directions} 140 141 return filepaths 142 143 144 def _download_remote_test_file(self, filename, input_type): 145 """Download a file from the remote touch playback folder. 146 147 @param filename: string of filename 148 @param input_type: device type, e.g. 'touchpad' 149 150 @returns: Path to local file or None if file is not found. 151 152 """ 153 REMOTE_STORAGE_URL = ('https://storage.googleapis.com/' 154 'chromiumos-test-assets-public/touch_playback') 155 filename = urllib.parse.quote(filename) 156 157 if input_type in ['touchpad', 'touchscreen', 'stylus']: 158 url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename) 159 else: 160 url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename) 161 local_file = os.path.join(self.bindir, filename) 162 163 logging.info('Looking for %s', url) 164 try: 165 file_utils.download_file(url, local_file) 166 except urllib.error.URLError as e: 167 logging.info('File download failed!') 168 logging.debug(e.msg) 169 return None 170 171 return local_file 172 173 174 def _emulate_mouse(self, property_file=None): 175 """Emulate a mouse with the given property file. 176 177 player will use default mouse if no file is provided. 178 179 """ 180 self.player.emulate(input_type='mouse', property_file=property_file) 181 self.player.find_connected_inputs() 182 if not self._has_mouse: 183 raise error.TestError('Mouse emulation failed!') 184 185 def _playback(self, filepath, touch_type='touchpad'): 186 """Playback a given input file on the given input.""" 187 self.player.playback(filepath, touch_type) 188 189 190 def _blocking_playback(self, filepath, touch_type='touchpad'): 191 """Playback a given input file on the given input; block until done.""" 192 self.player.blocking_playback(filepath, touch_type) 193 194 195 def _set_touch_setting_by_inputcontrol(self, setting, value): 196 """Set a given touch setting the given value by inputcontrol. 197 198 @param setting: Name of touch setting, e.g. 'tapclick'. 199 @param value: True for enabled, False for disabled. 200 201 """ 202 cmd_value = 1 if value else 0 203 utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value)) 204 logging.info('%s turned %s.', setting, 'on' if value else 'off') 205 206 207 def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting, 208 value): 209 """Set a given touch setting the given value. 210 211 @param inputcontrol_setting: Name of touch setting for the inputcontrol 212 script, e.g. 'tapclick'. 213 @param autotest_ext_setting: Name of touch setting for the autotest 214 extension, e.g. 'TapToClick'. 215 @param value: True for enabled, False for disabled. 216 217 """ 218 if self._has_inputcontrol: 219 self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value) 220 elif self._autotest_ext is not None: 221 self._autotest_ext.EvaluateJavaScript( 222 'chrome.autotestPrivate.set%s(%s);' 223 % (autotest_ext_setting, ("%s" % value).lower())) 224 # TODO: remove this sleep once checking for value is available. 225 time.sleep(1) 226 else: 227 raise error.TestFail('Both inputcontrol and the autotest ' 228 'extension are not availble.') 229 230 231 def _set_australian_scrolling(self, value): 232 """Set australian scrolling to the given value. 233 234 @param value: True for enabled, False for disabled. 235 236 """ 237 self._set_touch_setting('australian_scrolling', 'NaturalScroll', value) 238 239 240 def _set_tap_to_click(self, value): 241 """Set tap-to-click to the given value. 242 243 @param value: True for enabled, False for disabled. 244 245 """ 246 self._set_touch_setting('tapclick', 'TapToClick', value) 247 248 249 def _set_tap_dragging(self, value): 250 """Set tap dragging to the given value. 251 252 @param value: True for enabled, False for disabled. 253 254 """ 255 self._set_touch_setting('tapdrag', 'TapDragging', value) 256 257 258 def _set_autotest_ext(self, ext): 259 """Set the autotest extension. 260 261 @ext: the autotest extension object. 262 263 """ 264 self._autotest_ext = ext 265 266 267 def _open_test_page(self, cr, filename='test_page.html'): 268 """Prepare test page for testing. Set self._tab with page. 269 270 @param cr: chrome.Chrome() object 271 @param filename: name of file in self.bindir to open 272 273 """ 274 self._test_page = TestPage(cr, self.bindir, filename) 275 self._tab = self._test_page._tab 276 277 278 def _open_events_page(self, cr): 279 """Open the test events page. Set self._events with EventsPage class. 280 281 Also set self._tab as this page and self.bindir as the http server dir. 282 283 @param cr: chrome.Chrome() object 284 285 """ 286 self._events = EventsPage(cr, self.bindir) 287 self._tab = self._events._tab 288 289 290 def _center_cursor(self): 291 """Playback mouse movement to center cursor. 292 293 Requres that self._emulate_mouse() has been called. 294 295 """ 296 self.player.blocking_playback_of_default_file( 297 'mouse_center_cursor_gesture', input_type='mouse') 298 299 300 def _get_kernel_events_recorder(self, input_type): 301 """Return a kernel event recording object for the given input type. 302 303 @param input_type: device type, e.g. 'touchpad' 304 305 @returns: KernelEventsRecorder instance. 306 307 """ 308 node = self.player.devices[input_type].node 309 return KernelEventsRecorder(node) 310 311 312 def cleanup(self): 313 """ clean up """ 314 self.player.close() 315 316 317class KernelEventsRecorder(object): 318 """Object to record kernel events for a particular device.""" 319 320 def __init__(self, node): 321 """Setup to record future evtest output for this node. 322 323 @param input_type: the device which to inspect, e.g. 'mouse' 324 325 """ 326 self.node = node 327 self.fh = tempfile.NamedTemporaryFile() 328 self.evtest_process = None 329 330 331 def start(self): 332 """Start recording events.""" 333 self.evtest_process = subprocess.Popen( 334 ['evtest', self.node], stdout=self.fh) 335 336 # Wait until the initial output has finished before returning. 337 def find_exit(): 338 """Polling function for end of output.""" 339 interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' % 340 self.fh.name) 341 line_count = utils.run(interrupt_cmd).stdout.strip() 342 return line_count != '0' 343 utils.poll_for_condition(find_exit) 344 345 346 def clear(self): 347 """Clear previous events.""" 348 self.stop() 349 self.fh.close() 350 self.fh = tempfile.NamedTemporaryFile() 351 352 353 def stop(self): 354 """Stop recording events.""" 355 if self.evtest_process: 356 self.evtest_process.kill() 357 self.evtest_process = None 358 359 360 def get_recorded_events(self): 361 """Get the evtest output since object was created.""" 362 self.fh.seek(0) 363 events = self.fh.read() 364 return events 365 366 367 def log_recorded_events(self): 368 """Save recorded events into logs.""" 369 events = self.get_recorded_events() 370 logging.info('Kernel events seen:\n%s', events) 371 372 373 def get_last_event_timestamp(self, filter_str=''): 374 """Return the timestamp of the last event since recording started. 375 376 Events are in the form "Event: time <epoch time>, <info>\n" 377 378 @param filter_str: a regex string to match to the <info> section. 379 380 @returns: floats matching 381 382 """ 383 events = self.get_recorded_events() 384 findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str, 385 events.decode('utf-8'), re.MULTILINE) 386 re.findall(r' time (.*?), [^\n]*?%s' % filter_str, 387 events.decode('utf-8'), re.MULTILINE) 388 if not findall: 389 self.log_recorded_events() 390 raise error.TestError('Could not find any kernel timestamps!' 391 ' Filter: %s' % filter_str) 392 return float(findall[-1]) 393 394 395 def close(self): 396 """Clean up this class.""" 397 self.stop() 398 self.fh.close() 399 400 401class TestPage(object): 402 """Wrapper around a Telemtry tab for utility functions. 403 404 Provides functions such as reload and setting scroll height on page. 405 406 """ 407 _DEFAULT_SCROLL = 5000 408 409 def __init__(self, cr, httpdir, filename): 410 """Open a given test page in the given httpdir. 411 412 @param cr: chrome.Chrome() object 413 @param httpdir: the directory to use for SetHTTPServerDirectories 414 @param filename: path to the file to open, relative to httpdir 415 416 """ 417 cr.browser.platform.SetHTTPServerDirectories(httpdir) 418 self._tab = cr.browser.tabs[0] 419 self._tab.Navigate(cr.browser.platform.http_server.UrlOf( 420 os.path.join(httpdir, filename))) 421 self.wait_for_page_ready() 422 423 424 def reload_page(self): 425 """Reloads test page.""" 426 self._tab.Navigate(self._tab.url) 427 self.wait_for_page_ready() 428 429 430 def wait_for_page_ready(self): 431 """Wait for a variable pageReady on the test page to be true. 432 433 Presuposes that a pageReady variable exists on this page. 434 435 @raises error.TestError if page is not ready after timeout. 436 437 """ 438 self._tab.WaitForDocumentReadyStateToBeComplete() 439 utils.poll_for_condition( 440 lambda: self._tab.EvaluateJavaScript('pageReady'), 441 exception=error.TestError('Test page is not ready!')) 442 443 444 def expand_page(self): 445 """Expand the page to be very large, to allow scrolling.""" 446 page_width = self._DEFAULT_SCROLL * 5 447 cmd = 'document.body.style.%s = "%dpx"' % ('%s', page_width) 448 self._tab.ExecuteJavaScript(cmd % 'width') 449 self._tab.ExecuteJavaScript(cmd % 'height') 450 451 452 def set_scroll_position(self, value, scroll_vertical=True): 453 """Set scroll position to given value. 454 455 @param value: integer value in pixels. 456 @param scroll_vertical: True for vertical scroll, 457 False for horizontal Scroll. 458 459 """ 460 cmd = 'window.scrollTo(%d, %d);' 461 if scroll_vertical: 462 self._tab.ExecuteJavaScript(cmd % (0, value)) 463 else: 464 self._tab.ExecuteJavaScript(cmd % (value, 0)) 465 466 467 def set_default_scroll_position(self, scroll_vertical=True): 468 """Set scroll position of page to default. 469 470 @param scroll_vertical: True for vertical scroll, 471 False for horizontal Scroll. 472 @raise: TestError if page is not set to default scroll position 473 474 """ 475 total_tries = 2 476 for i in range(total_tries): 477 try: 478 self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical) 479 self.wait_for_default_scroll_position(scroll_vertical) 480 except error.TestError as e: 481 if i == total_tries - 1: 482 pos = self.get_scroll_position(scroll_vertical) 483 logging.error('SCROLL POSITION: %s', pos) 484 raise e 485 else: 486 self.expand_page() 487 else: 488 break 489 490 491 def get_scroll_position(self, scroll_vertical=True): 492 """Return current scroll position of page. 493 494 @param scroll_vertical: True for vertical scroll, 495 False for horizontal Scroll. 496 497 """ 498 if scroll_vertical: 499 return int(self._tab.EvaluateJavaScript('window.scrollY')) 500 else: 501 return int(self._tab.EvaluateJavaScript('window.scrollX')) 502 503 504 def wait_for_default_scroll_position(self, scroll_vertical=True): 505 """Wait for page to be at the default scroll position. 506 507 @param scroll_vertical: True for vertical scroll, 508 False for horizontal scroll. 509 510 @raise: TestError if page either does not move or does not stop moving. 511 512 """ 513 utils.poll_for_condition( 514 lambda: self.get_scroll_position( scroll_vertical) in 515 [self._DEFAULT_SCROLL,self._DEFAULT_SCROLL-1], 516 exception=error.TestError('Page not set to default scroll!')) 517 518 519 def wait_for_scroll_position_to_settle(self, scroll_vertical=True): 520 """Wait for page to move and then stop moving. 521 522 @param scroll_vertical: True for Vertical scroll and 523 False for horizontal scroll. 524 525 @raise: TestError if page either does not move or does not stop moving. 526 527 """ 528 # Wait until page starts moving. 529 utils.poll_for_condition( 530 lambda: self.get_scroll_position( 531 scroll_vertical) != self._DEFAULT_SCROLL, 532 exception=error.TestError('No scrolling occurred!'), timeout=30) 533 534 # Wait until page has stopped moving. 535 self._previous = self._DEFAULT_SCROLL 536 def _movement_stopped(): 537 current = self.get_scroll_position() 538 result = current == self._previous 539 self._previous = current 540 return result 541 542 utils.poll_for_condition( 543 lambda: _movement_stopped(), sleep_interval=1, 544 exception=error.TestError('Page did not stop moving!'), 545 timeout=30) 546 547 548 def get_page_zoom(self): 549 """Return window.innerWidth for this page.""" 550 return float(self._tab.EvaluateJavaScript( 551 'window.visualViewport.scale')) 552 553 554class EventsPage(TestPage): 555 """Functions to monitor input events on the DUT, as seen by a webpage. 556 557 A subclass of TestPage which uses and interacts with a specific page. 558 559 """ 560 def __init__(self, cr, httpdir): 561 """Open the website and save the tab in self._tab. 562 563 @param cr: chrome.Chrome() object 564 @param httpdir: the directory to use for SetHTTPServerDirectories 565 566 """ 567 filename = 'touch_events_test_page.html' 568 current_dir = os.path.dirname(os.path.realpath(__file__)) 569 shutil.copyfile(os.path.join(current_dir, filename), 570 os.path.join(httpdir, filename)) 571 572 super(EventsPage, self).__init__(cr, httpdir, filename) 573 574 575 def clear_previous_events(self): 576 """Wipe the test page back to its original state.""" 577 self._tab.ExecuteJavaScript('pageReady = false') 578 self._tab.ExecuteJavaScript('clearPreviousEvents()') 579 self.wait_for_page_ready() 580 581 582 def get_events_log(self): 583 """Return the event log from the test page.""" 584 return self._tab.EvaluateJavaScript('eventLog') 585 586 587 def log_events(self): 588 """Put the test page's event log into logging.info.""" 589 logging.info('EVENTS LOG:') 590 logging.info(self.get_events_log()) 591 592 593 def get_time_of_last_event(self): 594 """Return the timestamp of the last seen event (if any).""" 595 return self._tab.EvaluateJavaScript('timeOfLastEvent') 596 597 598 def get_event_count(self): 599 """Return the number of events that the test page has seen.""" 600 return self._tab.EvaluateJavaScript('eventCount') 601 602 603 def get_scroll_delta(self, is_vertical): 604 """Return the net scrolling the test page has seen. 605 606 @param is_vertical: True for vertical scrolling; False for horizontal. 607 608 """ 609 axis = 'y' if is_vertical else 'x' 610 return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis) 611 612 613 def get_click_count(self): 614 """Return the number of clicks the test page has seen.""" 615 return self._tab.EvaluateJavaScript('clickCount') 616 617 618 def wait_for_events_to_complete(self, delay_secs=1, timeout=60): 619 """Wait until test page stops seeing events for delay_secs seconds. 620 621 @param delay_secs: the polling frequency in seconds. 622 @param timeout: the number of seconds to wait for events to complete. 623 @raises: error.TestError if no events occurred. 624 @raises: error.TestError if events did not stop after timeout seconds. 625 626 """ 627 self._tmp_previous_event_count = -1 628 def _events_stopped_coming(): 629 most_recent_event_count = self.get_event_count() 630 delta = most_recent_event_count - self._tmp_previous_event_count 631 self._tmp_previous_event_count = most_recent_event_count 632 return most_recent_event_count != 0 and delta == 0 633 634 try: 635 utils.poll_for_condition( 636 _events_stopped_coming, exception=error.TestError(), 637 sleep_interval=delay_secs, timeout=timeout) 638 except error.TestError: 639 if self._tmp_previous_event_count == 0: 640 raise error.TestError('No touch event was seen!') 641 else: 642 self.log_events() 643 raise error.TestError('Touch events did not stop!') 644 645 646 def set_prevent_defaults(self, value): 647 """Set whether to allow default event actions to go through. 648 649 E.g. if this is True, a two finger horizontal scroll will not actually 650 produce history navigation on the browser. 651 652 @param value: True for prevent defaults; False to allow them. 653 654 """ 655 js_value = str(value).lower() 656 self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value) 657