#!/usr/bin/env python # SPDX-License-Identifier: LGPL-2.1-or-later # A example script to activate many profiles in parallel. # # It uses entirely asynchronous API. At various points the # script explicitly iterates the main context, which is unlike # a more complex application that uses the GMainContext, which # probably would run the context only at one point as long as # the application is running (from the main function). import gi import os import sys import time gi.require_version("NM", "1.0") from gi.repository import NM, GLib class MyError(Exception): pass NUM_PARALLEL_STARTING = 10 NUM_PARALLEL_IN_PROGRESS = 50 s = os.getenv("NUM_PARALLEL_STARTING") if s: NUM_PARALLEL_STARTING = int(s) s = os.getenv("NUM_PARALLEL_IN_PROGRESS") if s: NUM_PARALLEL_IN_PROGRESS = int(s) start_time = time.monotonic() def log(msg): # use nm_utils_print(), so that the log messages are in synch with # LIBNM_CLIENT_DEBUG=trace messages. NM.utils_print(0, "[%015.10f] %s\n" % (time.monotonic() - start_time, msg)) def nmc_new(io_priority=GLib.PRIORITY_DEFAULT, cancellable=None): # create a NMClient instance using the async initialization # (but the function itself iterates the main context until # the initialization completes). result = [] def cb(source_object, res): try: source_object.init_finish(res) except Exception as e: result.append(e) else: result.append(None) nmc = NM.Client() nmc.init_async(io_priority, cancellable, cb) while not result: nmc.get_main_context().iteration(may_block=True) if result[0]: raise result[0] log("initialized NMClient cache") return nmc def nmc_destroy(nmc_transfer_ref): # Just for fun, show how to completely cleanup a NMClient instance. # An NMClient instance registers D-Bus signals and unrefing the instance # will cancel/unsubscribe those signals, but there might still be some # pending operations scheduled on the main context. That means, after # unrefing the NMClient instance, we may need to iterate the GMainContext # a bit longer, go get rid of all resources (otherwise, the GMainContext # itself cannot be destroyed and leaks). # # We can use nm_client_get_context_busy_watcher() for that, by subscribing # a weak reference and iterating the context as long as the object is # alive. nmc = nmc_transfer_ref[0] del nmc_transfer_ref[0] alive = [1] def weak_ref_cb(alive): del alive[0] nmc.get_context_busy_watcher().weak_ref(weak_ref_cb, alive) main_context = nmc.get_main_context() del nmc while alive: main_context.iteration(may_block=True) log("NMClient instance cleaned up") def find_connections(nmc, argv): # parse the inpurt argv and select the connection profiles to activate. # The arguments are either "connection.id" or "connection.uuid", possibly # qualified by "id" or "uuid". result = [] while True: if not argv: break arg_type = argv.pop(0) if arg_type in ["id", "uuid"]: if not argv: raise MyError('missing specifier after "%s"' % (arg_type)) arg_param = argv.pop(0) else: arg_param = arg_type arg_type = "*" cc = [] for c in nmc.get_connections(): if arg_type in ["id", "*"] and arg_param == c.get_id(): cc.append(c) if arg_type in ["uuid", "*"] and arg_param == c.get_uuid(): cc.append(c) if not cc: raise MyError( 'Could not find a matching connection "%s" "%s"' % (arg_type, arg_param) ) if len(cc) > 1: raise MyError( 'Could not find a unique matching connection "%s" "%s", instead %d profiles found' % (arg_type, arg_param, len(cc)) ) if cc[0] not in result: # we allow duplicates, but combine them. result.extend(cc) for c in result: log( "requested connection: %s (%s) (%s)" % (c.get_id(), c.get_uuid(), c.get_path()) ) return result class Activation(object): ACTIVATION_STATE_START = "start" ACTIVATION_STATE_STARTING = "starting" ACTIVATION_STATE_WAITING = "waiting" ACTIVATION_STATE_DONE = "done" def __init__(self, con): self.con = con self.state = Activation.ACTIVATION_STATE_START self.result_msg = None self.result_ac = None self.ac_result = None self.wait_id = None def __str__(self): return "%s (%s)" % (self.con.get_id(), self.con.get_uuid()) def is_done(self, log=log): if self.state == Activation.ACTIVATION_STATE_DONE: return True if self.state != Activation.ACTIVATION_STATE_WAITING: return False def _log_result(self, msg, done_with_success=False): log("connection %s done: %s" % (self, msg)) self.state = Activation.ACTIVATION_STATE_DONE self.done_with_success = done_with_success return True ac = self.result_ac if not ac: return _log_result(self, "failed activation call (%s)" % (self.result_msg,)) if ac.get_client() is None: return _log_result(self, "active connection disappeared") if ac.get_state() > NM.ActiveConnectionState.ACTIVATED: return _log_result( self, "connection failed to activate (state %s)" % (ac.get_state()) ) if ac.get_state() == NM.ActiveConnectionState.ACTIVATED: return _log_result( self, "connection successfully activated", done_with_success=True ) return False def start(self, nmc, cancellable=None, activated_callback=None, log=log): # Call nmc.activate_connection_async() and return a user data # with the information about the pending operation. assert self.state == Activation.ACTIVATION_STATE_START self.state = Activation.ACTIVATION_STATE_STARTING log("activation %s start asynchronously" % (self)) def cb_activate_connection(source_object, res): assert self.state == Activation.ACTIVATION_STATE_STARTING try: ac = nmc.activate_connection_finish(res) except Exception as e: self.result_msg = str(e) log( "activation %s started asynchronously failed: %s" % (self, self.result_msg) ) else: self.result_msg = "success" self.result_ac = ac log( "activation %s started asynchronously success: %s" % (self, ac.get_path()) ) self.state = Activation.ACTIVATION_STATE_WAITING if activated_callback is not None: activated_callback(self) nmc.activate_connection_async( self.con, None, None, cancellable, cb_activate_connection ) def wait(self, done_callback=None, log=log): assert self.state == Activation.ACTIVATION_STATE_WAITING assert self.result_ac assert self.wait_id is None def cb_wait(ac, state): if self.is_done(log=log): self.result_ac.disconnect(self.wait_id) self.wait_id = None done_callback(self) log("waiting for %s to fully activate" % (self)) self.wait_id = self.result_ac.connect("notify", cb_wait) class Manager(object): def __init__(self, nmc, cons): self.nmc = nmc self.ac_start = [Activation(c) for c in cons] self.ac_starting = [] self.ac_waiting = [] self.ac_done = [] def _log(self, msg): lists = [self.ac_start, self.ac_starting, self.ac_waiting, self.ac_done] n = sum(len(l) for l in lists) n = str(len(str(n))) prefix = "/".join((("%0" + n + "d") % len(l)) for l in lists) log("%s: %s" % (prefix, msg)) def ac_run(self): loop = GLib.MainLoop(self.nmc.get_main_context()) while self.ac_start or self.ac_starting or self.ac_waiting: rate_limit_parallel_in_progress = ( len(self.ac_starting) + len(self.ac_waiting) >= NUM_PARALLEL_IN_PROGRESS ) if ( not rate_limit_parallel_in_progress and self.ac_start and len(self.ac_starting) < NUM_PARALLEL_STARTING ): activation = self.ac_start.pop(0) self.ac_starting.append(activation) def cb_activated(activation2): self.ac_starting.remove(activation2) if activation2.is_done(log=self._log): self.ac_done.append(activation2) else: self.ac_waiting.append(activation2) def cb_done(activation3): self.ac_waiting.remove(activation3) self.ac_done.append(activation3) loop.quit() activation2.wait(done_callback=cb_done, log=self._log) loop.quit() activation.start( self.nmc, activated_callback=cb_activated, log=self._log ) continue loop.run() res_list = [ac.done_with_success for ac in self.ac_done] log( "%s out of %s activations are now successfully activated" % (sum(res_list), len(self.ac_done)) ) return all(res_list) def main(): nmc = nmc_new() cons = find_connections(nmc, sys.argv[1:]) all_good = Manager(nmc, cons).ac_run() nmc_transfer_ref = [nmc] del nmc nmc_destroy(nmc_transfer_ref) sys.exit(0 if all_good else 1) if __name__ == "__main__": main()