1#!/usr/bin/env python3 2# vim: set expandtab shiftwidth=4: 3# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */ 4# 5# Copyright © 2018 Red Hat, Inc. 6# 7# Permission is hereby granted, free of charge, to any person obtaining a 8# copy of this software and associated documentation files (the "Software"), 9# to deal in the Software without restriction, including without limitation 10# the rights to use, copy, modify, merge, publish, distribute, sublicense, 11# and/or sell copies of the Software, and to permit persons to whom the 12# Software is furnished to do so, subject to the following conditions: 13# 14# The above copyright notice and this permission notice (including the next 15# paragraph) shall be included in all copies or substantial portions of the 16# Software. 17# 18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 21# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 23# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER 24# DEALINGS IN THE SOFTWARE. 25 26import os 27import sys 28import time 29import math 30import multiprocessing 31import argparse 32from pathlib import Path 33 34try: 35 import libevdev 36 import yaml 37 import pyudev 38except ModuleNotFoundError as e: 39 print("Error: {}".format(e), file=sys.stderr) 40 print( 41 "One or more python modules are missing. Please install those " 42 "modules and re-run this tool." 43 ) 44 sys.exit(1) 45 46 47SUPPORTED_FILE_VERSION = 1 48 49 50def error(msg, **kwargs): 51 print(msg, **kwargs, file=sys.stderr) 52 53 54class YamlException(Exception): 55 pass 56 57 58def fetch(yaml, key): 59 """Helper function to avoid confusing a YAML error with a 60 normal KeyError bug""" 61 try: 62 return yaml[key] 63 except KeyError: 64 raise YamlException("Failed to get '{}' from recording.".format(key)) 65 66 67def check_udev_properties(yaml_data, uinput): 68 """ 69 Compare the properties our new uinput device has with the ones from the 70 recording and ring the alarm bell if one of them is off. 71 """ 72 yaml_udev_section = fetch(yaml_data, "udev") 73 yaml_udev_props = fetch(yaml_udev_section, "properties") 74 yaml_props = { 75 k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props] 76 } 77 try: 78 # We don't assign this one to virtual devices 79 del yaml_props["LIBINPUT_DEVICE_GROUP"] 80 except KeyError: 81 pass 82 83 # give udev some time to catch up 84 time.sleep(0.2) 85 context = pyudev.Context() 86 udev_device = pyudev.Devices.from_device_file(context, uinput.devnode) 87 for name, value in udev_device.properties.items(): 88 if name in yaml_props: 89 if yaml_props[name] != value: 90 error( 91 f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}" 92 ) 93 del yaml_props[name] 94 else: 95 # The list of properties we add to the recording, see libinput-record.c 96 prefixes = ( 97 "ID_INPUT", 98 "LIBINPUT", 99 "EVDEV_ABS", 100 "MOUSE_DPI", 101 "POINTINGSTICK_", 102 ) 103 for prefix in prefixes: 104 if name.startswith(prefix): 105 error(f"Warning: unexpected property: {name}={value}") 106 107 # the ones we found above were removed from the dict 108 for name, value in yaml_props.items(): 109 error(f"Warning: device is missing recorded udev property: {name}={value}") 110 111 112def create(device): 113 evdev = fetch(device, "evdev") 114 115 d = libevdev.Device() 116 d.name = fetch(evdev, "name") 117 118 ids = fetch(evdev, "id") 119 if len(ids) != 4: 120 raise YamlException("Invalid ID format: {}".format(ids)) 121 d.id = dict(zip(["bustype", "vendor", "product", "version"], ids)) 122 123 codes = fetch(evdev, "codes") 124 for evtype, evcodes in codes.items(): 125 for code in evcodes: 126 data = None 127 if evtype == libevdev.EV_ABS.value: 128 values = fetch(evdev, "absinfo")[code] 129 absinfo = libevdev.InputAbsInfo( 130 minimum=values[0], 131 maximum=values[1], 132 fuzz=values[2], 133 flat=values[3], 134 resolution=values[4], 135 ) 136 data = absinfo 137 elif evtype == libevdev.EV_REP.value: 138 if code == libevdev.EV_REP.REP_DELAY.value: 139 data = 500 140 elif code == libevdev.EV_REP.REP_PERIOD.value: 141 data = 20 142 d.enable(libevdev.evbit(evtype, code), data=data) 143 144 properties = fetch(evdev, "properties") 145 for prop in properties: 146 d.enable(libevdev.propbit(prop)) 147 148 uinput = d.create_uinput_device() 149 150 check_udev_properties(device, uinput) 151 152 return uinput 153 154 155def print_events(devnode, indent, evs): 156 devnode = os.path.basename(devnode) 157 for e in evs: 158 print( 159 "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format( 160 devnode, 161 " " * (indent * 8), 162 e.sec, 163 e.usec, 164 e.type.name, 165 e.code.name, 166 e.value, 167 ) 168 ) 169 170 171def collect_events(frame): 172 evs = [] 173 events_skipped = False 174 for (sec, usec, evtype, evcode, value) in frame: 175 if evtype == libevdev.EV_KEY.value and value == 2: # key repeat 176 events_skipped = True 177 continue 178 179 e = libevdev.InputEvent( 180 libevdev.evbit(evtype, evcode), value=value, sec=sec, usec=usec 181 ) 182 evs.append(e) 183 184 # If we skipped some events and now all we have left is the 185 # SYN_REPORTs, we drop the SYN_REPORTs as well. 186 if events_skipped and all(e for e in evs if e.matches(libevdev.EV_SYN.SYN_REPORT)): 187 return [] 188 else: 189 return evs 190 191 192def replay(device, verbose): 193 events = fetch(device, "events") 194 if events is None: 195 return 196 uinput = device["__uinput"] 197 198 # The first event may have a nonzero offset but we want to replay 199 # immediately regardless. When replaying multiple devices, the first 200 # offset is the offset from the first event on any device. 201 offset = time.time() - device["__first_event_offset"] 202 203 if offset < 0: 204 error("WARNING: event time offset is in the future, refusing to replay") 205 return 206 207 # each 'evdev' set contains one SYN_REPORT so we only need to check for 208 # the time offset once per event 209 for event in events: 210 try: 211 evdev = fetch(event, "evdev") 212 except YamlException: 213 continue 214 215 evs = collect_events(evdev) 216 if not evs: 217 continue 218 219 evtime = evs[0].sec + evs[0].usec / 1e6 + offset 220 now = time.time() 221 222 if evtime - now > 150 / 1e6: # 150 µs error margin 223 time.sleep(evtime - now - 150 / 1e6) 224 225 uinput.send_events(evs) 226 if verbose: 227 print_events(uinput.devnode, device["__index"], evs) 228 229 230def first_timestamp(device): 231 events = fetch(device, "events") 232 for e in events or []: 233 try: 234 evdev = fetch(e, "evdev") 235 (sec, usec, *_) = evdev[0] 236 return sec + usec / 1.0e6 237 except YamlException: 238 pass 239 240 return None 241 242 243def wrap(func, *args): 244 try: 245 func(*args) 246 except KeyboardInterrupt: 247 pass 248 249 250def loop(args, recording): 251 devices = fetch(recording, "devices") 252 253 first_timestamps = tuple( 254 filter(lambda x: x is not None, [first_timestamp(d) for d in devices]) 255 ) 256 # All devices need to start replaying at the same time, so let's find 257 # the very first event and offset everything by that timestamp. 258 toffset = min(first_timestamps or [math.inf]) 259 260 for idx, d in enumerate(devices): 261 uinput = create(d) 262 print("{}: {}".format(uinput.devnode, uinput.name)) 263 d["__uinput"] = uinput # cheaper to hide it in the dict then work around it 264 d["__index"] = idx 265 d["__first_event_offset"] = toffset 266 267 if not first_timestamps: 268 input("No events in recording. Hit enter to quit") 269 return 270 271 while True: 272 input("Hit enter to start replaying") 273 274 processes = [] 275 for d in devices: 276 p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose)) 277 processes.append(p) 278 279 for p in processes: 280 p.start() 281 282 for p in processes: 283 p.join() 284 285 del processes 286 287 288def create_device_quirk(device): 289 try: 290 quirks = fetch(device, "quirks") 291 if not quirks: 292 return None 293 except YamlException: 294 return None 295 # Where the device has a quirk, we match on name, vendor and product. 296 # That's the best match we can assemble here from the info we have. 297 evdev = fetch(device, "evdev") 298 name = fetch(evdev, "name") 299 id = fetch(evdev, "id") 300 quirk = ( 301 "[libinput-replay {name}]\n" 302 "MatchName={name}\n" 303 "MatchVendor=0x{id[1]:04X}\n" 304 "MatchProduct=0x{id[2]:04X}\n" 305 ).format(name=name, id=id) 306 quirk += "\n".join(quirks) 307 return quirk 308 309 310def setup_quirks(recording): 311 devices = fetch(recording, "devices") 312 overrides = None 313 quirks = [] 314 for d in devices: 315 if "quirks" in d: 316 quirk = create_device_quirk(d) 317 if quirk: 318 quirks.append(quirk) 319 if not quirks: 320 return None 321 322 overrides = Path("/etc/libinput/local-overrides.quirks") 323 if overrides.exists(): 324 print( 325 "{} exists, please move it out of the way first".format(overrides), 326 file=sys.stderr, 327 ) 328 sys.exit(1) 329 330 overrides.parent.mkdir(exist_ok=True) 331 with overrides.open("w+") as fd: 332 fd.write("# This file was generated by libinput replay\n") 333 fd.write("# Unless libinput replay is running right now, remove this file.\n") 334 fd.write("\n\n".join(quirks)) 335 336 return overrides 337 338 339def check_file(recording): 340 version = fetch(recording, "version") 341 if version != SUPPORTED_FILE_VERSION: 342 raise YamlException( 343 "Invalid file format: {}, expected {}".format( 344 version, SUPPORTED_FILE_VERSION 345 ) 346 ) 347 348 ndevices = fetch(recording, "ndevices") 349 devices = fetch(recording, "devices") 350 if ndevices != len(devices): 351 error( 352 "WARNING: truncated file, expected {} devices, got {}".format( 353 ndevices, len(devices) 354 ) 355 ) 356 357 358def main(): 359 parser = argparse.ArgumentParser(description="Replay a device recording") 360 parser.add_argument( 361 "recording", 362 metavar="recorded-file.yaml", 363 type=str, 364 help="Path to device recording", 365 ) 366 parser.add_argument("--verbose", action="store_true") 367 args = parser.parse_args() 368 369 quirks_file = None 370 371 try: 372 with open(args.recording) as f: 373 y = yaml.safe_load(f) 374 check_file(y) 375 quirks_file = setup_quirks(y) 376 loop(args, y) 377 except KeyboardInterrupt: 378 pass 379 except (PermissionError, OSError) as e: 380 error("Error: failed to open device: {}".format(e)) 381 except YamlException as e: 382 error("Error: failed to parse recording: {}".format(e)) 383 finally: 384 if quirks_file: 385 quirks_file.unlink() 386 387 388if __name__ == "__main__": 389 main() 390