• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides functions to record input events."""
6
7import logging
8import re
9import select
10import subprocess
11import threading
12import time
13
14from linux_input import EV_MSC, EV_SYN, MSC_SCAN, SYN_REPORT
15
16
# Define extra misc events below as they are not defined in linux_input.
# Each constant is the value of the EV_MSC/MSC_SCAN event built for the
# corresponding mouse button in MSC_SCAN_BTN_EVENT.
MSC_SCAN_BTN_LEFT = 90001
MSC_SCAN_BTN_RIGHT = 90002
MSC_SCAN_BTN_MIDDLE = 90003
21
22
class InputEventRecorderError(Exception):
    """Error type used by the input_event_recorder module."""
26
27
class Event(object):
    """An input event constructed from one line of evtest output.

    An ordinary event looks like:
    Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316

    A SYN_REPORT event looks like:
    Event: time 10788.289613, -------------- SYN_REPORT ------------

    """

    # The patterns are compiled once at class creation instead of on every
    # from_string() call, which runs once per line of evtest output.
    _SEP = r',\s*'
    _TIME = r'Event:\s*time\s*(\d+\.\d+)'

    # Pattern of an ordinary event. The type and code names must be word
    # characters, so a line whose name is printed as "(?)" does not match.
    _EVENT_PATTERN = re.compile(
            _SEP.join([_TIME,
                       r'type\s*(\d+)\s*\(\w+\)',
                       r'code\s*(\d+)\s*\(\w+\)',
                       r'value\s*(-?\d+)']),
            re.I)

    # Pattern of the SYN_REPORT event.
    _SYN_REPORT_PATTERN = re.compile(
            _SEP.join([_TIME, r'-+\s*SYN_REPORT\s-+']), re.I)


    def __init__(self, type=0, code=0, value=0):
        """Construction of an input event.

        @param type: the event type
        @param code: the event code
        @param value: the event value

        """
        self.type = type
        self.code = code
        self.value = value


    @staticmethod
    def from_string(ev_string):
        """Convert an event string to an event object.

        @param ev_string: an event string.

        @returns: an event object if the event string conforms to
                  event pattern format. None otherwise.

        """
        # Check if it is a SYN event first; such a line carries no
        # type/code/value fields.
        if Event._SYN_REPORT_PATTERN.search(ev_string):
            return Event(EV_SYN, SYN_REPORT, 0)

        # Check if it is a regular event. Group 1 is the timestamp, which
        # is not kept.
        result = Event._EVENT_PATTERN.search(ev_string)
        if result:
            return Event(int(result.group(2)),
                         int(result.group(3)),
                         int(result.group(4)))

        logging.warning('not an event: %s', ev_string)
        return None


    def is_syn(self):
        """Determine if the event is a SYN report event.

        @returns: True if it is a SYN report event. False otherwise.

        """
        return self.type == EV_SYN and self.code == SYN_REPORT


    def value_tuple(self):
        """A tuple of the event type, code, and value.

        @returns: the tuple of the event type, code, and value.

        """
        return (self.type, self.code, self.value)


    def __eq__(self, other):
        """Determine if two events are equal.

        @param other: the object to compare with.

        @returns: True if other is an Event with the same type, code and
                  value. False if it is an Event that differs.
                  NotImplemented if other is not an Event, so that a
                  comparison with e.g. None evaluates to False instead of
                  raising AttributeError.

        """
        if not isinstance(other, Event):
            return NotImplemented
        return self.value_tuple() == other.value_tuple()


    def __ne__(self, other):
        """Determine if two events differ.

        Python 2 does not derive != from __eq__, so it is defined
        explicitly as the negation of __eq__.

        @param other: the object to compare with.

        @returns: False if the two events are equal. True if they differ.
                  NotImplemented if other is not an Event.

        """
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return NotImplemented
        return not equal


    def __hash__(self):
        """Hash of the event, consistent with __eq__.

        The hash is computed from the current type, code and value, so an
        event should not be modified while it is stored in a set or used
        as a dictionary key.

        @returns: the hash of the event value tuple.

        """
        return hash(self.value_tuple())


    def __repr__(self):
        """An unambiguous representation of the event for debugging.

        @returns: a string showing the event type, code and value.

        """
        return 'Event(type=%r, code=%r, value=%r)' % self.value_tuple()


    def __str__(self):
        """A string representation of the event.

        @returns: a string representation of the event.

        """
        return '%d %d %d' % (self.type, self.code, self.value)
135
136
class InputEventRecorder(object):
    """An input event recorder.

    The recorder runs evtest on the device node of the named input device
    in a background thread and collects the parsed events.

    Refer to recording_example() below about how to record input events.

    """

    INPUT_DEVICE_INFO_FILE = '/proc/bus/input/devices'
    SELECT_TIMEOUT_SECS = 1

    def __init__(self, device_name):
        """Construction of input event recorder.

        @param device_name: the device name of the input device node to record.

        @raises InputEventRecorderError: if no device node is found for
                device_name.

        """
        self.device_name = device_name
        self.device_node = self.get_device_node_by_name(device_name)
        if self.device_node is None:
            err_msg = 'Failed to find the device node of %s' % device_name
            raise InputEventRecorderError(err_msg)
        self._recorder = None
        self._recording_thread = None
        self._stop_recording_thread_event = threading.Event()
        self.tmp_file = '/tmp/evtest.dat'
        self.events = []


    def get_device_node_by_name(self, device_name):
        """Get the input device node by name.

        Example of a RN-42 emulated mouse device information looks like

        I: Bus=0005 Vendor=0000 Product=0000 Version=0000
        N: Name="RNBT-A96F"
        P: Phys=6c:29:95:1a:b8:18
        S: Sysfs=/devices/pci0000:00/0000:00:14.0/usb1/1-8/1-8:1.0/bluetooth/hci0/hci0:512:29/0005:0000:0000.0004/input/input15
        U: Uniq=00:06:66:75:a9:6f
        H: Handlers=event12
        B: PROP=0
        B: EV=17
        B: KEY=70000 0 0 0 0
        B: REL=103
        B: MSC=10

        @param device_name: the device name of the target input device node.
                It is interpolated into a regular expression unescaped and
                matched case-insensitively anywhere in the name line.

        @returns: the corresponding device node of the device, or None if
                  no matching device with an event handler is found.

        """
        device_node = None
        device_found = None
        device_pattern = re.compile('N: Name=.*%s' % device_name, re.I)
        # Require at least one digit so that int() below cannot be handed
        # an empty string.
        event_number_pattern = re.compile(r'H: Handlers=.*event(\d+)', re.I)
        with open(self.INPUT_DEVICE_INFO_FILE) as info:
            for line in info:
                if not line.strip():
                    # A blank line ends the description of one device. Stop
                    # looking for the handler of a matched device here so
                    # that the event node of the next device is not
                    # returned by mistake.
                    device_found = None
                elif device_found:
                    result = event_number_pattern.search(line)
                    if result:
                        event_number = int(result.group(1))
                        device_node = '/dev/input/event%d' % event_number
                        break
                else:
                    device_found = device_pattern.search(line)
        return device_node


    def _terminate_recorder(self):
        """Terminate the evtest subprocess, if any, and release its pipe."""
        recorder = self._recorder
        if recorder is None:
            return
        if recorder.poll() is None:
            recorder.terminate()
        recorder.wait()
        recorder.stdout.close()
        self._recorder = None


    def record(self):
        """Record input events.

        Every line read from evtest is written to self.tmp_file, and the
        value tuple of every line that parses as an event is appended to
        self.events. The loop ends when evtest closes its output, or when
        a stop has been requested and no output arrived within
        SELECT_TIMEOUT_SECS. The evtest subprocess is always terminated
        before returning.

        """
        logging.info('Recording input events of %s.', self.device_node)
        # Run evtest directly rather than through a shell so that
        # terminate() signals evtest itself.
        self._recorder = subprocess.Popen(['evtest', self.device_node],
                                          stdout=subprocess.PIPE)
        try:
            with open(self.tmp_file, 'w') as output_f:
                while True:
                    read_list, _, _ = select.select(
                            [self._recorder.stdout], [], [],
                            self.SELECT_TIMEOUT_SECS)
                    if read_list:
                        line = self._recorder.stdout.readline()
                        if not line:
                            # End of file: evtest has exited. Without this
                            # check the pipe stays readable forever and the
                            # loop would spin without ever looking at the
                            # stop request.
                            logging.warning('evtest on %s exited.',
                                            self.device_node)
                            break
                        output_f.write(line)
                        ev = Event.from_string(line)
                        if ev:
                            self.events.append(ev.value_tuple())
                    elif self._stop_recording_thread_event.is_set():
                        self._stop_recording_thread_event.clear()
                        break
        finally:
            self._terminate_recorder()


    def start(self):
        """Start the recording thread."""
        logging.info('Start recording thread.')
        # Drop a stop request left over from a previous recording that
        # ended by itself; otherwise the new recording would end at its
        # first idle timeout.
        self._stop_recording_thread_event.clear()
        self._recording_thread = threading.Thread(target=self.record)
        self._recording_thread.start()


    def stop(self):
        """Stop the recording thread.

        Does nothing if the recording thread has not been started.

        """
        logging.info('Stop recording thread.')
        if self._recording_thread is None:
            logging.warning('The recording thread is not running.')
            return
        self._stop_recording_thread_event.set()
        self._recording_thread.join()
        self._recording_thread = None


    def clear_events(self):
        """Clear the event list."""
        self.events = []


    def get_events(self):
        """Get the event list.

        @returns: the event list, i.e. a list of (type, code, value)
                  tuples in the order they were recorded.
        """
        return self.events
249
250
# Prebuilt events: the SYN report event, and one EV_MSC/MSC_SCAN event per
# mouse button keyed by the button name.
SYN_EVENT = Event(EV_SYN, SYN_REPORT, 0)
MSC_SCAN_BTN_EVENT = dict(
        (button, Event(EV_MSC, MSC_SCAN, scan_value))
        for button, scan_value in (('LEFT', MSC_SCAN_BTN_LEFT),
                                   ('RIGHT', MSC_SCAN_BTN_RIGHT),
                                   ('MIDDLE', MSC_SCAN_BTN_MIDDLE)))
255
256
def recording_example():
    """Example code for capturing input events on a Samus.

    For a quick swipe, it outputs events in numeric format:

    (3, 57, 9)
    (3, 53, 641)
    (3, 54, 268)
    (3, 58, 60)
    (3, 48, 88)
    (1, 330, 1)
    (1, 325, 1)
    (3, 0, 641)
    (3, 1, 268)
    (3, 24, 60)
    (0, 0, 0)
    (3, 53, 595)
    (3, 54, 288)
    (3, 0, 595)
    (3, 1, 288)
    (0, 0, 0)
    (3, 57, -1)
    (1, 330, 0)
    (1, 325, 0)
    (3, 24, 0)
    (0, 0, 0)

    The above events in corresponding evtest text format are:

    Event: time .782950, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value 9
    Event: time .782950, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 58 (ABS_MT_PRESSURE), value 60
    Event: time .782950, type 3 (EV_ABS), code 59 (?), value 0
    Event: time .782950, type 3 (EV_ABS), code 48 (ABS_MT_TOUCH_MAJOR), value 88
    Event: time .782950, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 1
    Event: time .782950, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 1
    Event: time .782950, type 3 (EV_ABS), code 0 (ABS_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 1 (ABS_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 60
    Event: time .782950, -------------- SYN_REPORT ------------
    Event: time .798273, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 288
    Event: time .798273, type 3 (EV_ABS), code 0 (ABS_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 1 (ABS_Y), value 288
    Event: time .798273, -------------- SYN_REPORT ------------
    Event: time .821437, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value -1
    Event: time .821437, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 0
    Event: time .821437, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 0
    Event: time .821437, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 0
    Event: time .821437, -------------- SYN_REPORT ------------
    """
    device_name = 'Atmel maXTouch Touchpad'
    recorder = InputEventRecorder(device_name)
    # Each print takes a single pre-formatted argument so that the output
    # is the same whether print is a statement or a function.
    print('Samus touchpad device name: %s' % recorder.device_name)
    print('Samus touchpad device node: %s' % recorder.device_node)
    print('Please make gestures on the touchpad for up to 5 seconds.')
    recorder.clear_events()
    recorder.start()
    try:
        time.sleep(5)
    finally:
        # Always stop the recording thread, even if the sleep is
        # interrupted, so that the program can exit.
        recorder.stop()
    for e in recorder.get_events():
        print(e)
320
321
# Run the recording example when this file is executed as a script.
if __name__ == '__main__':
    recording_example()
324