# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides functions to record input events."""

import logging
import re
import select
import subprocess
import threading
import time

from linux_input import EV_MSC, EV_SYN, MSC_SCAN, SYN_REPORT


# Define extra misc events below as they are not defined in linux_input.
MSC_SCAN_BTN_LEFT = 90001
MSC_SCAN_BTN_RIGHT = 90002
MSC_SCAN_BTN_MIDDLE = 90003


class InputEventRecorderError(Exception):
    """An exception class for input_event_recorder module."""
    pass


class Event(object):
    """An event class based on evtest constructed from an evtest event.

    An ordinary event looks like:
    Event: time 133082.748019, type 3 (EV_ABS), code 0 (ABS_X), value 316

    A SYN_REPORT event looks like:
    Event: time 10788.289613, -------------- SYN_REPORT ------------

    """

    # The patterns are compiled once at class creation instead of on every
    # from_string() call, since from_string() runs for each recorded line.
    _EV_SEP = r',\s*'
    _EV_PATTERN_TIME = r'Event:\s*time\s*(\d+\.\d+)'

    # Pattern of an ordinary event.
    # Groups: 1 = time, 2 = type, 3 = code, 4 = value.
    _EV_PATTERN = re.compile(
            _EV_SEP.join([_EV_PATTERN_TIME,
                          r'type\s*(\d+)\s*\(\w+\)',
                          r'code\s*(\d+)\s*\(\w+\)',
                          r'value\s*(-?\d+)']),
            re.I)

    # Pattern of the SYN_REPORT event.
    _EV_PATTERN_SYN_REPORT = re.compile(
            _EV_SEP.join([_EV_PATTERN_TIME,
                          r'-+\s*SYN_REPORT\s*-+']),
            re.I)

    def __init__(self, type=0, code=0, value=0):
        """Construction of an input event.

        @param type: the event type
        @param code: the event code
        @param value: the event value

        """
        self.type = type
        self.code = code
        self.value = value


    @staticmethod
    def from_string(ev_string):
        """Convert an event string to an event object.

        @param ev_string: an event string.

        @returns: an event object if the event string conforms to
                  event pattern format. None otherwise.

        """
        # Check if it is a SYN event.
        if Event._EV_PATTERN_SYN_REPORT.search(ev_string):
            return Event(EV_SYN, SYN_REPORT, 0)

        # Check if it is a regular event.
        result = Event._EV_PATTERN.search(ev_string)
        if result:
            ev_type = int(result.group(2))
            ev_code = int(result.group(3))
            ev_value = int(result.group(4))
            return Event(ev_type, ev_code, ev_value)

        logging.warning('not an event: %s', ev_string)
        return None


    def is_syn(self):
        """Determine if the event is a SYN report event.

        @returns: True if it is a SYN report event. False otherwise.

        """
        return self.type == EV_SYN and self.code == SYN_REPORT


    def value_tuple(self):
        """A tuple of the event type, code, and value.

        @returns: the tuple of the event type, code, and value.

        """
        return (self.type, self.code, self.value)


    def __eq__(self, other):
        """Determine if two events are equal.

        @param other: the object to compare with. Any object exposing
                      type, code and value attributes is comparable.

        @returns: True if two events are equal. False otherwise.
                  NotImplemented if other does not look like an event.

        """
        try:
            other_tuple = (other.type, other.code, other.value)
        except AttributeError:
            # Let Python fall back to its default comparison instead of
            # crashing on e.g. "event == None".
            return NotImplemented
        return self.value_tuple() == other_tuple


    def __ne__(self, other):
        """Determine if two events differ.

        Python 2 does not derive != from __eq__, so it is defined here.

        @param other: the object to compare with.

        @returns: True if two events differ. False otherwise.
                  NotImplemented if other does not look like an event.

        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result


    def __hash__(self):
        """Hash of the event, kept consistent with __eq__.

        @returns: the hash of the (type, code, value) tuple.

        """
        return hash(self.value_tuple())


    def __str__(self):
        """A string representation of the event.

        @returns: a string representation of the event.

        """
        return '%d %d %d' % (self.type, self.code, self.value)


class InputEventRecorder(object):
    """An input event recorder.

    Refer to recording_example() below about how to record input events.

    """

    INPUT_DEVICE_INFO_FILE = '/proc/bus/input/devices'
    SELECT_TIMEOUT_SECS = 1

    def __init__(self, device_name):
        """Construction of input event recorder.

        @param device_name: the device name of the input device node to record.

        @raises InputEventRecorderError: if no device node is found for
                the device name.

        """
        self.device_name = device_name
        self.device_node = self.get_device_node_by_name(device_name)
        if self.device_node is None:
            err_msg = 'Failed to find the device node of %s' % device_name
            raise InputEventRecorderError(err_msg)
        self._recorder = None
        self._recording_thread = None
        self._stop_recording_thread_event = threading.Event()
        self.tmp_file = '/tmp/evtest.dat'
        self.events = []


    def get_device_node_by_name(self, device_name):
        """Get the input device node by name.

        Example of a RN-42 emulated mouse device information looks like

        I: Bus=0005 Vendor=0000 Product=0000 Version=0000
        N: Name="RNBT-A96F"
        P: Phys=6c:29:95:1a:b8:18
        S: Sysfs=/devices/pci0000:00/0000:00:14.0/usb1/1-8/1-8:1.0/bluetooth/hci0/hci0:512:29/0005:0000:0000.0004/input/input15
        U: Uniq=00:06:66:75:a9:6f
        H: Handlers=event12
        B: PROP=0
        B: EV=17
        B: KEY=70000 0 0 0 0
        B: REL=103
        B: MSC=10

        @param device_name: the device name of the target input device node.
                It is interpolated into a regular expression as is and
                matched case-insensitively.

        @returns: the corresponding device node of the device.
                  None if no matching device with an event handler is found.

        """
        device_found = False
        device_pattern = re.compile('N: Name=.*%s' % device_name, re.I)
        # Require at least one digit so that int() below can never be fed
        # an empty string.
        event_number_pattern = re.compile(r'H: Handlers=.*event(\d+)', re.I)
        with open(self.INPUT_DEVICE_INFO_FILE) as info:
            for line in info:
                if not line.strip():
                    # A blank line ends the current device block. Forget the
                    # match so that the handler of the next device is never
                    # reported for this one.
                    device_found = False
                elif device_found:
                    result = event_number_pattern.search(line)
                    if result:
                        event_number = int(result.group(1))
                        return '/dev/input/event%d' % event_number
                elif device_pattern.search(line):
                    device_found = True
        return None


    def _terminate_recorder(self):
        """Terminate the evtest subprocess and release its pipe."""
        recorder = self._recorder
        if recorder is None:
            return
        if recorder.poll() is None:
            recorder.terminate()
        # Reap the child so that it does not linger as a zombie.
        recorder.wait()
        recorder.stdout.close()
        self._recorder = None


    def record(self):
        """Record input events.

        Runs evtest on the device node, copies every output line to
        self.tmp_file and appends the value tuple of every parsed event to
        self.events. Returns when the stop event is set and no output has
        arrived within SELECT_TIMEOUT_SECS, or when evtest exits.

        """
        logging.info('Recording input events of %s.', self.device_node)
        # An argument list (no shell) makes terminate() signal evtest itself
        # rather than an intermediate shell process.
        self._recorder = subprocess.Popen(['evtest', self.device_node],
                                          stdout=subprocess.PIPE)
        try:
            with open(self.tmp_file, 'w') as output_f:
                while True:
                    read_list, _, _ = select.select(
                            [self._recorder.stdout], [], [],
                            self.SELECT_TIMEOUT_SECS)
                    if read_list:
                        line = self._recorder.stdout.readline()
                        if not line:
                            # EOF: evtest has exited. Without this check the
                            # pipe stays readable forever and the loop spins.
                            logging.warning('evtest exited unexpectedly.')
                            break
                        output_f.write(line)
                        ev = Event.from_string(line)
                        if ev:
                            self.events.append(ev.value_tuple())
                    elif self._stop_recording_thread_event.is_set():
                        self._stop_recording_thread_event.clear()
                        break
        finally:
            self._terminate_recorder()


    def start(self):
        """Start the recording thread."""
        logging.info('Start recording thread.')
        # Drop any stale stop request so that the new thread does not quit
        # at its first idle period.
        self._stop_recording_thread_event.clear()
        self._recording_thread = threading.Thread(target=self.record)
        self._recording_thread.start()


    def stop(self):
        """Stop the recording thread.

        Does nothing if the recording thread has not been started.

        """
        logging.info('Stop recording thread.')
        if self._recording_thread is None:
            return
        self._stop_recording_thread_event.set()
        self._recording_thread.join()
        self._recording_thread = None
        # The thread may have ended on its own (evtest exited) without
        # consuming the stop request; do not leave it pending.
        self._stop_recording_thread_event.clear()


    def clear_events(self):
        """Clear the event list."""
        self.events = []


    def get_events(self):
        """Get the event list.

        @returns: the event list.
        """
        return self.events


SYN_EVENT = Event(EV_SYN, SYN_REPORT, 0)
MSC_SCAN_BTN_EVENT = {'LEFT': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_LEFT),
                      'RIGHT': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_RIGHT),
                      'MIDDLE': Event(EV_MSC, MSC_SCAN, MSC_SCAN_BTN_MIDDLE)}


def recording_example():
    """Example code for capturing input events on a Samus.

    For a quick swipe, it outputs events in numeric format:

    (3, 57, 9)
    (3, 53, 641)
    (3, 54, 268)
    (3, 58, 60)
    (3, 48, 88)
    (1, 330, 1)
    (1, 325, 1)
    (3, 0, 641)
    (3, 1, 268)
    (3, 24, 60)
    (0, 0, 0)
    (3, 53, 595)
    (3, 54, 288)
    (3, 0, 595)
    (3, 1, 288)
    (0, 0, 0)
    (3, 57, -1)
    (1, 330, 0)
    (1, 325, 0)
    (3, 24, 0)
    (0, 0, 0)

    The above events in corresponding evtest text format are:

    Event: time .782950, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value 9
    Event: time .782950, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 58 (ABS_MT_PRESSURE), value 60
    Event: time .782950, type 3 (EV_ABS), code 59 (?), value 0
    Event: time .782950, type 3 (EV_ABS), code 48 (ABS_MT_TOUCH_MAJOR), value 88
    Event: time .782950, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 1
    Event: time .782950, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 1
    Event: time .782950, type 3 (EV_ABS), code 0 (ABS_X), value 641
    Event: time .782950, type 3 (EV_ABS), code 1 (ABS_Y), value 268
    Event: time .782950, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 60
    Event: time .782950, -------------- SYN_REPORT ------------
    Event: time .798273, type 3 (EV_ABS), code 53 (ABS_MT_POSITION_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 54 (ABS_MT_POSITION_Y), value 288
    Event: time .798273, type 3 (EV_ABS), code 0 (ABS_X), value 595
    Event: time .798273, type 3 (EV_ABS), code 1 (ABS_Y), value 288
    Event: time .798273, -------------- SYN_REPORT ------------
    Event: time .821437, type 3 (EV_ABS), code 57 (ABS_MT_TRACKING_ID), value -1
    Event: time .821437, type 1 (EV_KEY), code 330 (BTN_TOUCH), value 0
    Event: time .821437, type 1 (EV_KEY), code 325 (BTN_TOOL_FINGER), value 0
    Event: time .821437, type 3 (EV_ABS), code 24 (ABS_PRESSURE), value 0
    Event: time .821437, -------------- SYN_REPORT ------------
    """
    device_name = 'Atmel maXTouch Touchpad'
    recorder = InputEventRecorder(device_name)
    # Single-argument print() calls produce the same output whether print
    # is a statement or a function.
    print('Samus touchpad device name: %s' % recorder.device_name)
    print('Samus touchpad device node: %s' % recorder.device_node)
    print('Please make gestures on the touchpad for up to 5 seconds.')
    recorder.clear_events()
    recorder.start()
    time.sleep(5)
    recorder.stop()
    for e in recorder.get_events():
        print(e)


if __name__ == '__main__':
    recording_example()