• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# vim: set expandtab shiftwidth=4:
3# -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */
4#
5# Copyright © 2018 Red Hat, Inc.
6#
7# Permission is hereby granted, free of charge, to any person obtaining a
8# copy of this software and associated documentation files (the "Software"),
9# to deal in the Software without restriction, including without limitation
10# the rights to use, copy, modify, merge, publish, distribute, sublicense,
11# and/or sell copies of the Software, and to permit persons to whom the
12# Software is furnished to do so, subject to the following conditions:
13#
14# The above copyright notice and this permission notice (including the next
15# paragraph) shall be included in all copies or substantial portions of the
16# Software.
17#
18# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
21# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
23# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
24# DEALINGS IN THE SOFTWARE.
25
26import os
27import sys
28import time
29import multiprocessing
30import argparse
31from pathlib import Path
32
33try:
34    import libevdev
35    import yaml
36except ModuleNotFoundError as e:
37    print('Error: {}'.format(e), file=sys.stderr)
38    print('One or more python modules are missing. Please install those '
39          'modules and re-run this tool.')
40    sys.exit(1)
41
42
43SUPPORTED_FILE_VERSION = 1
44
45
def error(msg, **kwargs):
    '''Print msg on stderr. Any extra keyword arguments are handed
    through to print() unchanged.'''
    print(msg, file=sys.stderr, **kwargs)
48
49
class YamlException(Exception):
    '''Raised when the recording lacks required data or has an
    unexpected structure.'''


def fetch(yaml, key):
    '''Helper function to avoid confusing a YAML error with a
    normal KeyError bug.

    Returns yaml[key]. Raises YamlException if the key is missing or if
    yaml cannot be indexed with key at all, e.g. an empty YAML section
    that was loaded as None or a list where a mapping was expected.'''
    try:
        return yaml[key]
    except (KeyError, TypeError, IndexError) as e:
        # A container of the wrong type is a malformed recording just
        # like a missing key is, so both are reported the same way.
        raise YamlException('Failed to get \'{}\' from recording.'.format(key)) from e
61
62
def create(device):
    '''Create a uinput device from the 'evdev' section of one recorded
    device and return it.

    The name, the four ID fields, all event codes and all properties
    listed in the recording are applied to a fresh libevdev.Device
    before the uinput device is created from it.

    Raises YamlException if a required entry is missing from the
    recording or has the wrong number of values.'''
    evdev = fetch(device, 'evdev')

    d = libevdev.Device()
    d.name = fetch(evdev, 'name')

    ids = fetch(evdev, 'id')
    if len(ids) != 4:
        raise YamlException('Invalid ID format: {}'.format(ids))
    d.id = dict(zip(['bustype', 'vendor', 'product', 'version'], ids))

    codes = fetch(evdev, 'codes')
    for evtype, evcodes in codes.items():
        for code in evcodes:
            data = None
            if evtype == libevdev.EV_ABS.value:
                # Go through fetch() for the per-code lookup too so that a
                # missing axis is reported as a recording error and not as
                # a bare KeyError.
                values = fetch(fetch(evdev, 'absinfo'), code)
                if len(values) < 5:
                    raise YamlException('Invalid absinfo format for code {}: {}'.format(code, values))
                data = libevdev.InputAbsInfo(minimum=values[0],
                                             maximum=values[1],
                                             fuzz=values[2],
                                             flat=values[3],
                                             resolution=values[4])
            elif evtype == libevdev.EV_REP.value:
                # Key repeat settings are not read from the recording,
                # fixed values are used instead.
                if code == libevdev.EV_REP.REP_DELAY.value:
                    data = 500
                elif code == libevdev.EV_REP.REP_PERIOD.value:
                    data = 20
            d.enable(libevdev.evbit(evtype, code), data=data)

    properties = fetch(evdev, 'properties')
    for prop in properties:
        d.enable(libevdev.propbit(prop))

    uinput = d.create_uinput_device()
    return uinput
99
100
def print_events(devnode, indent, evs):
    '''Print one line per event in evs to stdout.

    Each line starts with the basename of devnode, followed by
    8 * indent spaces, the event timestamp, the type and code names and
    the event value.'''
    node = os.path.basename(devnode)
    template = "{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}"
    for ev in evs:
        padding = ' ' * (indent * 8)
        line = template.format(node, padding, ev.sec, ev.usec,
                               ev.type.name, ev.code.name, ev.value)
        print(line)
106
107
def replay(device, verbose):
    '''Send the recorded events of one device through its uinput device.

    device is one entry of the recording's device list; it must carry
    the '__uinput', '__index' and '__first_event_offset' entries. If
    verbose is set, every event sent is also printed.'''
    events = fetch(device, 'events')
    if events is None:
        return
    uinput = device['__uinput']

    # The first event may have a nonzero offset but we want to replay
    # immediately regardless. When replaying multiple devices, the first
    # offset is the offset from the first event on any device.
    offset = time.time() - device['__first_event_offset']
    if offset < 0:
        error('WARNING: event time offset is in the future, refusing to replay')
        return

    margin = 150 / 1e6  # 150 µs error margin

    # each 'evdev' set contains one SYN_REPORT so we only need to check for
    # the time offset once per event
    for event in events:
        try:
            frame = fetch(event, 'evdev')
        except YamlException:
            # entries without evdev data are skipped
            continue

        (sec, usec, _evtype, _evcode, _value) = frame[0]
        evtime = sec + usec / 1e6 + offset
        delay = evtime - time.time()
        if delay > margin:
            time.sleep(delay - margin)

        evs = []
        for e in frame:
            evs.append(libevdev.InputEvent(libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]))
        uinput.send_events(evs)
        if verbose:
            print_events(uinput.devnode, device['__index'], evs)
142
143
def first_timestamp(device):
    '''Return the timestamp, in seconds, of the first evdev event
    recorded for device.

    Entries without an 'evdev' key or with an empty evdev list are
    skipped, matching what replay() is able to send. If the device has
    no usable event at all, math.inf is returned so that taking the
    minimum over several devices is unaffected by this one.'''
    import math

    try:
        events = fetch(device, 'events')
    except YamlException:
        return math.inf

    # 'events' may be None or an empty list when nothing was recorded
    for event in events or []:
        try:
            evdev = fetch(event, 'evdev')
        except YamlException:
            continue
        if not evdev:
            continue

        (sec, usec, *_) = evdev[0]
        return sec + usec / 1.e6

    return math.inf
158
159
def wrap(func, *args):
    '''Call func(*args) and swallow a KeyboardInterrupt raised by it.
    The return value of func is discarded.'''
    try:
        func(*args)
    except KeyboardInterrupt:
        return
165
166
def loop(args, recording):
    '''Create all recorded devices, then replay them every time the user
    hits enter.

    args is the parsed command line (only args.verbose is used here),
    recording is the loaded recording. Each device is replayed in its
    own process and all processes are joined before prompting again.

    Raises YamlException if the recording contains no devices. Returns
    when stdin reaches end-of-file.'''
    devices = fetch(recording, 'devices')
    if not devices:
        # min() below would fail with a bare ValueError on an empty list
        raise YamlException('No devices in recording')

    # All devices need to start replaying at the same time, so let's find
    # the very first event and offset everything by that timestamp.
    toffset = min(first_timestamp(d) for d in devices)

    for idx, d in enumerate(devices):
        uinput = create(d)
        print('{}: {}'.format(uinput.devnode, uinput.name))
        d['__uinput'] = uinput  # cheaper to hide it in the dict than work around it
        d['__index'] = idx
        d['__first_event_offset'] = toffset

    while True:
        try:
            input('Hit enter to start replaying')
        except EOFError:
            # stdin is closed, nothing can trigger another replay: stop
            # cleanly instead of dying with a traceback
            return

        processes = [multiprocessing.Process(target=wrap, args=(replay, d, args.verbose))
                     for d in devices]

        for p in processes:
            p.start()

        for p in processes:
            p.join()
196
197
def create_device_quirk(device):
    '''Build the quirks section text for one recorded device.

    Returns None if the device has no 'quirks' entry or an empty one,
    otherwise a section header matching the device followed by the
    recorded quirk lines.'''
    try:
        quirks = fetch(device, 'quirks')
    except YamlException:
        return None
    if not quirks:
        return None

    # Where the device has a quirk, we match on name, vendor and product.
    # That's the best match we can assemble here from the info we have.
    evdev = fetch(device, 'evdev')
    header = ('[libinput-replay {name}]\n'
              'MatchName={name}\n'
              'MatchVendor=0x{id[1]:04X}\n'
              'MatchProduct=0x{id[2]:04X}\n').format(name=fetch(evdev, 'name'),
                                                      id=fetch(evdev, 'id'))
    return header + '\n'.join(quirks)
216
217
def setup_quirks(recording):
    '''Write the quirks of all recorded devices to
    /etc/libinput/local-overrides.quirks.

    Returns the Path of the file written, or None if no device in the
    recording has quirks. Exits with status 1 if the file already
    exists.'''
    quirks = []
    for dev in fetch(recording, 'devices'):
        if 'quirks' not in dev:
            continue
        section = create_device_quirk(dev)
        if section:
            quirks.append(section)

    if not quirks:
        return None

    overrides = Path('/etc/libinput/local-overrides.quirks')
    if overrides.exists():
        print('{} exists, please move it out of the way first'.format(overrides), file=sys.stderr)
        sys.exit(1)

    overrides.parent.mkdir(exist_ok=True)
    with overrides.open('w+') as fd:
        header = ('# This file was generated by libinput replay\n'
                  '# Unless libinput replay is running right now, remove this file.\n')
        fd.write(header + '\n\n'.join(quirks))

    return overrides
242
243
def check_file(recording):
    '''Sanity-check a loaded recording.

    Raises YamlException if the file version is not
    SUPPORTED_FILE_VERSION. Prints a warning if the number of devices
    present differs from the 'ndevices' entry.'''
    version = fetch(recording, 'version')
    if version != SUPPORTED_FILE_VERSION:
        message = 'Invalid file format: {}, expected {}'.format(version, SUPPORTED_FILE_VERSION)
        raise YamlException(message)

    expected = fetch(recording, 'ndevices')
    actual = len(fetch(recording, 'devices'))
    if expected != actual:
        error('WARNING: truncated file, expected {} devices, got {}'.format(expected, actual))
253
254
def main():
    '''Command line entry point: load the recording, install its quirks
    and replay its devices until interrupted.

    A quirks file written by this run is removed again on the way out.'''
    parser = argparse.ArgumentParser(description='Replay a device recording')
    parser.add_argument('recording', metavar='recorded-file.yaml',
                        type=str, help='Path to device recording')
    parser.add_argument('--verbose', action='store_true')
    args = parser.parse_args()

    quirks_file = None

    try:
        # The document is fully loaded by safe_load(), no need to keep the
        # file open while replaying.
        with open(args.recording) as f:
            y = yaml.safe_load(f)

        # An empty file loads as None, other malformed files may load as
        # a list or scalar.
        if not isinstance(y, dict):
            raise YamlException('recording is not a YAML mapping')

        check_file(y)
        quirks_file = setup_quirks(y)
        loop(args, y)
    except KeyboardInterrupt:
        pass
    except OSError as e:  # includes PermissionError
        error('Error: failed to open device: {}'.format(e))
    except (YamlException, yaml.YAMLError) as e:
        # yaml.YAMLError: the file is not valid YAML in the first place
        error('Error: failed to parse recording: {}'.format(e))
    finally:
        if quirks_file:
            quirks_file.unlink()
279
280
281if __name__ == '__main__':
282    main()
283