diff options
author | Frederic Martinsons <frederic.martinsons@sigfox.com> | 2020-11-10 16:48:22 +0100 |
---|---|---|
committer | Frederic Martinsons <frederic.martinsons@sigfox.com> | 2020-11-12 18:34:59 +0100 |
commit | 519df2544b04ccbc9cf87fbfbdbbaf44edeef75b (patch) | |
tree | 9bb60dbf7f684a2131878964ea5c973a03d35cf9 | |
parent | 06f75500dbc3f471bf0c40221bbfb5f970f65f32 (diff) |
tools: add new system daemon stub tester application
This script mock some of ModemManager DBus interfaces (main object, Modem, Sim),
it also adds a test interface "org.freedesktop.ModemManager1.LibmmGlibTest"
which allow to inject some errors and simulate behavior:
- Add a modem object
- Emit modem state changed
- Set modem error
This script also add the possibility to log in a file for debugging purpose.
Because the stdout/stderr are not shown when the program is spawned by DBus
auto activation.
Note: Script is heavily inspired from test-networkmanager-service.py from
NetworkManager project
Signed-off-by: Frederic Martinsons <frederic.martinsons@sigfox.com>
-rw-r--r-- | Makefile.am | 1 | ||||
-rw-r--r-- | configure.ac | 1 | ||||
-rw-r--r-- | tools/Makefile.am | 1 | ||||
-rwxr-xr-x | tools/test-modemmanager-service.py | 490 |
4 files changed, 493 insertions, 0 deletions
diff --git a/Makefile.am b/Makefile.am index b70b0625..4dd7ad63 100644 --- a/Makefile.am +++ b/Makefile.am @@ -13,6 +13,7 @@ SUBDIRS = \ vapi \ introspection \ test \ + tools \ examples \ docs \ $(NULL) diff --git a/configure.ac b/configure.ac index 091dba62..069f0d83 100644 --- a/configure.ac +++ b/configure.ac @@ -539,6 +539,7 @@ src/Makefile src/tests/Makefile plugins/Makefile test/Makefile +tools/Makefile introspection/Makefile introspection/tests/Makefile po/Makefile.in diff --git a/tools/Makefile.am b/tools/Makefile.am new file mode 100644 index 00000000..8b1edd5f --- /dev/null +++ b/tools/Makefile.am @@ -0,0 +1 @@ +EXTRA_DIST = test-modemmanager-service.py diff --git a/tools/test-modemmanager-service.py b/tools/test-modemmanager-service.py new file mode 100755 index 00000000..e45b5fa4 --- /dev/null +++ b/tools/test-modemmanager-service.py @@ -0,0 +1,490 @@ +#!/usr/bin/env python3 +# -*- Mode: python; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- + +from __future__ import print_function + +import gi +gi.require_version('ModemManager', '1.0') +from gi.repository import GLib, ModemManager +import argparse +import sys +import dbus +import dbus.service +import dbus.mainloop.glib +import random +import collections + +mainloop = GLib.MainLoop() + +######################################################### +IFACE_DBUS = 'org.freedesktop.DBus' + +class UnknownInterfaceException(dbus.DBusException): + def __init__(self, *args, **kwargs): + self._dbus_error_name = '{}.UnknownInterface'.format(IFACE_DBUS) + super().__init__(*args, **kwargs) + +class UnknownPropertyException(dbus.DBusException): + def __init__(self, *args, **kwargs): + self._dbus_error_name = '{}.UnknownProperty'.format(IFACE_DBUS) + super().__init__(*args, **kwargs) + +class MobileEquipmentException(dbus.DBusException): + _dbus_error_name = '{}.Error.MobileEquipment'.format(IFACE_DBUS) + def __init__(self, *args, **kwargs): + equipment_error_num = kwargs.pop('equipment_error', None) + if equipment_error_num is not None: + equipment_error_except = ModemManager.MobileEquipmentError(equipment_error_num) + self._dbus_error_name = '{}.Error.MobileEquipment.{}'.format(IFACE_DBUS, equipment_error_except.value_nick) + super().__init__(*args, **kwargs) + +def log(msg): + if log_file: + try: + log_file.write(msg + "\n") + log_file.flush() + except Exception: + pass + else: + print(msg) + +def to_path_array(src): + array = dbus.Array([], signature=dbus.Signature('o')) + for o in src: + array.append(to_path(o)) + return array + +def to_path(src): + if src: + return dbus.ObjectPath(src.path) + return dbus.ObjectPath("/") + +class ExportedObj(dbus.service.Object): + + DBusInterface = collections.namedtuple('DBusInterface', ['dbus_iface', 'get_props_func', 'set_props_func', 'prop_changed_func']) + + def __init__(self, bus, object_path): + super(ExportedObj, self).__init__(bus, object_path) + self._bus = bus + self.path = object_path + self.__ensure_dbus_ifaces() + log("Will add object with path '%s' to object manager" % object_path) + object_manager.add_object(self) + + def __ensure_dbus_ifaces(self): + if not hasattr(self, '_ExportedObj__dbus_ifaces'): + self.__dbus_ifaces = {} + + def add_dbus_interface(self, dbus_iface, get_props_func, set_props_func, prop_changed_func): + self.__ensure_dbus_ifaces() + self.__dbus_ifaces[dbus_iface] = ExportedObj.DBusInterface(dbus_iface, get_props_func, set_props_func, prop_changed_func) + + def __dbus_interface_get(self, dbus_iface): + if dbus_iface not in self.__dbus_ifaces: + raise UnknownInterfaceException() + return self.__dbus_ifaces[dbus_iface] + + def _dbus_property_get(self, dbus_iface, propname=None): + props = self.__dbus_interface_get(dbus_iface).get_props_func() + if propname is None: + return props + if propname not in props: + raise UnknownPropertyException() + return props[propname] + + def _dbus_property_set(self, dbus_iface, propname, value): + props = self.__dbus_interface_get(dbus_iface).get_props_func() + + try: + if props[propname] == value: + return + except KeyError: + raise UnknownPropertyException() + + if self.__dbus_interface_get(dbus_iface).set_props_func is not None: + self.__dbus_interface_get(dbus_iface).set_props_func(propname, value) + self._dbus_property_notify(dbus_iface, propname) + + def _dbus_property_notify(self, dbus_iface, propname): + prop = self._dbus_property_get(dbus_iface, propname) + self.__dbus_interface_get(dbus_iface).prop_changed_func(self, {propname: prop}) + ExportedObj.PropertiesChanged(self, dbus_iface, {propname: prop}, []) + + @dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as') + def PropertiesChanged(self, iface, changed, invalidated): + pass + + @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}') + def GetAll(self, dbus_iface): + return self._dbus_property_get(dbus_iface) + + @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v') + def Get(self, dbus_iface, name): + return self._dbus_property_get(dbus_iface, name) + + @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='ssv', out_signature='') + def Set(self, dbus_iface, name, value): + return self._dbus_property_set(dbus_iface, name, value) + + def get_managed_ifaces(self): + my_ifaces = {} + for iface in self.__dbus_ifaces: + my_ifaces[iface] = self.__dbus_ifaces[iface].get_props_func() + return self.path, my_ifaces + + +################################################################### +IFACE_SIM = 'org.freedesktop.ModemManager1.Sim' + +PS_IMSI = "Imsi" +PS_OPERATOR_IDENTIFIER = "OperatorIdentifier" +PS_OPERATOR_NAME = "OperatorName" +PS_SIM_IDENTIFIER = "SimIdentifier" + +class Sim(ExportedObj): + def __init__(self, bus, counter, iccid, modem): + object_path = "/org/freedesktop/ModemManager1/SIM/%d" % counter + self.iccid = iccid + self.modem = modem + + self.add_dbus_interface(IFACE_SIM, self.__get_props, None, Sim.PropertiesChanged) + super(Sim, self).__init__(bus, object_path) + + # Properties interface + def __get_props(self): + props = {} + props[PS_IMSI] = "Imsi_1" + props[PS_OPERATOR_IDENTIFIER] = "OperatorIdentifier_1" + props[PS_OPERATOR_NAME] = "OperatorName_1" + props[PS_SIM_IDENTIFIER] = self.iccid + return props + + # methods + @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='ss', out_signature='') + def ChangePin(self, old_pin, new_pin): + pass + + @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='sb', out_signature='ao') + def EnablePin(self, pin, enabled): + pass + + @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='s', out_signature='') + def SendPin(self, pin): + if self.modem.equipmentError is not None: + raise MobileEquipmentException(equipment_error=self.modem.equipmentError) + self.modem.unlock() + + @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='ss', out_signature='') + def SendPuk(self, puk, pin): + self.modem.unlock() + + # signals + @dbus.service.signal(IFACE_SIM, signature='a{sv}') + def PropertiesChanged(self, changed): + pass + + +################################################################### +IFACE_MODEM = 'org.freedesktop.ModemManager1.Modem' + +PM_SIM = "Sim" +PM_BEARERS = "Bearers" +PM_SUPPORTED_CAPABILITIES = "SupportedCapabilities" +PM_CURRENT_CAPABILITIES = "CurrentCapabilities" +PM_MAX_BEARERS = "MaxBearers" +PM_MAX_ACTIVE_BEARERS = "MaxActiveBearers" +PM_MANUFACTURER = "Manufacturer" +PM_MODEL = "Model" +PM_REVISION = "Revision" +PM_DEVICE_IDENTIFIER = "DeviceIdentifier" +PM_DEVICE = "Device" +PM_DRIVERS = "Drivers" +PM_PLUGIN = "Plugin" +PM_PRIMARY_PORT = "PrimaryPort" +PM_PORTS = "Ports" +PM_EQUIPMENT_IDENTIFIER = "EquipmentIdentifier" +PM_UNLOCK_REQUIRED = "UnlockRequired" +PM_UNLOCK_RETRIES = "UnlockRetries" +PM_STATE = "State" +PM_STATE_FAILED_REASON = "StateFailedReason" +PM_ACCESS_TECHNOLOGIES = "AccessTechnologies" +PM_SIGNAL_QUALITY = "SignalQuality" +PM_OWN_NUMBERS = "OwnNumbers" +PM_POWER_STATE = "PowerState" +PM_SUPPORTED_MODES = "SupportedModes" +PM_CURRENT_MODES = "CurrentModes" +PM_SUPPORTED_BANDS = "SupportedBands" +PM_CURRENT_BANDS = "CurrentBands" +PM_SUPPORTED_IP_FAMILIES = "SupportedIpFamilies" + +class Modem(ExportedObj): + counter = 0 + + def __init__(self, bus, add_sim, iccid): + object_path = "/org/freedesktop/ModemManager1/Modem/%d" % Modem.counter + self.sim_object = None + if add_sim: + self.sim_object = Sim(bus, Modem.counter, iccid, self) + self.sim_path = to_path(self.sim_object) + self.equipmentError = None + self.reset_status = True + self.reset_status_clear = False + + self.__props = self.__init_default_props() + + Modem.counter = Modem.counter + 1 + + self.add_dbus_interface(IFACE_MODEM, self.__get_props, self.__set_prop, Modem.PropertiesChanged) + super(Modem, self).__init__(bus, object_path) + + # Properties interface + def __init_default_props(self): + props = {} + props[PM_SIM] = dbus.ObjectPath(self.sim_path) + props[PM_DEVICE] = dbus.String("/fake/path") + props[PM_UNLOCK_REQUIRED] = dbus.UInt32(ModemManager.ModemLock.NONE) + props[PM_STATE] = dbus.Int32(ModemManager.ModemState.UNKNOWN) + props[PM_STATE_FAILED_REASON] = dbus.UInt32(ModemManager.ModemStateFailedReason.UNKNOWN) + # Not already used properties + #props[PM_BEARERS] = None + #props[PM_SUPPORTED_CAPABILITIES] = None + #props[PM_CURRENT_CAPABILITIES] = None + #props[PM_MAX_BEARERS] = None + #props[PM_MAX_ACTIVE_BEARERS] = None + #props[PM_MANUFACTURER] = None + #props[PM_MODEL] = None + #props[PM_REVISION] = None + #props[PM_DEVICE_IDENTIFIER] = None + #props[PM_DRIVERS] = None + #props[PM_PLUGIN] = None + #props[PM_PRIMARY_PORT] = None + #props[PM_PORTS] = None + #props[PM_EQUIPMENT_IDENTIFIER] = None + #props[PM_UNLOCK_RETRIES] = dbus.UInt32(0) + #props[PM_ACCESS_TECHNOLOGIES] = None + #props[PM_SIGNAL_QUALITY] = None + #props[PM_OWN_NUMBERS] = None + #props[PM_POWER_STATE] = None + #props[PM_SUPPORTED_MODES] = None + #props[PM_CURRENT_MODES] = None + #props[PM_SUPPORTED_BANDS] = None + #props[PM_CURRENT_BANDS] = None + #props[PM_SUPPORTED_IP_FAMILIES] = None + return props + + def __get_props(self): + return self.__props + + def __set_prop(self, name, value): + try: + self.__props[name] = value + except KeyError: + pass + + def unlock(self): + self._dbus_property_set(IFACE_MODEM, PM_UNLOCK_REQUIRED , dbus.UInt32(ModemManager.ModemLock.NONE)) + + # methods + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='b', out_signature='') + def Enable(self, enable): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='', out_signature='ao') + def ListBearers(self): + return None + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='a{sv}', out_signature='o') + def CreateBearer(self, properties): + return None + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='o', out_signature='') + def DeleteBearer(self, bearer): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='', out_signature='') + def Reset(self): + if not self.reset_status: + if self.reset_status_clear: + self.reset_status = True + self.reset_status_clear = False + + raise Exception("Fake reset exception") + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='s', out_signature='') + def FactoryReset(self, code): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='u', out_signature='') + def SetPowerState(self, state): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='u', out_signature='') + def SetCurrentCapabilities(self, capabilites): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='(uu)', out_signature='') + def SetCurrentModes(self, modes): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='au', out_signature='') + def SetCurrentBands(self, bands): + pass + + @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='su', out_signature='s') + def Command(self, cmd, timeout): + return None + + # signals + @dbus.service.signal(IFACE_MODEM, signature='a{sv}') + def PropertiesChanged(self, changed): + pass + + @dbus.service.signal(IFACE_MODEM, signature='uuu') + def StateChanged(self, old_state, new_state, reason): + pass + + +################################################################### +IFACE_OBJECT_MANAGER = 'org.freedesktop.DBus.ObjectManager' + +PATH_OBJECT_MANAGER = '/org/freedesktop/ModemManager1' + +IFACE_TEST = 'org.freedesktop.ModemManager1.LibmmGlibTest' +IFACE_MM = 'org.freedesktop.ModemManager1' + +class ObjectManager(dbus.service.Object): + def __init__(self, bus, object_path): + super(ObjectManager, self).__init__(bus, object_path) + self.objs = [] + self.bus = bus + self.modem = None + + @dbus.service.method(dbus_interface=IFACE_OBJECT_MANAGER, + in_signature='', out_signature='a{oa{sa{sv}}}', + sender_keyword='sender') + def GetManagedObjects(self, sender=None): + managed_objects = {} + for obj in self.objs: + name, ifaces = obj.get_managed_ifaces() + managed_objects[name] = ifaces + return managed_objects + + def add_object(self, obj): + self.objs.append(obj) + name, ifaces = obj.get_managed_ifaces() + self.InterfacesAdded(name, ifaces) + + def remove_object(self, obj): + self.objs.remove(obj) + name, ifaces = obj.get_managed_ifaces() + self.InterfacesRemoved(name, ifaces.keys()) + + @dbus.service.signal(IFACE_OBJECT_MANAGER, signature='oa{sa{sv}}') + def InterfacesAdded(self, name, ifaces): + pass + + @dbus.service.signal(IFACE_OBJECT_MANAGER, signature='oas') + def InterfacesRemoved(self, name, ifaces): + pass + + # ModemManager methods + @dbus.service.method(dbus_interface=IFACE_MM, in_signature='', out_signature='') + def ScanDevices(self): + pass + + @dbus.service.method(dbus_interface=IFACE_MM, in_signature='s', out_signature='') + def SetLogging(self, logging): + pass + + # Testing methods + @dbus.service.method(IFACE_TEST, in_signature='', out_signature='') + def Quit(self): + mainloop.quit() + + @dbus.service.method(IFACE_TEST, in_signature='bs', out_signature='o') + def AddModem(self, add_sim, iccid): + self.modem = Modem(self.bus, add_sim, iccid) + return dbus.ObjectPath(self.modem.path) + + @dbus.service.method(IFACE_TEST, in_signature='uuu', out_signature='') + def EmitStateChanged(self, old_state, new_state, reason): + if self.modem is not None: + self.modem.StateChanged(old_state, new_state, reason) + + @dbus.service.method(IFACE_TEST, in_signature='ub', out_signature='') + def SetMobileEquipmentError(self, error, clear): + if self.modem is not None: + if clear: + self.modem.equipmentError = None + else: + self.modem.equipmentError = error + + @dbus.service.method(IFACE_TEST, in_signature='bb', out_signature='') + def SetResetStatus(self, status, clear): + if self.modem is not None: + self.modem.reset_status = status + self.modem.reset_status_clear = clear + + @dbus.service.method(dbus_interface=IFACE_TEST, in_signature='', out_signature='') + def Restart(self): + bus.release_name("org.freedesktop.ModemManager1") + bus.request_name("org.freedesktop.ModemManager1") + +################################################################### +def stdin_cb(io, condition): + mainloop.quit() + +def quit_cb(user_data): + mainloop.quit() + +def main(): + parser = argparse.ArgumentParser(description="ModemManager dbus interface stub utility") + parser.add_argument("-f", "--log-file", help="Path of a file to log things into") + + cfg = parser.parse_args() + + global log_file + + if cfg.log_file: + try: + log_file = open(cfg.log_file, "w") + except Exception: + log_file = None + else: + log_file = None + + log("Starting mainloop") + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + random.seed() + + global object_manager, bus + + bus = dbus.SessionBus() + log("Creating object manager for /org/freedesktop/ModemManager1") + object_manager = ObjectManager(bus, "/org/freedesktop/ModemManager1") + + log("Requesting name org.freedesktop.ModemManager1") + if not bus.request_name("org.freedesktop.ModemManager1"): + log("Unable to acquire the DBus name") + sys.exit(1) + + # Watch stdin; if it closes, assume our parent has crashed, and exit + id1 = GLib.io_add_watch(0, GLib.IOCondition.HUP, stdin_cb) + + log("Starting the main loop") + try: + mainloop.run() + except (Exception, KeyboardInterrupt): + pass + + GLib.source_remove(id1) + + log("Ending the stub") + if log_file: + log_file.close() + sys.exit(0) + + +if __name__ == '__main__': + main() |