# # Copyright 2008 Google Inc. All Rights Reserved. """ The job module contains the objects and methods used to manage jobs in Autotest. The valid actions are: list: lists job(s) create: create a job abort: abort job(s) stat: detailed listing of job(s) The common options are: See topic_common.py for a High Level Design and Algorithm. """ import getpass, os, pwd, re, socket, sys from autotest_lib.cli import topic_common, action_common class job(topic_common.atest): """Job class atest job [create|clone|list|stat|abort] """ usage_action = '[create|clone|list|stat|abort]' topic = msg_topic = 'job' msg_items = '' def _convert_status(self, results): for result in results: total = sum(result['status_counts'].values()) status = ['%s=%s(%.1f%%)' % (key, val, 100.0*float(val)/total) for key, val in result['status_counts'].iteritems()] status.sort() result['status_counts'] = ', '.join(status) def backward_compatibility(self, action, argv): """ 'job create --clone' became 'job clone --id' """ if action == 'create': for option in ['-l', '--clone']: if option in argv: argv[argv.index(option)] = '--id' action = 'clone' return action class job_help(job): """Just here to get the atest logic working. Usage is set by its parent""" pass class job_list_stat(action_common.atest_list, job): def __init__(self): super(job_list_stat, self).__init__() self.topic_parse_info = topic_common.item_parse_info( attribute_name='jobs', use_leftover=True) def __split_jobs_between_ids_names(self): job_ids = [] job_names = [] # Sort between job IDs and names for job_id in self.jobs: if job_id.isdigit(): job_ids.append(job_id) else: job_names.append(job_id) return (job_ids, job_names) def execute_on_ids_and_names(self, op, filters={}, check_results={'id__in': 'id', 'name__in': 'id'}, tag_id='id__in', tag_name='name__in'): if not self.jobs: # Want everything return super(job_list_stat, self).execute(op=op, filters=filters) all_jobs = [] (job_ids, job_names) = self.__split_jobs_between_ids_names() for items, tag in [(job_ids, tag_id), (job_names, tag_name)]: if items: new_filters = filters.copy() new_filters[tag] = items jobs = super(job_list_stat, self).execute(op=op, filters=new_filters, check_results=check_results) all_jobs.extend(jobs) return all_jobs class job_list(job_list_stat): """atest job list [] [--all] [--running] [--user ]""" def __init__(self): super(job_list, self).__init__() self.parser.add_option('-a', '--all', help='List jobs for all ' 'users.', action='store_true', default=False) self.parser.add_option('-r', '--running', help='List only running ' 'jobs', action='store_true') self.parser.add_option('-u', '--user', help='List jobs for given ' 'user', type='string') def parse(self): options, leftover = super(job_list, self).parse() self.all = options.all self.data['running'] = options.running if options.user: if options.all: self.invalid_syntax('Only specify --all or --user, not both.') else: self.data['owner'] = options.user elif not options.all and not self.jobs: self.data['owner'] = getpass.getuser() return options, leftover def execute(self): return self.execute_on_ids_and_names(op='get_jobs_summary', filters=self.data) def output(self, results): keys = ['id', 'owner', 'name', 'status_counts'] if self.verbose: keys.extend(['priority', 'control_type', 'created_on']) self._convert_status(results) super(job_list, self).output(results, keys) class job_stat(job_list_stat): """atest job stat """ usage_action = 'stat' def __init__(self): super(job_stat, self).__init__() self.parser.add_option('-f', '--control-file', help='Display the control file', action='store_true', default=False) self.parser.add_option('-N', '--list-hosts', help='Display only a list of hosts', action='store_true') self.parser.add_option('-s', '--list-hosts-status', help='Display only the hosts in these statuses ' 'for a job.', action='store') def parse(self): status_list = topic_common.item_parse_info( attribute_name='status_list', inline_option='list_hosts_status') options, leftover = super(job_stat, self).parse([status_list], req_items='jobs') if not self.jobs: self.invalid_syntax('Must specify at least one job.') self.show_control_file = options.control_file self.list_hosts = options.list_hosts if self.list_hosts and self.status_list: self.invalid_syntax('--list-hosts is implicit when using ' '--list-hosts-status.') if len(self.jobs) > 1 and (self.list_hosts or self.status_list): self.invalid_syntax('--list-hosts and --list-hosts-status should ' 'only be used on a single job.') return options, leftover def _merge_results(self, summary, qes): hosts_status = {} for qe in qes: if qe['host']: job_id = qe['job']['id'] hostname = qe['host']['hostname'] hosts_status.setdefault(job_id, {}).setdefault(qe['status'], []).append(hostname) for job in summary: job_id = job['id'] if hosts_status.has_key(job_id): this_job = hosts_status[job_id] job['hosts'] = ' '.join(' '.join(host) for host in this_job.itervalues()) host_per_status = ['%s="%s"' %(status, ' '.join(host)) for status, host in this_job.iteritems()] job['hosts_status'] = ', '.join(host_per_status) if self.status_list: statuses = set(s.lower() for s in self.status_list) all_hosts = [s for s in host_per_status if s.split('=', 1)[0].lower() in statuses] job['hosts_selected_status'] = '\n'.join(all_hosts) else: job['hosts_status'] = '' if not job.get('hosts'): self.generic_error('Job has unassigned meta-hosts, ' 'try again shortly.') return summary def execute(self): summary = self.execute_on_ids_and_names(op='get_jobs_summary') # Get the real hostnames qes = self.execute_on_ids_and_names(op='get_host_queue_entries', check_results={}, tag_id='job__in', tag_name='job__name__in') self._convert_status(summary) return self._merge_results(summary, qes) def output(self, results): if self.list_hosts: keys = ['hosts'] elif self.status_list: keys = ['hosts_selected_status'] elif not self.verbose: keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status'] else: keys = ['id', 'name', 'priority', 'status_counts', 'hosts_status', 'owner', 'control_type', 'synch_count', 'created_on', 'run_verify', 'reboot_before', 'reboot_after', 'parse_failed_repair'] if self.show_control_file: keys.append('control_file') super(job_stat, self).output(results, keys) class job_create_or_clone(action_common.atest_create, job): """Class containing the code common to the job create and clone actions""" msg_items = 'job_name' def __init__(self): super(job_create_or_clone, self).__init__() self.hosts = [] self.data_item_key = 'name' self.parser.add_option('-p', '--priority', help='Job priority (low, ' 'medium, high, urgent), default=medium', type='choice', choices=('low', 'medium', 'high', 'urgent'), default='medium') self.parser.add_option('-b', '--labels', help='Comma separated list of labels ' 'to get machine list from.', default='') self.parser.add_option('-m', '--machine', help='List of machines to ' 'run on') self.parser.add_option('-M', '--mlist', help='File listing machines to use', type='string', metavar='MACHINE_FLIST') self.parser.add_option('--one-time-hosts', help='List of one time hosts') self.parser.add_option('-e', '--email', help='A comma seperated list of ' 'email addresses to notify of job completion', default='') def _parse_hosts(self, args): """ Parses the arguments to generate a list of hosts and meta_hosts A host is a regular name, a meta_host is n*label or *label. These can be mixed on the CLI, and separated by either commas or spaces, e.g.: 5*Machine_Label host0 5*Machine_Label2,host2 """ hosts = [] meta_hosts = [] for arg in args: for host in arg.split(','): if re.match('^[0-9]+[*]', host): num, host = host.split('*', 1) meta_hosts += int(num) * [host] elif re.match('^[*](\w*)', host): meta_hosts += [re.match('^[*](\w*)', host).group(1)] elif host != '' and host not in hosts: # Real hostname and not a duplicate hosts.append(host) return (hosts, meta_hosts) def parse(self, parse_info=[]): host_info = topic_common.item_parse_info(attribute_name='hosts', inline_option='machine', filename_option='mlist') job_info = topic_common.item_parse_info(attribute_name='jobname', use_leftover=True) oth_info = topic_common.item_parse_info(attribute_name='one_time_hosts', inline_option='one_time_hosts') label_info = topic_common.item_parse_info(attribute_name='labels', inline_option='labels') options, leftover = super(job_create_or_clone, self).parse( [host_info, job_info, oth_info, label_info] + parse_info, req_items='jobname') self.data = {} if len(self.jobname) > 1: self.invalid_syntax('Too many arguments specified, only expected ' 'to receive job name: %s' % self.jobname) self.jobname = self.jobname[0] if options.priority: self.data['priority'] = options.priority.capitalize() if self.one_time_hosts: self.data['one_time_hosts'] = self.one_time_hosts if self.labels: label_hosts = self.execute_rpc(op='get_hosts', multiple_labels=self.labels) for host in label_hosts: self.hosts.append(host['hostname']) self.data['name'] = self.jobname (self.data['hosts'], self.data['meta_hosts']) = self._parse_hosts(self.hosts) self.data['email_list'] = options.email return options, leftover def create_job(self): job_id = self.execute_rpc(op='create_job', **self.data) return ['%s (id %s)' % (self.jobname, job_id)] def get_items(self): return [self.jobname] class job_create(job_create_or_clone): """atest job create [--priority ] [--synch_count] [--control-file ] [--on-server] [--test ] [--kernel ] [--mlist ] [--machine ] [--labels ] [--reboot_before