#!/usr/bin/env python3 # vim: set expandtab shiftwidth=4: # -*- Mode: python; coding: utf-8; indent-tabs-mode: nil -*- */ # # Copyright © 2018 Red Hat, Inc. # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the "Software"), # to deal in the Software without restriction, including without limitation # the rights to use, copy, modify, merge, publish, distribute, sublicense, # and/or sell copies of the Software, and to permit persons to whom the # Software is furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice (including the next # paragraph) shall be included in all copies or substantial portions of the # Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER # DEALINGS IN THE SOFTWARE. import os import sys import time import multiprocessing import argparse try: import libevdev import yaml except ModuleNotFoundError as e: print('Error: {}'.format(e), file=sys.stderr) print('One or more python modules are missing. Please install those ' 'modules and re-run this tool.') sys.exit(1) SUPPORTED_FILE_VERSION = 1 def error(msg, **kwargs): print(msg, **kwargs, file=sys.stderr) class YamlException(Exception): pass def fetch(yaml, key): '''Helper function to avoid confusing a YAML error with a normal KeyError bug''' try: return yaml[key] except KeyError: raise YamlException('Failed to get \'{}\' from recording.'.format(key)) def create(device): evdev = fetch(device, 'evdev') d = libevdev.Device() d.name = fetch(evdev, 'name') ids = fetch(evdev, 'id') if len(ids) != 4: raise YamlException('Invalid ID format: {}'.format(ids)) d.id = dict(zip(['bustype', 'vendor', 'product', 'version'], ids)) codes = fetch(evdev, 'codes') for evtype, evcodes in codes.items(): for code in evcodes: data = None if evtype == libevdev.EV_ABS.value: values = fetch(evdev, 'absinfo')[code] absinfo = libevdev.InputAbsInfo(minimum=values[0], maximum=values[1], fuzz=values[2], flat=values[3], resolution=values[4]) data = absinfo elif evtype == libevdev.EV_REP.value: if code == libevdev.EV_REP.REP_DELAY.value: data = 500 elif code == libevdev.EV_REP.REP_PERIOD.value: data = 20 d.enable(libevdev.evbit(evtype, code), data=data) uinput = d.create_uinput_device() return uinput def print_events(devnode, indent, evs): devnode = os.path.basename(devnode) for e in evs: print("{}: {}{:06d}.{:06d} {} / {:<20s} {:4d}".format( devnode, ' ' * (indent * 8), e.sec, e.usec, e.type.name, e.code.name, e.value)) def replay(device, verbose): events = fetch(device, 'events') if events is None: return uinput = device['__uinput'] offset = time.time() handled_first_event = False # each 'evdev' set contains one SYN_REPORT so we only need to check for # the time offset once per event for event in events: try: evdev = fetch(event, 'evdev') except YamlException: continue (sec, usec, evtype, evcode, value) = evdev[0] # The first event may have a nonzero offset but we want to replay # immediately regardless. if not handled_first_event: offset -= sec + usec/1.e6 handled_first_event = True evtime = sec + usec/1e6 + offset now = time.time() if evtime - now > 150/1e6: # 150 µs error margin time.sleep(evtime - now - 150/1e6) evs = [libevdev.InputEvent(libevdev.evbit(e[2], e[3]), value=e[4], sec=e[0], usec=e[1]) for e in evdev] uinput.send_events(evs) if verbose: print_events(uinput.devnode, device['__index'], evs) def wrap(func, *args): try: func(*args) except KeyboardInterrupt: pass def loop(args, recording): version = fetch(recording, 'version') if version != SUPPORTED_FILE_VERSION: raise YamlException('Invalid file format: {}, expected {}'.format(version, SUPPORTED_FILE_VERSION)) ndevices = fetch(recording, 'ndevices') devices = fetch(recording, 'devices') if ndevices != len(devices): error('WARNING: truncated file, expected {} devices, got {}'.format(ndevices, len(devices))) for idx, d in enumerate(devices): uinput = create(d) print('{}: {}'.format(uinput.devnode, uinput.name)) d['__uinput'] = uinput # cheaper to hide it in the dict then work around it d['__index'] = idx stop = False while not stop: input('Hit enter to start replaying') processes = [] for d in devices: p = multiprocessing.Process(target=wrap, args=(replay, d, args.verbose)) processes.append(p) for p in processes: p.start() for p in processes: p.join() del processes def main(): parser = argparse.ArgumentParser( description='Replay a device recording' ) parser.add_argument('recording', metavar='recorded-file.yaml', type=str, help='Path to device recording') parser.add_argument('--verbose', action='store_true') args = parser.parse_args() try: with open(args.recording) as f: y = yaml.safe_load(f) loop(args, y) except KeyboardInterrupt: pass except (PermissionError, OSError) as e: error('Error: failed to open device: {}'.format(e)) except YamlException as e: error('Error: failed to parse recording: {}'.format(e)) if __name__ == '__main__': main()