#!/usr/bin/env python3 # vim: set expandtab shiftwidth=4: # -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */ # # Copyright © 2018 Red Hat, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the "Software"), # to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, # and/or sell copies of the Software, and to permit persons to whom the # Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next # paragraph) shall be included in all copies or substantial portions of the # Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. import os import sys import time import math import multiprocessing import argparse from pathlib import Path try: import libevdev import yaml import pyudev except ModuleNotFoundError as e: print("Error: {}".format(e), file=sys.stderr) print( "One or more python modules are missing. Please install those " "modules and re-run this tool." ) sys.exit(1) SUPPORTED_FILE_VERSION = 1 def error(msg, **kwargs): print(msg, **kwargs, file=sys.stderr) class YamlException(Exception): pass def fetch(yaml, key): """Helper function to avoid confusing a YAML error with a normal KeyError bug""" try: return yaml[key] except KeyError: raise YamlException("Failed to get '{}' from recording.".format(key)) def check_udev_properties(yaml_data, uinput): """ Compare the properties our new uinput device has with the ones from the recording and ring the alarm bell if one of them is off. """ yaml_udev_section = fetch(yaml_data, "udev") yaml_udev_props = fetch(yaml_udev_section, "properties") yaml_props = { k: v for (k, v) in [prop.split("=", maxsplit=1) for prop in yaml_udev_props] } try: # We don't assign this one to virtual devices del yaml_props["LIBINPUT_DEVICE_GROUP"] except KeyError: pass # give udev some time to catch up time.sleep(0.2) context = pyudev.Context() udev_device = pyudev.Devices.from_device_file(context, uinput.devnode) for name, value in udev_device.properties.items(): if name in yaml_props: if yaml_props[name] != value: error( f"Warning: udev property mismatch: recording has {name}={yaml_props[name]}, device has {name}={value}" ) del yaml_props[name] else: # The list of properties we add to the recording, see libinput-record.c prefixes = ( "ID_INPUT", "LIBINPUT", "EVDEV_ABS", "MOUSE_DPI", "POINTINGSTICK_", ) for prefix in prefixes: if name.startswith(prefix): error(f"Warning: unexpected property: {name}={value}") # the ones we found above were removed from the dict for name, value in yaml_props.items(): error(f"Warning: device is missing recorded udev property: {name}={value}") def create(device): evdev = fetch(device, "evdev") d = libevdev.Device() d.name = fetch(evdev, "name") ids = fetch(evdev, "id") if len(ids) != 4: raise YamlException("Invalid ID format: {}".format(ids)) d.id = dict(zip(["bustype", "vendor", "product", "version"], ids)) codes = fetch(evdev, "codes") for evtype, evcodes in codes.items(): for code in evcodes: data = None if evtype == libevdev.EV_ABS.value: values = fetch(evdev, "absinfo")[code] absinfo = libevdev.InputAbsInfo( minimum=values[0], maximum=values[1], fuzz=values[2], flat=values[3], resolution=values[4], ) data = absinfo elif evtype == libevdev.EV_REP.value: if code == libevdev.EV_REP.REP_DELAY.value: data = 500 elif code == libevdev.EV_REP.REP_PERIOD.value: data = 20 d.enable(libevdev.evbit(evtype, code), data=data) properties = fetch(evdev, "properties") for prop in properties: d.enable(libevdev.propbit(prop)) uinput = d.create_uinput_device() check_udev_properties(device, uinput) return uinput def print_events(devnode, indent, evs): devnode = os.path.basename(devnode) for e in evs: print( "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format( devnode, " " * (indent * 8), e.sec, e.usec, e.type.name, e.code.name, e.value, ) ) def collect_events(frame): evs = [] events_skipped = False for (sec, usec, evtype, evcode, value) in frame: if evtype == libevdev.EV_KEY.value and value == 2: # key repeat events_skipped = True continue e = libevdev.InputEvent( libevdev.evbit(evtype, evcode), value=value, sec=sec, usec=usec ) evs.append(e) # If we skipped some events and now all we have left is the # SYN_REPORTs, we drop the SYN_REPORTs as well. if events_skipped and all(e for e in evs if e.matches(libevdev.EV_SYN.SYN_REPORT)): return [] else: return evs def replay(device, verbose): events = fetch(device, "events") if events is None: return uinput = device["__uinput"] # The first event may have a nonzero offset but we want to replay # immediately regardless. When replaying multiple devices, the first # offset is the offset from the first event on any device. offset = time.time() - device["__first_event_offset"] if offset < 0: error("WARNING: event time offset is in the future, refusing to replay") return # each 'evdev' set contains one SYN_REPORT so we only need to check for # the time offset once per event for event in events: try: evdev = fetch(event, "evdev") except YamlException: continue evs = collect_events(evdev) if not evs: continue evtime = evs[0].sec + evs[0].usec / 1e6 + offset now = time.time() if evtime - now > 150 / 1e6: # 150 µs error margin time.sleep(evtime - now - 150 / 1e6) uinput.send_events(evs) if verbose: print_events(uinput.devnode, device["__index"], evs) def first_timestamp(device): events = fetch(device, "events") for e in events or []: try: evdev = fetch(e, "evdev") (sec, usec, *_) = evdev[0] return sec + usec / 1.0e6 except YamlException: pass return None def wrap(func, *args): try: func(*args) except KeyboardInterrupt: pass def loop(args, recording): devices = fetch(recording, "devices") first_timestamps = tuple( filter(lambda x: x is not None, [first_timestamp(d) for d in devices]) ) # All devices need to start replaying at the same time, so let's find # the very first event and offset everything by that timestamp. toffset = min(first_timestamps or [math.inf]) for idx, d in enumerate(devices): uinput = create(d) print("{}: {}".format(uinput.devnode, uinput.name)) d["__uinput"] = uinput # cheaper to hide it in the dict then work around it d["__index"] = idx d["__first_event_offset"] = toffset if not first_timestamps: input("No events in recording. Hit enter to quit") return while True: input("Hit enter to start replaying") processes = [] for d in devices: p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose)) processes.append(p) for p in processes: p.start() for p in processes: p.join() del processes def create_device_quirk(device): try: quirks = fetch(device, "quirks") if not quirks: return None except YamlException: return None # Where the device has a quirk, we match on name, vendor and product. # That's the best match we can assemble here from the info we have. evdev = fetch(device, "evdev") name = fetch(evdev, "name") id = fetch(evdev, "id") quirk = ( "[libinput-replay {name}]\n" "MatchName={name}\n" "MatchVendor=0x{id[1]:04X}\n" "MatchProduct=0x{id[2]:04X}\n" ).format(name=name, id=id) quirk += "\n".join(quirks) return quirk def setup_quirks(recording): devices = fetch(recording, "devices") overrides = None quirks = [] for d in devices: if "quirks" in d: quirk = create_device_quirk(d) if quirk: quirks.append(quirk) if not quirks: return None overrides = Path("/etc/libinput/local-overrides.quirks") if overrides.exists(): print( "{} exists, please move it out of the way first".format(overrides), file=sys.stderr, ) sys.exit(1) overrides.parent.mkdir(exist_ok=True) with overrides.open("w+") as fd: fd.write("# This file was generated by libinput replay\n") fd.write("# Unless libinput replay is running right now, remove this file.\n") fd.write("\n\n".join(quirks)) return overrides def check_file(recording): version = fetch(recording, "version") if version != SUPPORTED_FILE_VERSION: raise YamlException( "Invalid file format: {}, expected {}".format( version, SUPPORTED_FILE_VERSION ) ) ndevices = fetch(recording, "ndevices") devices = fetch(recording, "devices") if ndevices != len(devices): error( "WARNING: truncated file, expected {} devices, got {}".format( ndevices, len(devices) ) ) def main(): parser = argparse.ArgumentParser(description="Replay a device recording") parser.add_argument( "recording", metavar="recorded-file.yaml", type=str, help="Path to device recording", ) parser.add_argument("--verbose", action="store_true") args = parser.parse_args() quirks_file = None try: with open(args.recording) as f: y = yaml.safe_load(f) check_file(y) quirks_file = setup_quirks(y) loop(args, y) except KeyboardInterrupt: pass except (PermissionError, OSError) as e: error("Error: failed to open device: {}".format(e)) except YamlException as e: error("Error: failed to parse recording: {}".format(e)) finally: if quirks_file: quirks_file.unlink() if __name__ == "__main__": main()