1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import logging 6import os 7import re 8import shutil 9import subprocess 10import tempfile 11import time 12import urllib 13import urllib2 14 15from autotest_lib.client.bin import test 16from autotest_lib.client.bin import utils 17from autotest_lib.client.common_lib import error 18from autotest_lib.client.common_lib import file_utils 19from autotest_lib.client.cros.input_playback import input_playback 20 21 22class touch_playback_test_base(test.test): 23 """Base class for touch tests involving playback.""" 24 version = 1 25 26 _INPUTCONTROL = '/opt/google/input/inputcontrol' 27 28 29 @property 30 def _has_touchpad(self): 31 """True if device under test has a touchpad; else False.""" 32 return self.player.has('touchpad') 33 34 35 @property 36 def _has_touchscreen(self): 37 """True if device under test has a touchscreen; else False.""" 38 return self.player.has('touchscreen') 39 40 41 @property 42 def _has_mouse(self): 43 """True if device under test has or emulates a USB mouse; else False.""" 44 return self.player.has('mouse') 45 46 47 def warmup(self, mouse_props=None): 48 """Test setup. 49 50 Instantiate player object to find touch devices, if any. 51 These devices can be used for playback later. 52 Emulate a USB mouse if a property file is provided. 53 Check if the inputcontrol script is avaiable on the disk. 54 55 @param mouse_props: optional property file for a mouse to emulate. 56 Created using 'evemu-describe /dev/input/X'. 57 58 """ 59 self.player = input_playback.InputPlayback() 60 if mouse_props: 61 self.player.emulate(input_type='mouse', property_file=mouse_props) 62 self.player.find_connected_inputs() 63 64 self._autotest_ext = None 65 self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL) 66 self._platform = utils.get_board() 67 if 'cheets' in self._platform: 68 self._platform = self._platform[:-len('-cheets')] 69 70 71 def _find_test_files(self, input_type, gestures): 72 """Determine where the playback gesture files for this test are. 73 74 Expected file format is: <boardname>_<input type>_<hwid>_<gesture name> 75 e.g. samus_touchpad_164.17_scroll_down 76 77 @param input_type: device type, e.g. 'touchpad' 78 @param gestures: list of gesture name strings used in filename 79 80 @returns: None if not all files are found. Dictionary of filepaths if 81 they are found, indexed by gesture names as given. 82 @raises: error.TestError if no device is found or if device should have 83 a hw_id but does not. 84 85 """ 86 if type(gestures) is not list: 87 raise error.TestError('find_test_files() takes a LIST, not a ' 88 '%s!' % type(gestures)) 89 90 if not self.player.has(input_type): 91 raise error.TestError('Device does not have a %s!' % input_type) 92 93 if input_type in ['touchpad', 'touchscreen', 'stylus']: 94 hw_id = self.player.devices[input_type].hw_id 95 if not hw_id: 96 raise error.TestError('No valid hw_id for %s!' % input_type) 97 filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id) 98 99 else: 100 device_name = self.player.devices[input_type].name 101 filename_fmt = '%s_%s' % (device_name, input_type) 102 103 filepaths = {} 104 for gesture in gestures: 105 filename = '%s_%s' % (filename_fmt, gesture) 106 filepath = self._download_remote_test_file(filename, input_type) 107 if not filepath: 108 logging.info('Did not find files for this device!') 109 return None 110 111 filepaths[gesture] = filepath 112 113 return filepaths 114 115 116 def _find_test_files_from_directions(self, input_type, fmt_str, directions): 117 """Find gesture files given a list of directions and name format. 118 119 @param input_type: device type, e.g. 'touchpad' 120 @param fmt_str: format string for filename, e.g. 'scroll-%s' 121 @param directions: list of directions for fmt_string 122 123 @returns: None if not all files are found. Dictionary of filepaths if 124 they are found, indexed by directions as given. 125 @raises: error.TestError if no hw_id is found. 126 127 """ 128 gestures = [fmt_str % d for d in directions] 129 temp_filepaths = self._find_test_files(input_type, gestures) 130 131 filepaths = {} 132 if temp_filepaths: 133 filepaths = {d: temp_filepaths[fmt_str % d] for d in directions} 134 135 return filepaths 136 137 138 def _download_remote_test_file(self, filename, input_type): 139 """Download a file from the remote touch playback folder. 140 141 @param filename: string of filename 142 @param input_type: device type, e.g. 'touchpad' 143 144 @returns: Path to local file or None if file is not found. 145 146 """ 147 REMOTE_STORAGE_URL = ('https://storage.googleapis.com/' 148 'chromiumos-test-assets-public/touch_playback') 149 filename = urllib.quote(filename) 150 151 if input_type in ['touchpad', 'touchscreen', 'stylus']: 152 url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename) 153 else: 154 url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename) 155 local_file = os.path.join(self.bindir, filename) 156 157 logging.info('Looking for %s', url) 158 try: 159 file_utils.download_file(url, local_file) 160 except urllib2.URLError as e: 161 logging.info('File download failed!') 162 logging.debug(e.msg) 163 return None 164 165 return local_file 166 167 168 def _emulate_mouse(self, property_file=None): 169 """Emulate a mouse with the given property file. 170 171 player will use default mouse if no file is provided. 172 173 """ 174 self.player.emulate(input_type='mouse', property_file=property_file) 175 self.player.find_connected_inputs() 176 if not self._has_mouse: 177 raise error.TestError('Mouse emulation failed!') 178 179 180 def _playback(self, filepath, touch_type='touchpad'): 181 """Playback a given input file on the given input.""" 182 self.player.playback(filepath, touch_type) 183 184 185 def _blocking_playback(self, filepath, touch_type='touchpad'): 186 """Playback a given input file on the given input; block until done.""" 187 self.player.blocking_playback(filepath, touch_type) 188 189 190 def _set_touch_setting_by_inputcontrol(self, setting, value): 191 """Set a given touch setting the given value by inputcontrol. 192 193 @param setting: Name of touch setting, e.g. 'tapclick'. 194 @param value: True for enabled, False for disabled. 195 196 """ 197 cmd_value = 1 if value else 0 198 utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value)) 199 logging.info('%s turned %s.', setting, 'on' if value else 'off') 200 201 202 def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting, 203 value): 204 """Set a given touch setting the given value. 205 206 @param inputcontrol_setting: Name of touch setting for the inputcontrol 207 script, e.g. 'tapclick'. 208 @param autotest_ext_setting: Name of touch setting for the autotest 209 extension, e.g. 'TapToClick'. 210 @param value: True for enabled, False for disabled. 211 212 """ 213 if self._has_inputcontrol: 214 self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value) 215 elif self._autotest_ext is not None: 216 self._autotest_ext.EvaluateJavaScript( 217 'chrome.autotestPrivate.set%s(%s);' 218 % (autotest_ext_setting, ("%s" % value).lower())) 219 # TODO: remove this sleep once checking for value is available. 220 time.sleep(1) 221 else: 222 raise error.TestFail('Both inputcontrol and the autotest ' 223 'extension are not availble.') 224 225 226 def _set_australian_scrolling(self, value): 227 """Set australian scrolling to the given value. 228 229 @param value: True for enabled, False for disabled. 230 231 """ 232 self._set_touch_setting('australian_scrolling', 'NaturalScroll', value) 233 234 235 def _set_tap_to_click(self, value): 236 """Set tap-to-click to the given value. 237 238 @param value: True for enabled, False for disabled. 239 240 """ 241 self._set_touch_setting('tapclick', 'TapToClick', value) 242 243 244 def _set_tap_dragging(self, value): 245 """Set tap dragging to the given value. 246 247 @param value: True for enabled, False for disabled. 248 249 """ 250 self._set_touch_setting('tapdrag', 'TapDragging', value) 251 252 253 def _set_autotest_ext(self, ext): 254 """Set the autotest extension. 255 256 @ext: the autotest extension object. 257 258 """ 259 self._autotest_ext = ext 260 261 262 def _open_test_page(self, cr, filename='test_page.html'): 263 """Prepare test page for testing. Set self._tab with page. 264 265 @param cr: chrome.Chrome() object 266 @param filename: name of file in self.bindir to open 267 268 """ 269 self._test_page = TestPage(cr, self.bindir, filename) 270 self._tab = self._test_page._tab 271 272 273 def _open_events_page(self, cr): 274 """Open the test events page. Set self._events with EventsPage class. 275 276 Also set self._tab as this page and self.bindir as the http server dir. 277 278 @param cr: chrome.Chrome() object 279 280 """ 281 self._events = EventsPage(cr, self.bindir) 282 self._tab = self._events._tab 283 284 285 def _center_cursor(self): 286 """Playback mouse movement to center cursor. 287 288 Requres that self._emulate_mouse() has been called. 289 290 """ 291 self.player.blocking_playback_of_default_file( 292 'mouse_center_cursor_gesture', input_type='mouse') 293 294 295 def _get_kernel_events_recorder(self, input_type): 296 """Return a kernel event recording object for the given input type. 297 298 @param input_type: device type, e.g. 'touchpad' 299 300 @returns: KernelEventsRecorder instance. 301 302 """ 303 node = self.player.devices[input_type].node 304 return KernelEventsRecorder(node) 305 306 307 def cleanup(self): 308 self.player.close() 309 310 311class KernelEventsRecorder(object): 312 """Object to record kernel events for a particular device.""" 313 314 def __init__(self, node): 315 """Setup to record future evtest output for this node. 316 317 @param input_type: the device which to inspect, e.g. 'mouse' 318 319 """ 320 self.node = node 321 self.fh = tempfile.NamedTemporaryFile() 322 self.evtest_process = None 323 324 325 def start(self): 326 """Start recording events.""" 327 self.evtest_process = subprocess.Popen( 328 ['evtest', self.node], stdout=self.fh) 329 330 # Wait until the initial output has finished before returning. 331 def find_exit(): 332 """Polling function for end of output.""" 333 interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' % 334 self.fh.name) 335 line_count = utils.run(interrupt_cmd).stdout.strip() 336 return line_count != '0' 337 utils.poll_for_condition(find_exit) 338 339 340 def clear(self): 341 """Clear previous events.""" 342 self.stop() 343 self.fh.close() 344 self.fh = tempfile.NamedTemporaryFile() 345 346 347 def stop(self): 348 """Stop recording events.""" 349 if self.evtest_process: 350 self.evtest_process.kill() 351 self.evtest_process = None 352 353 354 def get_recorded_events(self): 355 """Get the evtest output since object was created.""" 356 self.fh.seek(0) 357 events = self.fh.read() 358 return events 359 360 361 def log_recorded_events(self): 362 """Save recorded events into logs.""" 363 events = self.get_recorded_events() 364 logging.info('Kernel events seen:\n%s', events) 365 366 367 def get_last_event_timestamp(self, filter_str=''): 368 """Return the timestamp of the last event since recording started. 369 370 Events are in the form "Event: time <epoch time>, <info>\n" 371 372 @param filter_str: a regex string to match to the <info> section. 373 374 @returns: floats matching 375 376 """ 377 events = self.get_recorded_events() 378 findall = re.findall(r' time (.*?), [^\n]*?%s' % filter_str, 379 events, re.MULTILINE) 380 re.findall(r' time (.*?), [^\n]*?%s' % filter_str, events, re.MULTILINE) 381 if not findall: 382 self.log_recorded_events() 383 raise error.TestError('Could not find any kernel timestamps!' 384 ' Filter: %s' % filter_str) 385 return float(findall[-1]) 386 387 388 def close(self): 389 """Clean up this class.""" 390 self.stop() 391 self.fh.close() 392 393 394class TestPage(object): 395 """Wrapper around a Telemtry tab for utility functions. 396 397 Provides functions such as reload and setting scroll height on page. 398 399 """ 400 _DEFAULT_SCROLL = 5000 401 402 def __init__(self, cr, httpdir, filename): 403 """Open a given test page in the given httpdir. 404 405 @param cr: chrome.Chrome() object 406 @param httpdir: the directory to use for SetHTTPServerDirectories 407 @param filename: path to the file to open, relative to httpdir 408 409 """ 410 cr.browser.platform.SetHTTPServerDirectories(httpdir) 411 self._tab = cr.browser.tabs[0] 412 self._tab.Navigate(cr.browser.platform.http_server.UrlOf( 413 os.path.join(httpdir, filename))) 414 self.wait_for_page_ready() 415 416 417 def reload_page(self): 418 """Reloads test page.""" 419 self._tab.Navigate(self._tab.url) 420 self.wait_for_page_ready() 421 422 423 def wait_for_page_ready(self): 424 """Wait for a variable pageReady on the test page to be true. 425 426 Presuposes that a pageReady variable exists on this page. 427 428 @raises error.TestError if page is not ready after timeout. 429 430 """ 431 self._tab.WaitForDocumentReadyStateToBeComplete() 432 utils.poll_for_condition( 433 lambda: self._tab.EvaluateJavaScript('pageReady'), 434 exception=error.TestError('Test page is not ready!')) 435 436 437 def expand_page(self): 438 """Expand the page to be very large, to allow scrolling.""" 439 page_width = self._DEFAULT_SCROLL * 5 440 cmd = 'document.body.style.%s = "%dpx"' % ('%s', page_width) 441 self._tab.ExecuteJavaScript(cmd % 'width') 442 self._tab.ExecuteJavaScript(cmd % 'height') 443 444 445 def set_scroll_position(self, value, scroll_vertical=True): 446 """Set scroll position to given value. 447 448 @param value: integer value in pixels. 449 @param scroll_vertical: True for vertical scroll, 450 False for horizontal Scroll. 451 452 """ 453 cmd = 'window.scrollTo(%d, %d);' 454 if scroll_vertical: 455 self._tab.ExecuteJavaScript(cmd % (0, value)) 456 else: 457 self._tab.ExecuteJavaScript(cmd % (value, 0)) 458 459 460 def set_default_scroll_position(self, scroll_vertical=True): 461 """Set scroll position of page to default. 462 463 @param scroll_vertical: True for vertical scroll, 464 False for horizontal Scroll. 465 @raise: TestError if page is not set to default scroll position 466 467 """ 468 total_tries = 2 469 for i in xrange(total_tries): 470 try: 471 self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical) 472 self.wait_for_default_scroll_position(scroll_vertical) 473 except error.TestError as e: 474 if i == total_tries - 1: 475 pos = self.get_scroll_position(scroll_vertical) 476 logging.error('SCROLL POSITION: %s', pos) 477 raise e 478 else: 479 self.expand_page() 480 else: 481 break 482 483 484 def get_scroll_position(self, scroll_vertical=True): 485 """Return current scroll position of page. 486 487 @param scroll_vertical: True for vertical scroll, 488 False for horizontal Scroll. 489 490 """ 491 if scroll_vertical: 492 return int(self._tab.EvaluateJavaScript('window.scrollY')) 493 else: 494 return int(self._tab.EvaluateJavaScript('window.scrollX')) 495 496 497 def wait_for_default_scroll_position(self, scroll_vertical=True): 498 """Wait for page to be at the default scroll position. 499 500 @param scroll_vertical: True for vertical scroll, 501 False for horizontal scroll. 502 503 @raise: TestError if page either does not move or does not stop moving. 504 505 """ 506 utils.poll_for_condition( 507 lambda: self.get_scroll_position( 508 scroll_vertical) == self._DEFAULT_SCROLL, 509 exception=error.TestError('Page not set to default scroll!')) 510 511 512 def wait_for_scroll_position_to_settle(self, scroll_vertical=True): 513 """Wait for page to move and then stop moving. 514 515 @param scroll_vertical: True for Vertical scroll and 516 False for horizontal scroll. 517 518 @raise: TestError if page either does not move or does not stop moving. 519 520 """ 521 # Wait until page starts moving. 522 utils.poll_for_condition( 523 lambda: self.get_scroll_position( 524 scroll_vertical) != self._DEFAULT_SCROLL, 525 exception=error.TestError('No scrolling occurred!'), timeout=30) 526 527 # Wait until page has stopped moving. 528 self._previous = self._DEFAULT_SCROLL 529 def _movement_stopped(): 530 current = self.get_scroll_position() 531 result = current == self._previous 532 self._previous = current 533 return result 534 535 utils.poll_for_condition( 536 lambda: _movement_stopped(), sleep_interval=1, 537 exception=error.TestError('Page did not stop moving!'), 538 timeout=30) 539 540 541 def get_page_width(self): 542 """Return window.innerWidth for this page.""" 543 return int(self._tab.EvaluateJavaScript('window.innerWidth')) 544 545 546class EventsPage(TestPage): 547 """Functions to monitor input events on the DUT, as seen by a webpage. 548 549 A subclass of TestPage which uses and interacts with a specific page. 550 551 """ 552 def __init__(self, cr, httpdir): 553 """Open the website and save the tab in self._tab. 554 555 @param cr: chrome.Chrome() object 556 @param httpdir: the directory to use for SetHTTPServerDirectories 557 558 """ 559 filename = 'touch_events_test_page.html' 560 current_dir = os.path.dirname(os.path.realpath(__file__)) 561 shutil.copyfile(os.path.join(current_dir, filename), 562 os.path.join(httpdir, filename)) 563 564 super(EventsPage, self).__init__(cr, httpdir, filename) 565 566 567 def clear_previous_events(self): 568 """Wipe the test page back to its original state.""" 569 self._tab.ExecuteJavaScript('pageReady = false') 570 self._tab.ExecuteJavaScript('clearPreviousEvents()') 571 self.wait_for_page_ready() 572 573 574 def get_events_log(self): 575 """Return the event log from the test page.""" 576 return self._tab.EvaluateJavaScript('eventLog') 577 578 579 def log_events(self): 580 """Put the test page's event log into logging.info.""" 581 logging.info('EVENTS LOG:') 582 logging.info(self.get_events_log()) 583 584 585 def get_time_of_last_event(self): 586 """Return the timestamp of the last seen event (if any).""" 587 return self._tab.EvaluateJavaScript('timeOfLastEvent') 588 589 590 def get_event_count(self): 591 """Return the number of events that the test page has seen.""" 592 return self._tab.EvaluateJavaScript('eventCount') 593 594 595 def get_scroll_delta(self, is_vertical): 596 """Return the net scrolling the test page has seen. 597 598 @param is_vertical: True for vertical scrolling; False for horizontal. 599 600 """ 601 axis = 'y' if is_vertical else 'x' 602 return self._tab.EvaluateJavaScript('netScrollDelta.%s' % axis) 603 604 605 def get_click_count(self): 606 """Return the number of clicks the test page has seen.""" 607 return self._tab.EvaluateJavaScript('clickCount') 608 609 610 def wait_for_events_to_complete(self, delay_secs=1, timeout=60): 611 """Wait until test page stops seeing events for delay_secs seconds. 612 613 @param delay_secs: the polling frequency in seconds. 614 @param timeout: the number of seconds to wait for events to complete. 615 @raises: error.TestError if no events occurred. 616 @raises: error.TestError if events did not stop after timeout seconds. 617 618 """ 619 self._tmp_previous_event_count = -1 620 def _events_stopped_coming(): 621 most_recent_event_count = self.get_event_count() 622 delta = most_recent_event_count - self._tmp_previous_event_count 623 self._tmp_previous_event_count = most_recent_event_count 624 return most_recent_event_count != 0 and delta == 0 625 626 try: 627 utils.poll_for_condition( 628 _events_stopped_coming, exception=error.TestError(), 629 sleep_interval=delay_secs, timeout=timeout) 630 except error.TestError: 631 if self._tmp_previous_event_count == 0: 632 raise error.TestError('No touch event was seen!') 633 else: 634 self.log_events() 635 raise error.TestError('Touch events did not stop!') 636 637 638 def set_prevent_defaults(self, value): 639 """Set whether to allow default event actions to go through. 640 641 E.g. if this is True, a two finger horizontal scroll will not actually 642 produce history navigation on the browser. 643 644 @param value: True for prevent defaults; False to allow them. 645 646 """ 647 js_value = str(value).lower() 648 self._tab.ExecuteJavaScript('preventDefaults = %s;' % js_value) 649