########################################################################### # # Copyright 2010-2011 VMware, Inc. # All Rights Reserved. # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sub license, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice (including the # next paragraph) shall be included in all copies or substantial portions # of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. # IN NO EVENT SHALL THE AUTHORS AND/OR ITS SUPPLIERS BE LIABLE FOR # ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # ########################################################################### """Testing framework that assists invoking external programs and outputing results in ANT's junit XML format, used by Jenkins-CI.""" import locale import optparse import os.path import shutil import string import sys import time __all__ = [ 'Error', 'Failure', 'Main', 'Report', 'Test', 'TestSuite', ] class Failure(Exception): pass class Error(Exception): pass def escape(s): '''Escape and encode a XML string.''' if not isinstance(s, unicode): #s = s.decode(locale.getpreferredencoding(), 'replace') s = s.decode('ascii', 'ignore') s = s.replace('&', '&') s = s.replace('<', '<') s = s.replace('>', '>') s = s.replace('"', '"') s = s.replace("'", ''') s = s.encode('UTF-8') return s # same as string.printable, but without '\v\f' _printable = string.digits + string.letters + string.punctuation + ' \t\n\r' _printable = ''.join([chr(_c) in _printable and chr(_c) or '?' for _c in range(256)]) del _c class Report: """Write test results in ANT's junit XML format. See also: - https://github.com/jenkinsci/jenkins/tree/master/test/src/test/resources/hudson/tasks/junit - http://www.junit.org/node/399 - http://wiki.apache.org/ant/Proposals/EnhancedTestReports """ def __init__(self, filename, time = True): self.path = os.path.dirname(os.path.abspath(filename)) if not os.path.exists(self.path): os.makedirs(self.path) self.stream = open(filename, 'wt') self.testsuites = [] self.inside_testsuite = False self.inside_testcase = False self.time = time def start(self): self.stream.write('\n') self.stream.write('\n') def stop(self): if self.inside_testcase: self.stream.write('\n') self.inside_testcase = False if self.inside_testsuite: self.stream.write('\n') self.inside_testsuite = False self.stream.write('\n') self.stream.flush() self.stream.close() def escapeName(self, name): '''Dots are special for junit, so escape them with underscores.''' name = name.replace('.', '_') return name def startSuite(self, name): self.testsuites.append(self.escapeName(name)) def stopSuite(self): if self.inside_testsuite: self.stream.write('\n') self.inside_testsuite = False self.testsuites.pop(-1) def startCase(self, name): assert not self.inside_testcase self.inside_testcase = True if not self.inside_testsuite: self.stream.write('\n' % escape('.'.join(self.testsuites[:1]))) self.inside_testsuite = True self.case_name = name self.buffer = [] self.stdout = [] self.stderr = [] self.start_time = time.time() def stopCase(self, duration = None): assert self.inside_testcase self.inside_testcase = False if len(self.testsuites) == 1: classname = self.testsuites[0] + '.' + self.testsuites[0] else: classname = '.'.join(self.testsuites) name = self.case_name self.stream.write('\n') else: self.stream.write('>') for entry in self.buffer: self.stream.write(entry) if self.stdout: self.stream.write('') for text in self.stdout: self.stream.write(escape(text)) self.stream.write('') if self.stderr: self.stream.write('') for text in self.stderr: self.stream.write(escape(text)) self.stream.write('') self.stream.write('\n') self.stream.flush() def addStdout(self, text): if isinstance(text, str): text = text.translate(_printable) self.stdout.append(text) def addStderr(self, text): if isinstance(text, str): text = text.translate(_printable) self.stderr.append(text) def addSkipped(self): self.buffer.append('\n') def addError(self, message, stacktrace=""): self.buffer.append('') else: self.buffer.append('>') self.buffer.append(escape(stacktrace)) self.buffer.append('') def addFailure(self, message, stacktrace=""): self.buffer.append('') else: self.buffer.append('>') self.buffer.append(escape(stacktrace)) self.buffer.append('') def addMeasurement(self, name, value): '''Embedded a measurement in the standard output. https://wiki.jenkins-ci.org/display/JENKINS/Measurement+Plots+Plugin ''' if value is not None: message = '%s%f\n' % (name, value) self.addStdout(message) def addAttachment(self, path): '''Attach a file. https://wiki.jenkins-ci.org/display/JENKINS/JUnit+Attachments+Plugin ''' attach_dir = os.path.join(self.path, '.'.join(self.testsuites + [self.case_name])) if not os.path.exists(attach_dir): os.makedirs(attach_dir) shutil.copy2(path, attach_dir) def addWorkspaceURL(self, path): import urlparse try: workspace_path = os.environ['WORKSPACE'] job_url = os.environ['JOB_URL'] except KeyError: self.addStdout(path + '\n') else: rel_path = os.path.relpath(path, workspace_path) workspace_url = urlparse.urljoin(job_url, 'ws/') url = urlparse.urljoin(workspace_url, rel_path) if os.path.isdir(path): url += '/' self.addStdout(url + '\n') class BaseTest: def _visit(self, report): raise NotImplementedError def fail(self, *args): raise Failure(*args) def error(self, *args): raise Error(*args) class TestSuite(BaseTest): def __init__(self, name, tests=()): self.name = name self.tests = [] self.addTests(tests) def addTest(self, test): self.tests.append(test) def addTests(self, tests): for test in tests: self.addTest(test) def run(self, filename = None, report = None): if report is None: if filename is None: filename = self.name + '.xml' report = Report(filename) report.start() try: self._visit(report) finally: report.stop() def _visit(self, report): report.startSuite(self.name) try: self.test(report) finally: report.stopSuite() def test(self, report): for test in self.tests: test._visit(report) class Test(BaseTest): def __init__(self, name): self.name = name def _visit(self, report): report.startCase(self.name) try: try: return self.test(report) except Failure, ex: report.addFailure(*ex.args) except Error, ex: report.addError(*ex.args) except KeyboardInterrupt: raise except: report.addError(str(sys.exc_value)) finally: report.stopCase() def test(self, report): raise NotImplementedError class Main: default_timeout = 5*60 def __init__(self, name): self.name = name def optparser(self): optparser = optparse.OptionParser(usage="\n\t%prog [options] ...") optparser.add_option( '-n', '--dry-run', action="store_true", dest="dry_run", default=False, help="perform a trial run without executing") optparser.add_option( '-t', '--timeout', metavar='SECONDS', type="float", dest="timeout", default = self.default_timeout, help="timeout in seconds [default: %default]") #optparser.add_option( # '-f', '--filter', # action='append', # type="choice", metevar='GLOB', # dest="filters", default=[], # help="filter") return optparser def create_suite(self): raise NotImplementedError def run_suite(self, suite): filename = self.name + '.xml' report = Report(filename) suite.run() def main(self): optparser = self.optparser() (self.options, self.args) = optparser.parse_args(sys.argv[1:]) suite = self.create_suite() self.run_suite(suite)