diff options
author | mbligh <mbligh@592f7852-d20e-0410-864c-8624ca9c26a4> | 2009-12-29 02:55:23 +0000 |
---|---|---|
committer | mbligh <mbligh@592f7852-d20e-0410-864c-8624ca9c26a4> | 2009-12-29 02:55:23 +0000 |
commit | c727b1206352798d5f73376a00b3dd0df55432cf (patch) | |
tree | e57a7e8e94541bfa41583d4bf0f9885860880751 | |
parent | 564453927c870b6ff48806d5dcab4c1b6d8af3bc (diff) |
Reworked the server side profiling support to use barrier to synchronize
starting and stopping of profilers on multiple machines. The synchronization
will only work with server side jobs that work with multiple host objects
in the same process (ie there is no multi machine synchronization done for
jobs that do early a job.parallel_simple() which forks a process per
machine because each separate autoserv process will only be aware of a
single host object).
Signed-off-by: Mihai Rusu <dizzy@google.com>
git-svn-id: svn://test.kernel.org/autotest/trunk@4064 592f7852-d20e-0410-864c-8624ca9c26a4
-rw-r--r-- | server/crashcollect.py | 2 | ||||
-rw-r--r-- | server/profiler.py | 217 | ||||
-rw-r--r-- | server/profilers.py | 240 | ||||
-rwxr-xr-x | server/server_job.py | 6 |
4 files changed, 252 insertions, 213 deletions
diff --git a/server/crashcollect.py b/server/crashcollect.py index 916abef9..1f28861d 100644 --- a/server/crashcollect.py +++ b/server/crashcollect.py @@ -1,6 +1,6 @@ import os, time, pickle, logging, shutil -from autotest_lib.server import utils, profiler +from autotest_lib.server import utils # import any site hooks for the crashdump and crashinfo collection diff --git a/server/profiler.py b/server/profiler.py index 7c571de7..38bc9b53 100644 --- a/server/profiler.py +++ b/server/profiler.py @@ -1,22 +1,8 @@ -import os, itertools, shutil, tempfile, logging +import itertools import common -from autotest_lib.client.common_lib import utils, error -from autotest_lib.server import autotest, hosts - -PROFILER_TMPDIR = "/tmp/profilers" - - -# control file template for running a job that uses profiler 'name' -run_profiler_control = """\ -job.profilers.add(%s) -job.run_test("profiler_test") -job.profilers.delete(%r) -""" - - -def get_unpassable_types(arg): +def _get_unpassable_types(arg): """ Given an argument, returns a set of types contained in arg that are unpassable. If arg is an atomic type (e.g. int) it either returns an empty set (if the type is passable) or a singleton of the type (if the @@ -32,55 +18,29 @@ def get_unpassable_types(arg): parts = iter(arg) types = set() for part in parts: - types |= get_unpassable_types(part) + types |= _get_unpassable_types(part) return types else: return set([type(arg)]) -def validate_args(args): +def _validate_args(args): """ Validates arguments. Lists and dictionaries are valid argument types, so you can pass *args and **dargs in directly, rather than having to iterate over them yourself. """ - unpassable_types = get_unpassable_types(args) + unpassable_types = _get_unpassable_types(args) if unpassable_types: msg = "arguments of type '%s' cannot be passed to remote profilers" msg %= ", ".join(t.__name__ for t in unpassable_types) raise TypeError(msg) -def encode_args(profiler, args, dargs): - parts = [repr(profiler)] - parts += [repr(arg) for arg in args] - parts += ["%s=%r" % darg for darg in dargs.iteritems()] - return ", ".join(parts) - - -def get_profiler_log_path(autodir): - """Given the directory of a profiler client, find the client log path.""" - return os.path.join(PROFILER_TMPDIR, autodir, "results", "default", - "debug", "client.DEBUG") - - -def get_profiler_results_dir(autodir): - """ Given the directory of the autotest client used to run a profiler, - return the remote path where profiler results will be stored.""" - return os.path.join(PROFILER_TMPDIR, autodir, "results", "default", - "profiler_test", "profiling") - - class profiler_proxy(object): """ This is a server-side class that acts as a proxy to a real client-side profiler class.""" - def __init__(self, job, profiler_name): - self.job = job + def __init__(self, profiler_name): self.name = profiler_name - # maps hostname to (host object, autotest.Autotest object, Autotest - # install dir), where the host object is the one created specifically - # for profiling - self.installed_hosts = {} - self.current_test = None # does the profiler support rebooting? profiler_module = common.setup_modules.import_module( @@ -89,48 +49,9 @@ class profiler_proxy(object): self.supports_reboot = profiler_class.supports_reboot - def _install(self): - """ Install autotest on any current job hosts. """ - in_use_hosts = set(host.hostname for host in self.job.hosts - if not # exclude hosts created here for profiling - (host.get_autodir() and - host.get_autodir().startswith(PROFILER_TMPDIR))) - logging.debug('Hosts currently in use: %s', in_use_hosts) - - # determine what valid host objects we already have installed - profiler_hosts = set() - for host, at, profiler_dir in self.installed_hosts.values(): - if host.path_exists(profiler_dir): - profiler_hosts.add(host.hostname) - else: - # the profiler was wiped out somehow, drop this install - logging.warning('The profiler client on %s at %s was deleted', - host.hostname, profiler_dir) - host.close() - del self.installed_hosts[host.hostname] - logging.debug('Hosts with profiler clients already installed: %s', - profiler_hosts) - - # install autotest on any new hosts in use - for hostname in in_use_hosts - profiler_hosts: - host = hosts.create_host(hostname, auto_monitor=False) - tmp_dir = host.get_tmp_dir(parent=PROFILER_TMPDIR) - at = autotest.Autotest(host) - at.install_no_autoserv(autodir=tmp_dir) - self.installed_hosts[host.hostname] = (host, at, tmp_dir) - - # drop any installs from hosts no longer in job.hosts - hostnames_to_drop = profiler_hosts - in_use_hosts - hosts_to_drop = [self.installed_hosts[hostname][0] - for hostname in hostnames_to_drop] - for host in hosts_to_drop: - host.close() - del self.installed_hosts[host.hostname] - - def initialize(self, *args, **dargs): - validate_args(args) - validate_args(dargs) + _validate_args(args) + _validate_args(dargs) self.args, self.dargs = args, dargs @@ -139,129 +60,13 @@ class profiler_proxy(object): # the actual setup happens lazily at start() - def _signal_client(self, host, autodir, command): - """ Signal to a client that it should execute profilers.command - by writing a byte into AUTODIR/profilers.command. """ - path = os.path.join(autodir, "profiler.%s" % command) - host.run("echo A > %s" % path) - - - def _wait_on_client(self, host, autodir, command): - """ Wait for the client to signal that it's finished by writing - a byte into AUTODIR/profilers.command. Only waits for 30 seconds - before giving up. """ - path = os.path.join(autodir, "profiler.%s" % command) - try: - host.run("cat %s" % path, ignore_status=True, timeout=180) - except error.AutoservSSHTimeout: - pass # even if it times out, just give up and go ahead anyway - - - def _get_hosts(self, host=None): - """ - Returns a list of (Host, Autotest, install directory) tuples for hosts - currently supported by this profiler. The returned Host object is always - the one created by this profiler, regardless of what's passed in. If - 'host' is not None, all entries not matching that host object are - filtered out of the list. - """ - if host is None: - return self.installed_hosts.values() - if host.hostname in self.installed_hosts: - return [self.installed_hosts[host.hostname]] - return [] - - - def _get_failure_logs(self, autodir, test, host): - """Collect the client logs from a profiler run and put them in a - file named failure-*.log.""" - try: - profdir = os.path.join(test.profdir, host.hostname) - if not os.path.exists(profdir): - os.makedirs(profdir) - fd, path = tempfile.mkstemp(suffix=".log", prefix="failure-", - dir=os.path.join(test.profdir, - host.hostname)) - os.close(fd) - host.get_file(get_profiler_log_path(autodir), path) - except (error.AutotestError, error.AutoservError): - logging.exception("Profiler failure log collection failed") - # swallow the exception so that we don't override an existing - # exception being thrown - - def start(self, test, host=None): - self._install() - encoded_args = encode_args(self.name, self.args, self.dargs) - control_script = run_profiler_control % (encoded_args, self.name) - for host, at, autodir in self._get_hosts(host): - fifo_pattern = os.path.join(autodir, "profiler.*") - host.run("rm -f %s" % fifo_pattern) - host.run("mkfifo %s" % os.path.join(autodir, "profiler.ready")) - try: - at.run(control_script, background=True) - self._wait_on_client(host, autodir, "ready") - self._signal_client(host, autodir, "start") - - remote_results_dir = get_profiler_results_dir(autodir) - local_results_dir = os.path.join(test.profdir, host.hostname) - self.job.add_client_log(host.hostname, remote_results_dir, - local_results_dir) - except: - self._get_failure_logs(autodir, test, host) - raise - self.current_test = test + raise NotImplementedError('start not implemented') def stop(self, test, host=None): - assert self.current_test == test - for host, at, autodir in self._get_hosts(host): - try: - self._signal_client(host, autodir, "stop") - except: - self._get_failure_logs(autodir, test, host) - raise + raise NotImplementedError('stop not implemented') def report(self, test, host=None, wait_on_client=True): - assert self.current_test == test - self.current_test = None - - # signal to all the clients that they should report - if wait_on_client: - for host, at, autodir in self._get_hosts(host): - try: - self._signal_client(host, autodir, "report") - except: - self._get_failure_logs(autodir, test, host) - raise - - # pull back all the results - for host, at, autodir in self._get_hosts(host): - if wait_on_client: - self._wait_on_client(host, autodir, "finished") - results_dir = get_profiler_results_dir(autodir) - local_dir = os.path.join(test.profdir, host.hostname) - if not os.path.exists(local_dir): - os.makedirs(local_dir) - - self.job.remove_client_log(host.hostname, results_dir, local_dir) - tempdir = tempfile.mkdtemp(dir=self.job.tmpdir) - try: - host.get_file(results_dir + "/", tempdir) - except error.AutoservRunError: - pass # no files to pull back, nothing we can do - utils.merge_trees(tempdir, local_dir) - shutil.rmtree(tempdir, ignore_errors=True) - - - def handle_reboot(self, host): - if self.current_test: - test = self.current_test - if not self.supports_reboot: - msg = "profiler '%s' does not support rebooting during tests" - msg %= self.name - self.job.record("WARN", os.path.basename(test.outputdir), - None, msg) - self.report(test, host, wait_on_client=False) - self.start(test, host) + raise NotImplementedError('report not implemented') diff --git a/server/profilers.py b/server/profilers.py index 2dd4dada..a09a9957 100644 --- a/server/profilers.py +++ b/server/profilers.py @@ -1,18 +1,47 @@ -import os, sys +import os, shutil, tempfile, logging + import common +from autotest_lib.client.common_lib import utils, error, profiler_manager +from autotest_lib.server import profiler, autotest, standalone_profiler, hosts + + +PROFILER_TMPDIR = '/tmp/profilers' + + +def get_profiler_results_dir(autodir): + """ + Given the directory of the autotest client used to run a profiler, + return the remote path where profiler results will be stored. + """ + return os.path.join(autodir, 'results', 'default', 'barriertest', + 'profiling') + -from autotest_lib.client.common_lib import utils, packages, profiler_manager -from autotest_lib.server import profiler +def get_profiler_log_path(autodir): + """ + Given the directory of a profiler client, find the client log path. + """ + return os.path.join(autodir, 'results', 'default', 'debug', 'client.DEBUG') class profilers(profiler_manager.profiler_manager): def __init__(self, job): super(profilers, self).__init__(job) self.add_log = {} + self.start_delay = 0 + # maps hostname to (host object, autotest.Autotest object, Autotest + # install dir), where the host object is the one created specifically + # for profiling + self.installed_hosts = {} + self.current_test = None + + + def set_start_delay(self, start_delay): + self.start_delay = start_delay def load_profiler(self, profiler_name, args, dargs): - newprofiler = profiler.profiler_proxy(self.job, profiler_name) + newprofiler = profiler.profiler_proxy(profiler_name) newprofiler.initialize(*args, **dargs) newprofiler.setup(*args, **dargs) # lazy setup is done client-side return newprofiler @@ -29,6 +58,205 @@ class profilers(profiler_manager.profiler_manager): del self.add_log[profiler] + def _install_clients(self): + """ + Install autotest on any current job hosts. + """ + in_use_hosts = set() + # find hosts in use but not used by us + for host in self.job.hosts: + autodir = host.get_autodir() + if not (autodir and autodir.startswith(PROFILER_TMPDIR)): + in_use_hosts.add(host.hostname) + logging.debug('Hosts currently in use: %s', in_use_hosts) + + # determine what valid host objects we already have installed + profiler_hosts = set() + for host, at, profiler_dir in self.installed_hosts.values(): + if host.path_exists(profiler_dir): + profiler_hosts.add(host.hostname) + else: + # the profiler was wiped out somehow, drop this install + logging.warning('The profiler client on %s at %s was deleted', + host.hostname, profiler_dir) + host.close() + del self.installed_hosts[host.hostname] + logging.debug('Hosts with profiler clients already installed: %s', + profiler_hosts) + + # install autotest on any new hosts in use + for hostname in in_use_hosts - profiler_hosts: + host = hosts.create_host(hostname, auto_monitor=False) + tmp_dir = host.get_tmp_dir(parent=PROFILER_TMPDIR) + at = autotest.Autotest(host) + at.install_no_autoserv(autodir=tmp_dir) + self.installed_hosts[host.hostname] = (host, at, tmp_dir) + + # drop any installs from hosts no longer in job.hosts + hostnames_to_drop = profiler_hosts - in_use_hosts + hosts_to_drop = [self.installed_hosts[hostname][0] + for hostname in hostnames_to_drop] + for host in hosts_to_drop: + host.close() + del self.installed_hosts[host.hostname] + + + def _get_hosts(self, host=None): + """ + Returns a list of (Host, Autotest, install directory) tuples for hosts + currently supported by this profiler. The returned Host object is always + the one created by this profiler, regardless of what's passed in. If + 'host' is not None, all entries not matching that host object are + filtered out of the list. + """ + if host is None: + return self.installed_hosts.values() + if host.hostname in self.installed_hosts: + return [self.installed_hosts[host.hostname]] + return [] + + + def _get_local_profilers_dir(self, test, hostname): + if not test.job.in_machine_dir and len(self.installed_hosts) > 1: + local_dir = os.path.join(test.profdir, hostname) + if not os.path.exists(local_dir): + os.makedirs(local_dir) + else: + local_dir = test.profdir + + return local_dir + + + def _get_failure_logs(self, autodir, test, host): + """ + Collect the client logs from a profiler run and put them in a + file named failure-*.log. + """ + try: + fd, path = tempfile.mkstemp(suffix='.log', prefix='failure-', + dir=self._get_local_profilers_dir(test, host.hostname)) + os.close(fd) + host.get_file(get_profiler_log_path(autodir), path) + except (error.AutotestError, error.AutoservError): + logging.exception('Profiler failure log collection failed') + # swallow the exception so that we don't override an existing + # exception being thrown + + + def _get_all_failure_logs(self, test, hosts): + for host, at, autodir in hosts: + self._get_failure_logs(autodir, test, host) + + + def _run_clients(self, test, hosts): + """ + We initialize the profilers just before start because only then we + know all the hosts involved. + """ + + hostnames = [host_info[0].hostname for host_info in hosts] + profilers_args = [(p.name, p.args, p.dargs) + for p in self.list] + + for host, at, autodir in hosts: + control_script = standalone_profiler.generate_test(hostnames, + host.hostname, + profilers_args, + 180, None) + try: + at.run(control_script, background=True) + except Exception: + self._get_failure_logs(autodir, test, host) + raise + + remote_results_dir = get_profiler_results_dir(autodir) + local_results_dir = self._get_local_profilers_dir(test, + host.hostname) + self.job.add_client_log(host.hostname, remote_results_dir, + local_results_dir) + + try: + # wait for the profilers to be added + standalone_profiler.wait_for_profilers(hostnames) + except Exception: + self._get_all_failure_logs(test, hosts) + raise + + + def before_start(self, test, host=None): + # create host objects and install the needed clients + # so later in start() we don't spend too much time + self._install_clients() + self._run_clients(test, self._get_hosts(host)) + + + def start(self, test, host=None): + hosts = self._get_hosts(host) + + # wait for the profilers to start + hostnames = [host_info[0].hostname for host_info in hosts] + try: + standalone_profiler.start_profilers(hostnames) + except Exception: + self._get_all_failure_logs(test, hosts) + raise + + self.current_test = test + + + def stop(self, test): + assert self.current_test == test + + hosts = self._get_hosts() + # wait for the profilers to stop + hostnames = [host_info[0].hostname for host_info in hosts] + try: + standalone_profiler.stop_profilers(hostnames) + except Exception: + self._get_all_failure_logs(test, hosts) + raise + + + def report(self, test, host=None): + assert self.current_test == test + + hosts = self._get_hosts(host) + # when running on specific hosts we cannot wait for the other + # hosts to sync with us + if not host: + hostnames = [host_info[0].hostname for host_info in hosts] + try: + standalone_profiler.finish_profilers(hostnames) + except Exception: + self._get_all_failure_logs(test, hosts) + raise + + # pull back all the results + for host, at, autodir in hosts: + results_dir = get_profiler_results_dir(autodir) + local_dir = self._get_local_profilers_dir(test, host.hostname) + + self.job.remove_client_log(host.hostname, results_dir, local_dir) + + tempdir = tempfile.mkdtemp(dir=self.job.tmpdir) + try: + host.get_file(results_dir + '/', tempdir) + except error.AutoservRunError: + pass # no files to pull back, nothing we can do + utils.merge_trees(tempdir, local_dir) + shutil.rmtree(tempdir, ignore_errors=True) + + def handle_reboot(self, host): - for p in self.list: - p.handle_reboot(host) + if self.current_test: + test = self.current_test + for profiler in self.list: + if not profiler.supports_reboot: + msg = 'profiler %s does not support rebooting during tests' + msg %= profiler.name + self.job.record('WARN', os.path.basename(test.outputdir), + None, msg) + + self.report(test, host) + self.before_start(test, host) + self.start(test, host) diff --git a/server/server_job.py b/server/server_job.py index 9d7246b3..2b8ef6eb 100755 --- a/server/server_job.py +++ b/server/server_job.py @@ -146,6 +146,10 @@ class base_server_job(base_job.base_job): self.num_tests_run = 0 self.num_tests_failed = 0 + # should tell us if this job results are inside a machine named + # directory + self.in_machine_dir = False + self._register_subcommand_hooks() self._test_tag_prefix = None @@ -311,6 +315,7 @@ class base_server_job(base_job.base_job): self.machines = [machine] self.push_execution_context(machine) os.chdir(self.resultdir) + self.in_machine_dir = True utils.write_keyval(self.resultdir, {"hostname": machine}) self.init_parser(self.resultdir) result = function(machine) @@ -320,6 +325,7 @@ class base_server_job(base_job.base_job): def wrapper(machine): self.push_execution_context(machine) os.chdir(self.resultdir) + self.in_machine_dir = True machine_data = {'hostname' : machine, 'status_version' : str(self._STATUS_VERSION)} utils.write_keyval(self.resultdir, machine_data) |