diff options
Diffstat (limited to 'yocto-poky/meta/lib/oeqa/oetest.py')
-rw-r--r-- | yocto-poky/meta/lib/oeqa/oetest.py | 401 |
1 files changed, 298 insertions, 103 deletions
diff --git a/yocto-poky/meta/lib/oeqa/oetest.py b/yocto-poky/meta/lib/oeqa/oetest.py index 6f9edec58..3ed5bb8c2 100644 --- a/yocto-poky/meta/lib/oeqa/oetest.py +++ b/yocto-poky/meta/lib/oeqa/oetest.py @@ -7,16 +7,25 @@ # It also has some helper functions and it's responsible for actually starting the tests -import os, re, mmap +import os, re, mmap, sys import unittest import inspect import subprocess +import signal try: import bb except ImportError: pass import logging + +import oeqa.runtime +# Exported test doesn't require sdkext +try: + import oeqa.sdkext +except ImportError: + pass from oeqa.utils.decorators import LogResults, gettag, getResults +from oeqa.utils import avoid_paths_in_environ logger = logging.getLogger("BitBake") @@ -30,7 +39,6 @@ def getVar(obj): def checkTags(tc, tagexp): return eval(tagexp, None, getVar(tc)) - def filterByTagExp(testsuite, tagexp): if not tagexp: return testsuite @@ -43,106 +51,6 @@ def filterByTagExp(testsuite, tagexp): caseList.append(filterByTagExp(each, tagexp)) return testsuite.__class__(caseList) -def loadTests(tc, type="runtime"): - if type == "runtime": - # set the context object passed from the test class - setattr(oeTest, "tc", tc) - # set ps command to use - setattr(oeRuntimeTest, "pscmd", "ps -ef" if oeTest.hasPackage("procps") else "ps") - # prepare test suite, loader and runner - suite = unittest.TestSuite() - elif type == "sdk": - # set the context object passed from the test class - setattr(oeTest, "tc", tc) - testloader = unittest.TestLoader() - testloader.sortTestMethodsUsing = None - suites = [testloader.loadTestsFromName(name) for name in tc.testslist] - suites = filterByTagExp(suites, getattr(tc, "tagexp", None)) - - def getTests(test): - '''Return all individual tests executed when running the suite.''' - # Unfortunately unittest does not have an API for this, so we have - # to rely on implementation details. This only needs to work - # for TestSuite containing TestCase. - method = getattr(test, '_testMethodName', None) - if method: - # leaf case: a TestCase - yield test - else: - # Look into TestSuite. - tests = getattr(test, '_tests', []) - for t1 in tests: - for t2 in getTests(t1): - yield t2 - - # Determine dependencies between suites by looking for @skipUnlessPassed - # method annotations. Suite A depends on suite B if any method in A - # depends on a method on B. - for suite in suites: - suite.dependencies = [] - suite.depth = 0 - for test in getTests(suite): - methodname = getattr(test, '_testMethodName', None) - if methodname: - method = getattr(test, methodname) - depends_on = getattr(method, '_depends_on', None) - if depends_on: - for dep_suite in suites: - if depends_on in [getattr(t, '_testMethodName', None) for t in getTests(dep_suite)]: - if dep_suite not in suite.dependencies and \ - dep_suite is not suite: - suite.dependencies.append(dep_suite) - break - else: - logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." % - (test, depends_on)) - # Use brute-force topological sort to determine ordering. Sort by - # depth (higher depth = must run later), with original ordering to - # break ties. - def set_suite_depth(suite): - for dep in suite.dependencies: - new_depth = set_suite_depth(dep) + 1 - if new_depth > suite.depth: - suite.depth = new_depth - return suite.depth - for index, suite in enumerate(suites): - set_suite_depth(suite) - suite.index = index - suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index))) - return testloader.suiteClass(suites) - -_buffer = "" - -def custom_verbose(msg, *args, **kwargs): - global _buffer - if msg[-1] != "\n": - _buffer += msg - else: - _buffer += msg - try: - bb.plain(_buffer.rstrip("\n"), *args, **kwargs) - except NameError: - logger.info(_buffer.rstrip("\n"), *args, **kwargs) - _buffer = "" - -def runTests(tc, type="runtime"): - - suite = loadTests(tc, type) - logger.info("Test modules %s" % tc.testslist) - if hasattr(tc, "tagexp") and tc.tagexp: - logger.info("Filter test cases by tags: %s" % tc.tagexp) - logger.info("Found %s tests" % suite.countTestCases()) - runner = unittest.TextTestRunner(verbosity=2) - try: - if bb.msg.loggerDefaultVerbose: - runner.stream.write = custom_verbose - except NameError: - # Not in bb environment? - pass - result = runner.run(suite) - - return result - @LogResults class oeTest(unittest.TestCase): @@ -222,7 +130,19 @@ class oeSDKTest(oeTest): return False def _run(self, cmd): - return subprocess.check_output(". %s; " % self.tc.sdkenv + cmd, shell=True) + return subprocess.check_output(". %s > /dev/null; %s;" % (self.tc.sdkenv, cmd), shell=True) + +class oeSDKExtTest(oeSDKTest): + def _run(self, cmd): + # extensible sdk shows a warning if found bitbake in the path + # because can cause contamination, i.e. use devtool from + # poky/scripts instead of eSDK one. + env = os.environ.copy() + paths_to_avoid = ['bitbake/bin', 'poky/scripts'] + env['PATH'] = avoid_paths_in_environ(paths_to_avoid) + + return subprocess.check_output(". %s > /dev/null;"\ + " %s;" % (self.tc.sdkenv, cmd), shell=True, env=env) def getmodule(pos=2): # stack returns a list of tuples containg frame information @@ -250,3 +170,278 @@ def skipModuleUnless(cond, reason): if not cond: skipModule(reason, 3) + +_buffer_logger = "" +def custom_verbose(msg, *args, **kwargs): + global _buffer_logger + if msg[-1] != "\n": + _buffer_logger += msg + else: + _buffer_logger += msg + try: + bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs) + except NameError: + logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs) + _buffer_logger = "" + +class TestContext(object): + def __init__(self, d): + self.d = d + + self.testsuites = self._get_test_suites() + self.testslist = self._get_tests_list(d.getVar("BBPATH", True).split(':')) + self.testsrequired = self._get_test_suites_required() + + self.filesdir = os.path.join(os.path.dirname(os.path.abspath( + oeqa.runtime.__file__)), "files") + self.imagefeatures = d.getVar("IMAGE_FEATURES", True).split() + self.distrofeatures = d.getVar("DISTRO_FEATURES", True).split() + + # get testcase list from specified file + # if path is a relative path, then relative to build/conf/ + def _read_testlist(self, fpath, builddir): + if not os.path.isabs(fpath): + fpath = os.path.join(builddir, "conf", fpath) + if not os.path.exists(fpath): + bb.fatal("No such manifest file: ", fpath) + tcs = [] + for line in open(fpath).readlines(): + line = line.strip() + if line and not line.startswith("#"): + tcs.append(line) + return " ".join(tcs) + + # return test list by type also filter if TEST_SUITES is specified + def _get_tests_list(self, bbpath): + testslist = [] + + type = self._get_test_namespace() + + # This relies on lib/ under each directory in BBPATH being added to sys.path + # (as done by default in base.bbclass) + for testname in self.testsuites: + if testname != "auto": + if testname.startswith("oeqa."): + testslist.append(testname) + continue + found = False + for p in bbpath: + if os.path.exists(os.path.join(p, 'lib', 'oeqa', type, testname + '.py')): + testslist.append("oeqa." + type + "." + testname) + found = True + break + elif os.path.exists(os.path.join(p, 'lib', 'oeqa', type, testname.split(".")[0] + '.py')): + testslist.append("oeqa." + type + "." + testname) + found = True + break + if not found: + bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname) + + if "auto" in self.testsuites: + def add_auto_list(path): + if not os.path.exists(os.path.join(path, '__init__.py')): + bb.fatal('Tests directory %s exists but is missing __init__.py' % path) + files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')]) + for f in files: + module = 'oeqa.' + type + '.' + f[:-3] + if module not in testslist: + testslist.append(module) + + for p in bbpath: + testpath = os.path.join(p, 'lib', 'oeqa', type) + bb.debug(2, 'Searching for tests in %s' % testpath) + if os.path.exists(testpath): + add_auto_list(testpath) + + return testslist + + def loadTests(self): + setattr(oeTest, "tc", self) + + testloader = unittest.TestLoader() + testloader.sortTestMethodsUsing = None + suites = [testloader.loadTestsFromName(name) for name in self.testslist] + suites = filterByTagExp(suites, getattr(self, "tagexp", None)) + + def getTests(test): + '''Return all individual tests executed when running the suite.''' + # Unfortunately unittest does not have an API for this, so we have + # to rely on implementation details. This only needs to work + # for TestSuite containing TestCase. + method = getattr(test, '_testMethodName', None) + if method: + # leaf case: a TestCase + yield test + else: + # Look into TestSuite. + tests = getattr(test, '_tests', []) + for t1 in tests: + for t2 in getTests(t1): + yield t2 + + # Determine dependencies between suites by looking for @skipUnlessPassed + # method annotations. Suite A depends on suite B if any method in A + # depends on a method on B. + for suite in suites: + suite.dependencies = [] + suite.depth = 0 + for test in getTests(suite): + methodname = getattr(test, '_testMethodName', None) + if methodname: + method = getattr(test, methodname) + depends_on = getattr(method, '_depends_on', None) + if depends_on: + for dep_suite in suites: + if depends_on in [getattr(t, '_testMethodName', None) for t in getTests(dep_suite)]: + if dep_suite not in suite.dependencies and \ + dep_suite is not suite: + suite.dependencies.append(dep_suite) + break + else: + logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." % + (test, depends_on)) + + # Use brute-force topological sort to determine ordering. Sort by + # depth (higher depth = must run later), with original ordering to + # break ties. + def set_suite_depth(suite): + for dep in suite.dependencies: + new_depth = set_suite_depth(dep) + 1 + if new_depth > suite.depth: + suite.depth = new_depth + return suite.depth + + for index, suite in enumerate(suites): + set_suite_depth(suite) + suite.index = index + suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index))) + + self.suite = testloader.suiteClass(suites) + + return self.suite + + def runTests(self): + logger.info("Test modules %s" % self.testslist) + if hasattr(self, "tagexp") and self.tagexp: + logger.info("Filter test cases by tags: %s" % self.tagexp) + logger.info("Found %s tests" % self.suite.countTestCases()) + runner = unittest.TextTestRunner(verbosity=2) + if 'bb' in sys.modules: + runner.stream.write = custom_verbose + + return runner.run(self.suite) + +class ImageTestContext(TestContext): + def __init__(self, d, target, host_dumper): + super(ImageTestContext, self).__init__(d) + + self.tagexp = d.getVar("TEST_SUITES_TAGS", True) + + self.target = target + self.host_dumper = host_dumper + + manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE", True), + d.getVar("IMAGE_LINK_NAME", True) + ".manifest") + nomanifest = d.getVar("IMAGE_NO_MANIFEST", True) + if nomanifest is None or nomanifest != "1": + try: + with open(manifest) as f: + self.pkgmanifest = f.read() + except IOError as e: + bb.fatal("No package manifest file found. Did you build the image?\n%s" % e) + else: + self.pkgmanifest = "" + + self.sigterm = False + self.origsigtermhandler = signal.getsignal(signal.SIGTERM) + signal.signal(signal.SIGTERM, self._sigterm_exception) + + def _sigterm_exception(self, signum, stackframe): + bb.warn("TestImage received SIGTERM, shutting down...") + self.sigterm = True + self.target.stop() + + def _get_test_namespace(self): + return "runtime" + + def _get_test_suites(self): + testsuites = [] + + manifests = (self.d.getVar("TEST_SUITES_MANIFEST", True) or '').split() + if manifests: + for manifest in manifests: + testsuites.extend(self._read_testlist(manifest, + self.d.getVar("TOPDIR", True)).split()) + + else: + testsuites = self.d.getVar("TEST_SUITES", True).split() + + return testsuites + + def _get_test_suites_required(self): + return [t for t in self.d.getVar("TEST_SUITES", True).split() if t != "auto"] + + def loadTests(self): + super(ImageTestContext, self).loadTests() + setattr(oeRuntimeTest, "pscmd", "ps -ef" if oeTest.hasPackage("procps") else "ps") + +class SDKTestContext(TestContext): + def __init__(self, d, sdktestdir, sdkenv, tcname, *args): + super(SDKTestContext, self).__init__(d) + + self.sdktestdir = sdktestdir + self.sdkenv = sdkenv + self.tcname = tcname + + if not hasattr(self, 'target_manifest'): + self.target_manifest = d.getVar("SDK_TARGET_MANIFEST", True) + try: + with open(self.target_manifest) as f: + self.pkgmanifest = f.read() + except IOError as e: + bb.fatal("No package manifest file found. Did you build the sdk image?\n%s" % e) + + if not hasattr(self, 'host_manifest'): + self.host_manifest = d.getVar("SDK_HOST_MANIFEST", True) + try: + with open(self.host_manifest) as f: + self.hostpkgmanifest = f.read() + except IOError as e: + bb.fatal("No host package manifest file found. Did you build the sdk image?\n%s" % e) + + def _get_test_namespace(self): + return "sdk" + + def _get_test_suites(self): + return (self.d.getVar("TEST_SUITES_SDK", True) or "auto").split() + + def _get_test_suites_required(self): + return [t for t in (self.d.getVar("TEST_SUITES_SDK", True) or \ + "auto").split() if t != "auto"] + +class SDKExtTestContext(SDKTestContext): + def __init__(self, d, sdktestdir, sdkenv, tcname, *args): + self.target_manifest = d.getVar("SDK_EXT_TARGET_MANIFEST", True) + self.host_manifest = d.getVar("SDK_EXT_HOST_MANIFEST", True) + if args: + self.cm = args[0] # Compatibility mode for run SDK tests + else: + self.cm = False + + super(SDKExtTestContext, self).__init__(d, sdktestdir, sdkenv, tcname) + + self.sdkextfilesdir = os.path.join(os.path.dirname(os.path.abspath( + oeqa.sdkext.__file__)), "files") + + def _get_test_namespace(self): + if self.cm: + return "sdk" + else: + return "sdkext" + + def _get_test_suites(self): + return (self.d.getVar("TEST_SUITES_SDK_EXT", True) or "auto").split() + + def _get_test_suites_required(self): + return [t for t in (self.d.getVar("TEST_SUITES_SDK_EXT", True) or \ + "auto").split() if t != "auto"] |