• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Lint as: python2, python3
2# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5
6from __future__ import absolute_import
7from __future__ import division
8from __future__ import print_function
9
10import logging
11import os
12import re
13import shutil
14from six.moves import urllib
15from six.moves import range
16from six.moves import urllib
17import subprocess
18import tempfile
19import time
20
21from autotest_lib.client.bin import test
22from autotest_lib.client.bin import utils
23from autotest_lib.client.common_lib import error
24from autotest_lib.client.common_lib import file_utils
25from autotest_lib.client.cros.input_playback import input_playback
26
27
class touch_playback_test_base(test.test):
    """Base class for touch tests involving playback.

    Holds an input_playback.InputPlayback instance in self.player (created
    in warmup()) and provides helpers to locate gesture files, play them
    back, change touch settings and open the pages used to observe events.

    """
    version = 1

    # Path of the inputcontrol script whose presence is checked in warmup().
    _INPUTCONTROL = '/opt/google/input/inputcontrol'

    # Suffix removed from the reported board name, e.g. '<board>-cheets'.
    _CHEETS_SUFFIX = '-cheets'

    # Input types whose gesture files are named by board name and hw_id.
    _HW_ID_INPUT_TYPES = ('touchpad', 'touchscreen', 'stylus')


    @property
    def _has_touchpad(self):
        """True if device under test has a touchpad; else False."""
        return self.player.has('touchpad')


    @property
    def _has_touchscreen(self):
        """True if device under test has a touchscreen; else False."""
        return self.player.has('touchscreen')


    @property
    def _has_mouse(self):
        """True if device under test has or emulates a USB mouse; else False."""
        return self.player.has('mouse')


    def warmup(self, mouse_props=None):
        """Test setup.

        Instantiate player object to find touch devices, if any.
        These devices can be used for playback later.
        Emulate a USB mouse if a property file is provided.
        Check if the inputcontrol script is available on the disk.

        @param mouse_props: optional property file for a mouse to emulate.
                            Created using 'evemu-describe /dev/input/X'.

        """
        self.player = input_playback.InputPlayback()
        if mouse_props:
            self.player.emulate(input_type='mouse', property_file=mouse_props)
        self.player.find_connected_inputs()

        self._autotest_ext = None
        self._has_inputcontrol = os.path.isfile(self._INPUTCONTROL)
        self._platform = utils.get_board()
        # Strip only a real '-cheets' suffix.  Testing for the substring and
        # then slicing blindly would cut seven characters off any board name
        # that merely contains 'cheets' somewhere else.
        if self._platform.endswith(self._CHEETS_SUFFIX):
            self._platform = self._platform[:-len(self._CHEETS_SUFFIX)]


    def _find_test_files(self, input_type, gestures):
        """Determine where the playback gesture files for this test are.

        Expected file format is: <boardname>_<input type>_<hwid>_<gesture name>
            e.g. samus_touchpad_164.17_scroll_down
        Input types without a hw_id use <device name>_<input type>_<gesture>.

        @param input_type: device type, e.g. 'touchpad'
        @param gestures: list of gesture name strings used in filename

        @returns: None if not all files are found.  Dictionary of filepaths if
                  they are found, indexed by gesture names as given.
        @raises: error.TestError if gestures is not a list, if no device is
                 found or if device should have a hw_id but does not.

        """
        if not isinstance(gestures, list):
            raise error.TestError('find_test_files() takes a LIST, not a '
                                  '%s!' % type(gestures))

        if not self.player.has(input_type):
            raise error.TestError('Device does not have a %s!' % input_type)

        if input_type in self._HW_ID_INPUT_TYPES:
            hw_id = self.player.devices[input_type].hw_id
            if not hw_id:
                raise error.TestError('No valid hw_id for %s!' % input_type)
            filename_fmt = '%s_%s_%s' % (self._platform, input_type, hw_id)
        else:
            device_name = self.player.devices[input_type].name
            filename_fmt = '%s_%s' % (device_name, input_type)

        filepaths = {}
        for gesture in gestures:
            filename = '%s_%s' % (filename_fmt, gesture)
            filepath = self._download_remote_test_file(filename, input_type)
            if not filepath:
                # One missing file means the whole set is unusable.
                logging.info('Did not find files for this device!')
                return None

            filepaths[gesture] = filepath

        return filepaths


    def _find_test_files_from_directions(self, input_type, fmt_str, directions):
        """Find gesture files given a list of directions and name format.

        @param input_type: device type, e.g. 'touchpad'
        @param fmt_str: format string for filename, e.g. 'scroll-%s'
        @param directions: list of directions for fmt_string

        @returns: Empty dictionary if not all files are found.  Dictionary of
                  filepaths if they are found, indexed by directions as given.
        @raises: error.TestError if no hw_id is found.

        """
        gestures = [fmt_str % d for d in directions]
        temp_filepaths = self._find_test_files(input_type, gestures)

        filepaths = {}
        if temp_filepaths:
            # Re-key the result from full gesture names back to directions.
            filepaths = {d: temp_filepaths[fmt_str % d] for d in directions}

        return filepaths


    def _download_remote_test_file(self, filename, input_type):
        """Download a file from the remote touch playback folder.

        The file is saved into self.bindir under its URL-quoted name.

        @param filename: string of filename
        @param input_type: device type, e.g. 'touchpad'

        @returns: Path to local file or None if file is not found.

        """
        REMOTE_STORAGE_URL = ('https://storage.googleapis.com/'
                              'chromiumos-test-assets-public/touch_playback')
        filename = urllib.parse.quote(filename)

        if input_type in self._HW_ID_INPUT_TYPES:
            url = '%s/%s/%s' % (REMOTE_STORAGE_URL, self._platform, filename)
        else:
            url = '%s/TYPE-%s/%s' % (REMOTE_STORAGE_URL, input_type, filename)
        local_file = os.path.join(self.bindir, filename)

        logging.info('Looking for %s', url)
        try:
            file_utils.download_file(url, local_file)
        except urllib.error.URLError as e:
            logging.info('File download failed!')
            # Log the exception itself: only HTTPError carries a .msg
            # attribute, a plain URLError exposes .reason instead.
            logging.debug('%s', e)
            return None

        return local_file


    def _emulate_mouse(self, property_file=None):
        """Emulate a mouse with the given property file.

        player will use default mouse if no file is provided.

        @param property_file: optional property file for the mouse to emulate.

        @raises: error.TestError if no mouse is present after emulation.

        """
        self.player.emulate(input_type='mouse', property_file=property_file)
        self.player.find_connected_inputs()
        if not self._has_mouse:
            raise error.TestError('Mouse emulation failed!')


    def _playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input."""
        self.player.playback(filepath, touch_type)


    def _blocking_playback(self, filepath, touch_type='touchpad'):
        """Playback a given input file on the given input; block until done."""
        self.player.blocking_playback(filepath, touch_type)


    def _set_touch_setting_by_inputcontrol(self, setting, value):
        """Set a given touch setting the given value by inputcontrol.

        @param setting: Name of touch setting, e.g. 'tapclick'.
        @param value: True for enabled, False for disabled.

        """
        cmd_value = 1 if value else 0
        utils.run('%s --%s %d' % (self._INPUTCONTROL, setting, cmd_value))
        logging.info('%s turned %s.', setting, 'on' if value else 'off')


    def _set_touch_setting(self, inputcontrol_setting, autotest_ext_setting,
                           value):
        """Set a given touch setting the given value.

        The inputcontrol script is preferred when it exists; otherwise the
        autotest extension registered through _set_autotest_ext() is used.

        @param inputcontrol_setting: Name of touch setting for the inputcontrol
                                     script, e.g. 'tapclick'.
        @param autotest_ext_setting: Name of touch setting for the autotest
                                     extension, e.g. 'TapToClick'.
        @param value: True for enabled, False for disabled.

        @raises: error.TestFail if neither mechanism is available.

        """
        if self._has_inputcontrol:
            self._set_touch_setting_by_inputcontrol(inputcontrol_setting, value)
        elif self._autotest_ext is not None:
            # The value is lower-cased so a Python bool becomes a JS literal.
            self._autotest_ext.EvaluateJavaScript(
                    'chrome.autotestPrivate.set%s(%s);'
                    % (autotest_ext_setting, ("%s" % value).lower()))
            # TODO: remove this sleep once checking for value is available.
            time.sleep(1)
        else:
            raise error.TestFail('Both inputcontrol and the autotest '
                                 'extension are not availble.')


    def _set_australian_scrolling(self, value):
        """Set australian scrolling to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('australian_scrolling', 'NaturalScroll', value)


    def _set_tap_to_click(self, value):
        """Set tap-to-click to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapclick', 'TapToClick', value)


    def _set_tap_dragging(self, value):
        """Set tap dragging to the given value.

        @param value: True for enabled, False for disabled.

        """
        self._set_touch_setting('tapdrag', 'TapDragging', value)


    def _set_autotest_ext(self, ext):
        """Set the autotest extension.

        @param ext: the autotest extension object.

        """
        self._autotest_ext = ext


    def _open_test_page(self, cr, filename='test_page.html'):
        """Prepare test page for testing.  Set self._tab with page.

        @param cr: chrome.Chrome() object
        @param filename: name of file in self.bindir to open

        """
        self._test_page = TestPage(cr, self.bindir, filename)
        self._tab = self._test_page._tab


    def _open_events_page(self, cr):
        """Open the test events page.  Set self._events with EventsPage class.

        Also set self._tab as this page and self.bindir as the http server dir.

        @param cr: chrome.Chrome() object

        """
        self._events = EventsPage(cr, self.bindir)
        self._tab = self._events._tab


    def _center_cursor(self):
        """Playback mouse movement to center cursor.

        Requires that self._emulate_mouse() has been called.

        """
        self.player.blocking_playback_of_default_file(
                'mouse_center_cursor_gesture', input_type='mouse')


    def _get_kernel_events_recorder(self, input_type):
        """Return a kernel event recording object for the given input type.

        @param input_type: device type, e.g. 'touchpad'

        @returns: KernelEventsRecorder instance.

        """
        node = self.player.devices[input_type].node
        return KernelEventsRecorder(node)


    def cleanup(self):
        """Close the player created by warmup(), if there is one."""
        # warmup() may have failed before self.player was assigned; do not
        # turn that into an AttributeError here.
        player = getattr(self, 'player', None)
        if player is not None:
            player.close()
315
316
class KernelEventsRecorder(object):
    """Object to record kernel events for a particular device.

    evtest output for the device node is redirected into a temporary file
    (self.fh), which the accessor methods read back.

    """

    def __init__(self, node):
        """Setup to record future evtest output for this node.

        @param node: the device node handed to evtest as its argument.

        """
        self.node = node
        self.fh = tempfile.NamedTemporaryFile()
        self.evtest_process = None


    def start(self):
        """Start recording events.

        Launches evtest with stdout redirected to the temporary file and
        polls until its "interrupt to exit" line has been written.

        """
        self.evtest_process = subprocess.Popen(
                ['evtest', self.node], stdout=self.fh)

        # Wait until the initial output has finished before returning.
        def find_exit():
            """Polling function for end of output."""
            # Count the matching lines; '0' means the line is not there yet.
            interrupt_cmd = ('grep "interrupt to exit" %s | wc -l' %
                             self.fh.name)
            line_count = utils.run(interrupt_cmd).stdout.strip()
            return line_count != '0'
        utils.poll_for_condition(find_exit)


    def clear(self):
        """Clear previous events.

        Stops any running recording and replaces the temporary file with a
        fresh one; recording is not restarted.

        """
        self.stop()
        self.fh.close()
        self.fh = tempfile.NamedTemporaryFile()


    def stop(self):
        """Stop recording events."""
        if self.evtest_process:
            self.evtest_process.kill()
            # Reap the killed child so it does not linger as a zombie.
            self.evtest_process.wait()
            self.evtest_process = None


    def get_recorded_events(self):
        """Get the evtest output since object was created.

        @returns: everything read from the start of the temporary file.

        """
        self.fh.seek(0)
        events = self.fh.read()
        return events


    def log_recorded_events(self):
        """Save recorded events into logs."""
        events = self.get_recorded_events()
        logging.info('Kernel events seen:\n%s', events)


    def get_last_event_timestamp(self, filter_str=''):
        """Return the timestamp of the last event since recording started.

        Events are in the form "Event: time <epoch time>, <info>\n"

        @param filter_str: a regex string to match to the <info> section.

        @returns: timestamp of the last matching event, as a float.
        @raises: error.TestError if no event matches.

        """
        events = self.get_recorded_events()
        # The recorded output is bytes; decode once and search it once.
        timestamps = re.findall(r' time (.*?), [^\n]*?%s' % filter_str,
                                events.decode('utf-8'), re.MULTILINE)
        if not timestamps:
            self.log_recorded_events()
            raise error.TestError('Could not find any kernel timestamps!'
                                  '  Filter: %s' % filter_str)
        return float(timestamps[-1])


    def close(self):
        """Clean up this class."""
        self.stop()
        self.fh.close()
399
400
class TestPage(object):
    """Wrapper around a Telemetry tab for utility functions.

    Provides functions such as reload and setting scroll height on page.

    """
    # Scroll offset, in pixels, treated as the page's default position.
    _DEFAULT_SCROLL = 5000

    def __init__(self, cr, httpdir, filename):
        """Open a given test page in the given httpdir.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories
        @param filename: path to the file to open, relative to httpdir

        """
        cr.browser.platform.SetHTTPServerDirectories(httpdir)
        self._tab = cr.browser.tabs[0]
        self._tab.Navigate(cr.browser.platform.http_server.UrlOf(
                os.path.join(httpdir, filename)))
        self.wait_for_page_ready()


    def reload_page(self):
        """Reloads test page."""
        self._tab.Navigate(self._tab.url)
        self.wait_for_page_ready()


    def wait_for_page_ready(self):
        """Wait for a variable pageReady on the test page to be true.

        Presupposes that a pageReady variable exists on this page.

        @raises error.TestError if page is not ready after timeout.

        """
        self._tab.WaitForDocumentReadyStateToBeComplete()
        utils.poll_for_condition(
                lambda: self._tab.EvaluateJavaScript('pageReady'),
                exception=error.TestError('Test page is not ready!'))


    def expand_page(self):
        """Expand the page to be very large, to allow scrolling."""
        page_width = self._DEFAULT_SCROLL * 5
        # Fill in the size now; the style property name is filled in below.
        cmd = 'document.body.style.%s = "%dpx"' % ('%s', page_width)
        self._tab.ExecuteJavaScript(cmd % 'width')
        self._tab.ExecuteJavaScript(cmd % 'height')


    def set_scroll_position(self, value, scroll_vertical=True):
        """Set scroll position to given value.

        @param value: integer value in pixels.
        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.

        """
        cmd = 'window.scrollTo(%d, %d);'
        if scroll_vertical:
            self._tab.ExecuteJavaScript(cmd % (0, value))
        else:
            self._tab.ExecuteJavaScript(cmd % (value, 0))


    def set_default_scroll_position(self, scroll_vertical=True):
        """Set scroll position of page to default.

        If the first attempt fails the page is expanded and the attempt is
        repeated once.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.
        @raise: TestError if page is not set to default scroll position

        """
        total_tries = 2
        for i in range(total_tries):
            try:
                self.set_scroll_position(self._DEFAULT_SCROLL, scroll_vertical)
                self.wait_for_default_scroll_position(scroll_vertical)
            except error.TestError:
                if i == total_tries - 1:
                    pos = self.get_scroll_position(scroll_vertical)
                    logging.error('SCROLL POSITION: %s', pos)
                    # Bare raise keeps the original traceback.
                    raise
                else:
                    self.expand_page()
            else:
                break


    def get_scroll_position(self, scroll_vertical=True):
        """Return current scroll position of page.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal Scroll.

        @returns: window.scrollY (vertical) or window.scrollX as an int.

        """
        if scroll_vertical:
            return int(self._tab.EvaluateJavaScript('window.scrollY'))
        else:
            return int(self._tab.EvaluateJavaScript('window.scrollX'))


    def wait_for_default_scroll_position(self, scroll_vertical=True):
        """Wait for page to be at the default scroll position.

        A position one pixel short of the default is accepted as well.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        @raise: TestError if page does not reach the default position.

        """
        accepted = [self._DEFAULT_SCROLL, self._DEFAULT_SCROLL - 1]
        utils.poll_for_condition(
                lambda: self.get_scroll_position(scroll_vertical) in accepted,
                exception=error.TestError('Page not set to default scroll!'))


    def _scroll_position_unchanged(self, scroll_vertical=True):
        """Report whether the scroll position equals the last one recorded.

        Compares the current position with self._previous and then stores
        the current position in self._previous for the next call.

        @param scroll_vertical: True for vertical scroll,
                                False for horizontal scroll.

        @returns: True if the position did not change since the last call.

        """
        current = self.get_scroll_position(scroll_vertical)
        unchanged = current == self._previous
        self._previous = current
        return unchanged


    def wait_for_scroll_position_to_settle(self, scroll_vertical=True):
        """Wait for page to move and then stop moving.

        @param scroll_vertical: True for Vertical scroll and
                                False for horizontal scroll.

        @raise: TestError if page either does not move or does not stop moving.

        """
        # Wait until page starts moving.
        utils.poll_for_condition(
                lambda: self.get_scroll_position(
                        scroll_vertical) != self._DEFAULT_SCROLL,
                exception=error.TestError('No scrolling occurred!'), timeout=30)

        # Wait until page has stopped moving.  The same axis must be watched
        # here as above, otherwise a horizontal scroll is judged by scrollY.
        self._previous = self._DEFAULT_SCROLL
        utils.poll_for_condition(
                lambda: self._scroll_position_unchanged(scroll_vertical),
                sleep_interval=1,
                exception=error.TestError('Page did not stop moving!'),
                timeout=30)


    def get_page_zoom(self):
        """Return window.visualViewport.scale for this page, as a float."""
        return float(self._tab.EvaluateJavaScript(
                            'window.visualViewport.scale'))
552
553
class EventsPage(TestPage):
    """Monitor input events on the DUT as they are seen by a webpage.

    A TestPage specialisation bound to the touch events test page; it reads
    the counters and logs that page keeps in its JavaScript variables.

    """
    def __init__(self, cr, httpdir):
        """Copy the events page into httpdir and open it in self._tab.

        @param cr: chrome.Chrome() object
        @param httpdir: the directory to use for SetHTTPServerDirectories

        """
        page_name = 'touch_events_test_page.html'
        # The page ships next to this module; serve a copy from httpdir.
        module_dir = os.path.dirname(os.path.realpath(__file__))
        bundled_page = os.path.join(module_dir, page_name)
        served_page = os.path.join(httpdir, page_name)
        shutil.copyfile(bundled_page, served_page)

        super(EventsPage, self).__init__(cr, httpdir, page_name)


    def clear_previous_events(self):
        """Reset the test page to its initial state and wait until ready."""
        for statement in ('pageReady = false', 'clearPreviousEvents()'):
            self._tab.ExecuteJavaScript(statement)
        self.wait_for_page_ready()


    def get_events_log(self):
        """Return the page's eventLog variable."""
        events_log = self._tab.EvaluateJavaScript('eventLog')
        return events_log


    def log_events(self):
        """Write the page's event log to logging.info."""
        logging.info('EVENTS LOG:')
        events_log = self.get_events_log()
        logging.info(events_log)


    def get_time_of_last_event(self):
        """Return the page's timeOfLastEvent variable."""
        last_event_time = self._tab.EvaluateJavaScript('timeOfLastEvent')
        return last_event_time


    def get_event_count(self):
        """Return the page's eventCount variable."""
        event_count = self._tab.EvaluateJavaScript('eventCount')
        return event_count


    def get_scroll_delta(self, is_vertical):
        """Return the net scroll delta recorded by the page for one axis.

        @param is_vertical: True for vertical scrolling; False for horizontal.

        """
        if is_vertical:
            axis = 'y'
        else:
            axis = 'x'
        return self._tab.EvaluateJavaScript('netScrollDelta.' + axis)


    def get_click_count(self):
        """Return the page's clickCount variable."""
        click_count = self._tab.EvaluateJavaScript('clickCount')
        return click_count


    def wait_for_events_to_complete(self, delay_secs=1, timeout=60):
        """Wait until the page's event count stays unchanged between polls.

        @param delay_secs: the polling frequency in seconds.
        @param timeout: the number of seconds to wait for events to complete.
        @raises: error.TestError if no events occurred.
        @raises: error.TestError if events did not stop after timeout seconds.

        """
        # Kept on the instance so the nested poller can update it.
        self._tmp_previous_event_count = -1

        def _count_is_stable():
            """Polling function: events were seen and none arrived since."""
            seen_now = self.get_event_count()
            unchanged = (seen_now - self._tmp_previous_event_count) == 0
            self._tmp_previous_event_count = seen_now
            return seen_now != 0 and unchanged

        try:
            utils.poll_for_condition(
                    _count_is_stable, exception=error.TestError(),
                    sleep_interval=delay_secs, timeout=timeout)
        except error.TestError:
            # A last count of zero means nothing ever arrived.
            if self._tmp_previous_event_count == 0:
                raise error.TestError('No touch event was seen!')
            self.log_events()
            raise error.TestError('Touch events did not stop!')


    def set_prevent_defaults(self, value):
        """Set whether the page suppresses default event actions.

        E.g. if this is True, a two finger horizontal scroll will not actually
        produce history navigation on the browser.

        @param value: True for prevent defaults; False to allow them.

        """
        self._tab.ExecuteJavaScript(
                'preventDefaults = %s;' % str(value).lower())
657