summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrederic Martinsons <frederic.martinsons@sigfox.com>2020-11-10 16:48:22 +0100
committerFrederic Martinsons <frederic.martinsons@sigfox.com>2020-11-12 18:34:59 +0100
commit519df2544b04ccbc9cf87fbfbdbbaf44edeef75b (patch)
tree9bb60dbf7f684a2131878964ea5c973a03d35cf9
parent06f75500dbc3f471bf0c40221bbfb5f970f65f32 (diff)
tools: add new system daemon stub tester application
This script mock some of ModemManager DBus interfaces (main object, Modem, Sim), it also adds a test interface "org.freedesktop.ModemManager1.LibmmGlibTest" which allow to inject some errors and simulate behavior: - Add a modem object - Emit modem state changed - Set modem error This script also add the possibility to log in a file for debugging purpose. Because the stdout/stderr are not shown when the program is spawned by DBus auto activation. Note: Script is heavily inspired from test-networkmanager-service.py from NetworkManager project Signed-off-by: Frederic Martinsons <frederic.martinsons@sigfox.com>
-rw-r--r--Makefile.am1
-rw-r--r--configure.ac1
-rw-r--r--tools/Makefile.am1
-rwxr-xr-xtools/test-modemmanager-service.py490
4 files changed, 493 insertions, 0 deletions
diff --git a/Makefile.am b/Makefile.am
index b70b0625..4dd7ad63 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -13,6 +13,7 @@ SUBDIRS = \
vapi \
introspection \
test \
+ tools \
examples \
docs \
$(NULL)
diff --git a/configure.ac b/configure.ac
index 091dba62..069f0d83 100644
--- a/configure.ac
+++ b/configure.ac
@@ -539,6 +539,7 @@ src/Makefile
src/tests/Makefile
plugins/Makefile
test/Makefile
+tools/Makefile
introspection/Makefile
introspection/tests/Makefile
po/Makefile.in
diff --git a/tools/Makefile.am b/tools/Makefile.am
new file mode 100644
index 00000000..8b1edd5f
--- /dev/null
+++ b/tools/Makefile.am
@@ -0,0 +1 @@
+EXTRA_DIST = test-modemmanager-service.py
diff --git a/tools/test-modemmanager-service.py b/tools/test-modemmanager-service.py
new file mode 100755
index 00000000..e45b5fa4
--- /dev/null
+++ b/tools/test-modemmanager-service.py
@@ -0,0 +1,490 @@
+#!/usr/bin/env python3
+# -*- Mode: python; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*-
+
+from __future__ import print_function
+
+import gi
+gi.require_version('ModemManager', '1.0')
+from gi.repository import GLib, ModemManager
+import argparse
+import sys
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+import random
+import collections
+
+mainloop = GLib.MainLoop()
+
+#########################################################
+IFACE_DBUS = 'org.freedesktop.DBus'
+
+class UnknownInterfaceException(dbus.DBusException):
+ def __init__(self, *args, **kwargs):
+ self._dbus_error_name = '{}.UnknownInterface'.format(IFACE_DBUS)
+ super().__init__(*args, **kwargs)
+
+class UnknownPropertyException(dbus.DBusException):
+ def __init__(self, *args, **kwargs):
+ self._dbus_error_name = '{}.UnknownProperty'.format(IFACE_DBUS)
+ super().__init__(*args, **kwargs)
+
+class MobileEquipmentException(dbus.DBusException):
+ _dbus_error_name = '{}.Error.MobileEquipment'.format(IFACE_DBUS)
+ def __init__(self, *args, **kwargs):
+ equipment_error_num = kwargs.pop('equipment_error', None)
+ if equipment_error_num is not None:
+ equipment_error_except = ModemManager.MobileEquipmentError(equipment_error_num)
+ self._dbus_error_name = '{}.Error.MobileEquipment.{}'.format(IFACE_DBUS, equipment_error_except.value_nick)
+ super().__init__(*args, **kwargs)
+
+def log(msg):
+ if log_file:
+ try:
+ log_file.write(msg + "\n")
+ log_file.flush()
+ except Exception:
+ pass
+ else:
+ print(msg)
+
+def to_path_array(src):
+ array = dbus.Array([], signature=dbus.Signature('o'))
+ for o in src:
+ array.append(to_path(o))
+ return array
+
+def to_path(src):
+ if src:
+ return dbus.ObjectPath(src.path)
+ return dbus.ObjectPath("/")
+
+class ExportedObj(dbus.service.Object):
+
+ DBusInterface = collections.namedtuple('DBusInterface', ['dbus_iface', 'get_props_func', 'set_props_func', 'prop_changed_func'])
+
+ def __init__(self, bus, object_path):
+ super(ExportedObj, self).__init__(bus, object_path)
+ self._bus = bus
+ self.path = object_path
+ self.__ensure_dbus_ifaces()
+ log("Will add object with path '%s' to object manager" % object_path)
+ object_manager.add_object(self)
+
+ def __ensure_dbus_ifaces(self):
+ if not hasattr(self, '_ExportedObj__dbus_ifaces'):
+ self.__dbus_ifaces = {}
+
+ def add_dbus_interface(self, dbus_iface, get_props_func, set_props_func, prop_changed_func):
+ self.__ensure_dbus_ifaces()
+ self.__dbus_ifaces[dbus_iface] = ExportedObj.DBusInterface(dbus_iface, get_props_func, set_props_func, prop_changed_func)
+
+ def __dbus_interface_get(self, dbus_iface):
+ if dbus_iface not in self.__dbus_ifaces:
+ raise UnknownInterfaceException()
+ return self.__dbus_ifaces[dbus_iface]
+
+ def _dbus_property_get(self, dbus_iface, propname=None):
+ props = self.__dbus_interface_get(dbus_iface).get_props_func()
+ if propname is None:
+ return props
+ if propname not in props:
+ raise UnknownPropertyException()
+ return props[propname]
+
+ def _dbus_property_set(self, dbus_iface, propname, value):
+ props = self.__dbus_interface_get(dbus_iface).get_props_func()
+
+ try:
+ if props[propname] == value:
+ return
+ except KeyError:
+ raise UnknownPropertyException()
+
+ if self.__dbus_interface_get(dbus_iface).set_props_func is not None:
+ self.__dbus_interface_get(dbus_iface).set_props_func(propname, value)
+ self._dbus_property_notify(dbus_iface, propname)
+
+ def _dbus_property_notify(self, dbus_iface, propname):
+ prop = self._dbus_property_get(dbus_iface, propname)
+ self.__dbus_interface_get(dbus_iface).prop_changed_func(self, {propname: prop})
+ ExportedObj.PropertiesChanged(self, dbus_iface, {propname: prop}, [])
+
+ @dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as')
+ def PropertiesChanged(self, iface, changed, invalidated):
+ pass
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}')
+ def GetAll(self, dbus_iface):
+ return self._dbus_property_get(dbus_iface)
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v')
+ def Get(self, dbus_iface, name):
+ return self._dbus_property_get(dbus_iface, name)
+
+ @dbus.service.method(dbus_interface=dbus.PROPERTIES_IFACE, in_signature='ssv', out_signature='')
+ def Set(self, dbus_iface, name, value):
+ return self._dbus_property_set(dbus_iface, name, value)
+
+ def get_managed_ifaces(self):
+ my_ifaces = {}
+ for iface in self.__dbus_ifaces:
+ my_ifaces[iface] = self.__dbus_ifaces[iface].get_props_func()
+ return self.path, my_ifaces
+
+
+###################################################################
+IFACE_SIM = 'org.freedesktop.ModemManager1.Sim'
+
+PS_IMSI = "Imsi"
+PS_OPERATOR_IDENTIFIER = "OperatorIdentifier"
+PS_OPERATOR_NAME = "OperatorName"
+PS_SIM_IDENTIFIER = "SimIdentifier"
+
+class Sim(ExportedObj):
+ def __init__(self, bus, counter, iccid, modem):
+ object_path = "/org/freedesktop/ModemManager1/SIM/%d" % counter
+ self.iccid = iccid
+ self.modem = modem
+
+ self.add_dbus_interface(IFACE_SIM, self.__get_props, None, Sim.PropertiesChanged)
+ super(Sim, self).__init__(bus, object_path)
+
+ # Properties interface
+ def __get_props(self):
+ props = {}
+ props[PS_IMSI] = "Imsi_1"
+ props[PS_OPERATOR_IDENTIFIER] = "OperatorIdentifier_1"
+ props[PS_OPERATOR_NAME] = "OperatorName_1"
+ props[PS_SIM_IDENTIFIER] = self.iccid
+ return props
+
+ # methods
+ @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='ss', out_signature='')
+ def ChangePin(self, old_pin, new_pin):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='sb', out_signature='ao')
+ def EnablePin(self, pin, enabled):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='s', out_signature='')
+ def SendPin(self, pin):
+ if self.modem.equipmentError is not None:
+ raise MobileEquipmentException(equipment_error=self.modem.equipmentError)
+ self.modem.unlock()
+
+ @dbus.service.method(dbus_interface=IFACE_SIM, in_signature='ss', out_signature='')
+ def SendPuk(self, puk, pin):
+ self.modem.unlock()
+
+ # signals
+ @dbus.service.signal(IFACE_SIM, signature='a{sv}')
+ def PropertiesChanged(self, changed):
+ pass
+
+
+###################################################################
+IFACE_MODEM = 'org.freedesktop.ModemManager1.Modem'
+
+PM_SIM = "Sim"
+PM_BEARERS = "Bearers"
+PM_SUPPORTED_CAPABILITIES = "SupportedCapabilities"
+PM_CURRENT_CAPABILITIES = "CurrentCapabilities"
+PM_MAX_BEARERS = "MaxBearers"
+PM_MAX_ACTIVE_BEARERS = "MaxActiveBearers"
+PM_MANUFACTURER = "Manufacturer"
+PM_MODEL = "Model"
+PM_REVISION = "Revision"
+PM_DEVICE_IDENTIFIER = "DeviceIdentifier"
+PM_DEVICE = "Device"
+PM_DRIVERS = "Drivers"
+PM_PLUGIN = "Plugin"
+PM_PRIMARY_PORT = "PrimaryPort"
+PM_PORTS = "Ports"
+PM_EQUIPMENT_IDENTIFIER = "EquipmentIdentifier"
+PM_UNLOCK_REQUIRED = "UnlockRequired"
+PM_UNLOCK_RETRIES = "UnlockRetries"
+PM_STATE = "State"
+PM_STATE_FAILED_REASON = "StateFailedReason"
+PM_ACCESS_TECHNOLOGIES = "AccessTechnologies"
+PM_SIGNAL_QUALITY = "SignalQuality"
+PM_OWN_NUMBERS = "OwnNumbers"
+PM_POWER_STATE = "PowerState"
+PM_SUPPORTED_MODES = "SupportedModes"
+PM_CURRENT_MODES = "CurrentModes"
+PM_SUPPORTED_BANDS = "SupportedBands"
+PM_CURRENT_BANDS = "CurrentBands"
+PM_SUPPORTED_IP_FAMILIES = "SupportedIpFamilies"
+
+class Modem(ExportedObj):
+ counter = 0
+
+ def __init__(self, bus, add_sim, iccid):
+ object_path = "/org/freedesktop/ModemManager1/Modem/%d" % Modem.counter
+ self.sim_object = None
+ if add_sim:
+ self.sim_object = Sim(bus, Modem.counter, iccid, self)
+ self.sim_path = to_path(self.sim_object)
+ self.equipmentError = None
+ self.reset_status = True
+ self.reset_status_clear = False
+
+ self.__props = self.__init_default_props()
+
+ Modem.counter = Modem.counter + 1
+
+ self.add_dbus_interface(IFACE_MODEM, self.__get_props, self.__set_prop, Modem.PropertiesChanged)
+ super(Modem, self).__init__(bus, object_path)
+
+ # Properties interface
+ def __init_default_props(self):
+ props = {}
+ props[PM_SIM] = dbus.ObjectPath(self.sim_path)
+ props[PM_DEVICE] = dbus.String("/fake/path")
+ props[PM_UNLOCK_REQUIRED] = dbus.UInt32(ModemManager.ModemLock.NONE)
+ props[PM_STATE] = dbus.Int32(ModemManager.ModemState.UNKNOWN)
+ props[PM_STATE_FAILED_REASON] = dbus.UInt32(ModemManager.ModemStateFailedReason.UNKNOWN)
+ # Not already used properties
+ #props[PM_BEARERS] = None
+ #props[PM_SUPPORTED_CAPABILITIES] = None
+ #props[PM_CURRENT_CAPABILITIES] = None
+ #props[PM_MAX_BEARERS] = None
+ #props[PM_MAX_ACTIVE_BEARERS] = None
+ #props[PM_MANUFACTURER] = None
+ #props[PM_MODEL] = None
+ #props[PM_REVISION] = None
+ #props[PM_DEVICE_IDENTIFIER] = None
+ #props[PM_DRIVERS] = None
+ #props[PM_PLUGIN] = None
+ #props[PM_PRIMARY_PORT] = None
+ #props[PM_PORTS] = None
+ #props[PM_EQUIPMENT_IDENTIFIER] = None
+ #props[PM_UNLOCK_RETRIES] = dbus.UInt32(0)
+ #props[PM_ACCESS_TECHNOLOGIES] = None
+ #props[PM_SIGNAL_QUALITY] = None
+ #props[PM_OWN_NUMBERS] = None
+ #props[PM_POWER_STATE] = None
+ #props[PM_SUPPORTED_MODES] = None
+ #props[PM_CURRENT_MODES] = None
+ #props[PM_SUPPORTED_BANDS] = None
+ #props[PM_CURRENT_BANDS] = None
+ #props[PM_SUPPORTED_IP_FAMILIES] = None
+ return props
+
+ def __get_props(self):
+ return self.__props
+
+ def __set_prop(self, name, value):
+ try:
+ self.__props[name] = value
+ except KeyError:
+ pass
+
+ def unlock(self):
+ self._dbus_property_set(IFACE_MODEM, PM_UNLOCK_REQUIRED , dbus.UInt32(ModemManager.ModemLock.NONE))
+
+ # methods
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='b', out_signature='')
+ def Enable(self, enable):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='', out_signature='ao')
+ def ListBearers(self):
+ return None
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='a{sv}', out_signature='o')
+ def CreateBearer(self, properties):
+ return None
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='o', out_signature='')
+ def DeleteBearer(self, bearer):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='', out_signature='')
+ def Reset(self):
+ if not self.reset_status:
+ if self.reset_status_clear:
+ self.reset_status = True
+ self.reset_status_clear = False
+
+ raise Exception("Fake reset exception")
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='s', out_signature='')
+ def FactoryReset(self, code):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='u', out_signature='')
+ def SetPowerState(self, state):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='u', out_signature='')
+ def SetCurrentCapabilities(self, capabilites):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='(uu)', out_signature='')
+ def SetCurrentModes(self, modes):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='au', out_signature='')
+ def SetCurrentBands(self, bands):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MODEM, in_signature='su', out_signature='s')
+ def Command(self, cmd, timeout):
+ return None
+
+ # signals
+ @dbus.service.signal(IFACE_MODEM, signature='a{sv}')
+ def PropertiesChanged(self, changed):
+ pass
+
+ @dbus.service.signal(IFACE_MODEM, signature='uuu')
+ def StateChanged(self, old_state, new_state, reason):
+ pass
+
+
+###################################################################
+IFACE_OBJECT_MANAGER = 'org.freedesktop.DBus.ObjectManager'
+
+PATH_OBJECT_MANAGER = '/org/freedesktop/ModemManager1'
+
+IFACE_TEST = 'org.freedesktop.ModemManager1.LibmmGlibTest'
+IFACE_MM = 'org.freedesktop.ModemManager1'
+
+class ObjectManager(dbus.service.Object):
+ def __init__(self, bus, object_path):
+ super(ObjectManager, self).__init__(bus, object_path)
+ self.objs = []
+ self.bus = bus
+ self.modem = None
+
+ @dbus.service.method(dbus_interface=IFACE_OBJECT_MANAGER,
+ in_signature='', out_signature='a{oa{sa{sv}}}',
+ sender_keyword='sender')
+ def GetManagedObjects(self, sender=None):
+ managed_objects = {}
+ for obj in self.objs:
+ name, ifaces = obj.get_managed_ifaces()
+ managed_objects[name] = ifaces
+ return managed_objects
+
+ def add_object(self, obj):
+ self.objs.append(obj)
+ name, ifaces = obj.get_managed_ifaces()
+ self.InterfacesAdded(name, ifaces)
+
+ def remove_object(self, obj):
+ self.objs.remove(obj)
+ name, ifaces = obj.get_managed_ifaces()
+ self.InterfacesRemoved(name, ifaces.keys())
+
+ @dbus.service.signal(IFACE_OBJECT_MANAGER, signature='oa{sa{sv}}')
+ def InterfacesAdded(self, name, ifaces):
+ pass
+
+ @dbus.service.signal(IFACE_OBJECT_MANAGER, signature='oas')
+ def InterfacesRemoved(self, name, ifaces):
+ pass
+
+ # ModemManager methods
+ @dbus.service.method(dbus_interface=IFACE_MM, in_signature='', out_signature='')
+ def ScanDevices(self):
+ pass
+
+ @dbus.service.method(dbus_interface=IFACE_MM, in_signature='s', out_signature='')
+ def SetLogging(self, logging):
+ pass
+
+ # Testing methods
+ @dbus.service.method(IFACE_TEST, in_signature='', out_signature='')
+ def Quit(self):
+ mainloop.quit()
+
+ @dbus.service.method(IFACE_TEST, in_signature='bs', out_signature='o')
+ def AddModem(self, add_sim, iccid):
+ self.modem = Modem(self.bus, add_sim, iccid)
+ return dbus.ObjectPath(self.modem.path)
+
+ @dbus.service.method(IFACE_TEST, in_signature='uuu', out_signature='')
+ def EmitStateChanged(self, old_state, new_state, reason):
+ if self.modem is not None:
+ self.modem.StateChanged(old_state, new_state, reason)
+
+ @dbus.service.method(IFACE_TEST, in_signature='ub', out_signature='')
+ def SetMobileEquipmentError(self, error, clear):
+ if self.modem is not None:
+ if clear:
+ self.modem.equipmentError = None
+ else:
+ self.modem.equipmentError = error
+
+ @dbus.service.method(IFACE_TEST, in_signature='bb', out_signature='')
+ def SetResetStatus(self, status, clear):
+ if self.modem is not None:
+ self.modem.reset_status = status
+ self.modem.reset_status_clear = clear
+
+ @dbus.service.method(dbus_interface=IFACE_TEST, in_signature='', out_signature='')
+ def Restart(self):
+ bus.release_name("org.freedesktop.ModemManager1")
+ bus.request_name("org.freedesktop.ModemManager1")
+
+###################################################################
+def stdin_cb(io, condition):
+ mainloop.quit()
+
+def quit_cb(user_data):
+ mainloop.quit()
+
+def main():
+ parser = argparse.ArgumentParser(description="ModemManager dbus interface stub utility")
+ parser.add_argument("-f", "--log-file", help="Path of a file to log things into")
+
+ cfg = parser.parse_args()
+
+ global log_file
+
+ if cfg.log_file:
+ try:
+ log_file = open(cfg.log_file, "w")
+ except Exception:
+ log_file = None
+ else:
+ log_file = None
+
+ log("Starting mainloop")
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ random.seed()
+
+ global object_manager, bus
+
+ bus = dbus.SessionBus()
+ log("Creating object manager for /org/freedesktop/ModemManager1")
+ object_manager = ObjectManager(bus, "/org/freedesktop/ModemManager1")
+
+ log("Requesting name org.freedesktop.ModemManager1")
+ if not bus.request_name("org.freedesktop.ModemManager1"):
+ log("Unable to acquire the DBus name")
+ sys.exit(1)
+
+ # Watch stdin; if it closes, assume our parent has crashed, and exit
+ id1 = GLib.io_add_watch(0, GLib.IOCondition.HUP, stdin_cb)
+
+ log("Starting the main loop")
+ try:
+ mainloop.run()
+ except (Exception, KeyboardInterrupt):
+ pass
+
+ GLib.source_remove(id1)
+
+ log("Ending the stub")
+ if log_file:
+ log_file.close()
+ sys.exit(0)
+
+
+if __name__ == '__main__':
+ main()