summaryrefslogtreecommitdiff
path: root/yocto-poky/meta/lib/oeqa/oetest.py
diff options
context:
space:
mode:
Diffstat (limited to 'yocto-poky/meta/lib/oeqa/oetest.py')
-rw-r--r--yocto-poky/meta/lib/oeqa/oetest.py401
1 files changed, 298 insertions, 103 deletions
diff --git a/yocto-poky/meta/lib/oeqa/oetest.py b/yocto-poky/meta/lib/oeqa/oetest.py
index 6f9edec58..3ed5bb8c2 100644
--- a/yocto-poky/meta/lib/oeqa/oetest.py
+++ b/yocto-poky/meta/lib/oeqa/oetest.py
@@ -7,16 +7,25 @@
# It also has some helper functions and it's responsible for actually starting the tests
-import os, re, mmap
+import os, re, mmap, sys
import unittest
import inspect
import subprocess
+import signal
try:
import bb
except ImportError:
pass
import logging
+
+import oeqa.runtime
+# Exported test doesn't require sdkext
+try:
+ import oeqa.sdkext
+except ImportError:
+ pass
from oeqa.utils.decorators import LogResults, gettag, getResults
+from oeqa.utils import avoid_paths_in_environ
logger = logging.getLogger("BitBake")
@@ -30,7 +39,6 @@ def getVar(obj):
def checkTags(tc, tagexp):
return eval(tagexp, None, getVar(tc))
-
def filterByTagExp(testsuite, tagexp):
if not tagexp:
return testsuite
@@ -43,106 +51,6 @@ def filterByTagExp(testsuite, tagexp):
caseList.append(filterByTagExp(each, tagexp))
return testsuite.__class__(caseList)
-def loadTests(tc, type="runtime"):
- if type == "runtime":
- # set the context object passed from the test class
- setattr(oeTest, "tc", tc)
- # set ps command to use
- setattr(oeRuntimeTest, "pscmd", "ps -ef" if oeTest.hasPackage("procps") else "ps")
- # prepare test suite, loader and runner
- suite = unittest.TestSuite()
- elif type == "sdk":
- # set the context object passed from the test class
- setattr(oeTest, "tc", tc)
- testloader = unittest.TestLoader()
- testloader.sortTestMethodsUsing = None
- suites = [testloader.loadTestsFromName(name) for name in tc.testslist]
- suites = filterByTagExp(suites, getattr(tc, "tagexp", None))
-
- def getTests(test):
- '''Return all individual tests executed when running the suite.'''
- # Unfortunately unittest does not have an API for this, so we have
- # to rely on implementation details. This only needs to work
- # for TestSuite containing TestCase.
- method = getattr(test, '_testMethodName', None)
- if method:
- # leaf case: a TestCase
- yield test
- else:
- # Look into TestSuite.
- tests = getattr(test, '_tests', [])
- for t1 in tests:
- for t2 in getTests(t1):
- yield t2
-
- # Determine dependencies between suites by looking for @skipUnlessPassed
- # method annotations. Suite A depends on suite B if any method in A
- # depends on a method on B.
- for suite in suites:
- suite.dependencies = []
- suite.depth = 0
- for test in getTests(suite):
- methodname = getattr(test, '_testMethodName', None)
- if methodname:
- method = getattr(test, methodname)
- depends_on = getattr(method, '_depends_on', None)
- if depends_on:
- for dep_suite in suites:
- if depends_on in [getattr(t, '_testMethodName', None) for t in getTests(dep_suite)]:
- if dep_suite not in suite.dependencies and \
- dep_suite is not suite:
- suite.dependencies.append(dep_suite)
- break
- else:
- logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
- (test, depends_on))
- # Use brute-force topological sort to determine ordering. Sort by
- # depth (higher depth = must run later), with original ordering to
- # break ties.
- def set_suite_depth(suite):
- for dep in suite.dependencies:
- new_depth = set_suite_depth(dep) + 1
- if new_depth > suite.depth:
- suite.depth = new_depth
- return suite.depth
- for index, suite in enumerate(suites):
- set_suite_depth(suite)
- suite.index = index
- suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index)))
- return testloader.suiteClass(suites)
-
-_buffer = ""
-
-def custom_verbose(msg, *args, **kwargs):
- global _buffer
- if msg[-1] != "\n":
- _buffer += msg
- else:
- _buffer += msg
- try:
- bb.plain(_buffer.rstrip("\n"), *args, **kwargs)
- except NameError:
- logger.info(_buffer.rstrip("\n"), *args, **kwargs)
- _buffer = ""
-
-def runTests(tc, type="runtime"):
-
- suite = loadTests(tc, type)
- logger.info("Test modules %s" % tc.testslist)
- if hasattr(tc, "tagexp") and tc.tagexp:
- logger.info("Filter test cases by tags: %s" % tc.tagexp)
- logger.info("Found %s tests" % suite.countTestCases())
- runner = unittest.TextTestRunner(verbosity=2)
- try:
- if bb.msg.loggerDefaultVerbose:
- runner.stream.write = custom_verbose
- except NameError:
- # Not in bb environment?
- pass
- result = runner.run(suite)
-
- return result
-
@LogResults
class oeTest(unittest.TestCase):
@@ -222,7 +130,19 @@ class oeSDKTest(oeTest):
return False
def _run(self, cmd):
- return subprocess.check_output(". %s; " % self.tc.sdkenv + cmd, shell=True)
+ return subprocess.check_output(". %s > /dev/null; %s;" % (self.tc.sdkenv, cmd), shell=True)
+
+class oeSDKExtTest(oeSDKTest):
+ def _run(self, cmd):
+ # extensible sdk shows a warning if found bitbake in the path
+ # because can cause contamination, i.e. use devtool from
+ # poky/scripts instead of eSDK one.
+ env = os.environ.copy()
+ paths_to_avoid = ['bitbake/bin', 'poky/scripts']
+ env['PATH'] = avoid_paths_in_environ(paths_to_avoid)
+
+ return subprocess.check_output(". %s > /dev/null;"\
+ " %s;" % (self.tc.sdkenv, cmd), shell=True, env=env)
def getmodule(pos=2):
# stack returns a list of tuples containg frame information
@@ -250,3 +170,278 @@ def skipModuleUnless(cond, reason):
if not cond:
skipModule(reason, 3)
+
+_buffer_logger = ""
+def custom_verbose(msg, *args, **kwargs):
+ global _buffer_logger
+ if msg[-1] != "\n":
+ _buffer_logger += msg
+ else:
+ _buffer_logger += msg
+ try:
+ bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs)
+ except NameError:
+ logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs)
+ _buffer_logger = ""
+
+class TestContext(object):
+ def __init__(self, d):
+ self.d = d
+
+ self.testsuites = self._get_test_suites()
+ self.testslist = self._get_tests_list(d.getVar("BBPATH", True).split(':'))
+ self.testsrequired = self._get_test_suites_required()
+
+ self.filesdir = os.path.join(os.path.dirname(os.path.abspath(
+ oeqa.runtime.__file__)), "files")
+ self.imagefeatures = d.getVar("IMAGE_FEATURES", True).split()
+ self.distrofeatures = d.getVar("DISTRO_FEATURES", True).split()
+
+ # get testcase list from specified file
+ # if path is a relative path, then relative to build/conf/
+ def _read_testlist(self, fpath, builddir):
+ if not os.path.isabs(fpath):
+ fpath = os.path.join(builddir, "conf", fpath)
+ if not os.path.exists(fpath):
+ bb.fatal("No such manifest file: ", fpath)
+ tcs = []
+ for line in open(fpath).readlines():
+ line = line.strip()
+ if line and not line.startswith("#"):
+ tcs.append(line)
+ return " ".join(tcs)
+
+ # return test list by type also filter if TEST_SUITES is specified
+ def _get_tests_list(self, bbpath):
+ testslist = []
+
+ type = self._get_test_namespace()
+
+ # This relies on lib/ under each directory in BBPATH being added to sys.path
+ # (as done by default in base.bbclass)
+ for testname in self.testsuites:
+ if testname != "auto":
+ if testname.startswith("oeqa."):
+ testslist.append(testname)
+ continue
+ found = False
+ for p in bbpath:
+ if os.path.exists(os.path.join(p, 'lib', 'oeqa', type, testname + '.py')):
+ testslist.append("oeqa." + type + "." + testname)
+ found = True
+ break
+ elif os.path.exists(os.path.join(p, 'lib', 'oeqa', type, testname.split(".")[0] + '.py')):
+ testslist.append("oeqa." + type + "." + testname)
+ found = True
+ break
+ if not found:
+ bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname)
+
+ if "auto" in self.testsuites:
+ def add_auto_list(path):
+ if not os.path.exists(os.path.join(path, '__init__.py')):
+ bb.fatal('Tests directory %s exists but is missing __init__.py' % path)
+ files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')])
+ for f in files:
+ module = 'oeqa.' + type + '.' + f[:-3]
+ if module not in testslist:
+ testslist.append(module)
+
+ for p in bbpath:
+ testpath = os.path.join(p, 'lib', 'oeqa', type)
+ bb.debug(2, 'Searching for tests in %s' % testpath)
+ if os.path.exists(testpath):
+ add_auto_list(testpath)
+
+ return testslist
+
+ def loadTests(self):
+ setattr(oeTest, "tc", self)
+
+ testloader = unittest.TestLoader()
+ testloader.sortTestMethodsUsing = None
+ suites = [testloader.loadTestsFromName(name) for name in self.testslist]
+ suites = filterByTagExp(suites, getattr(self, "tagexp", None))
+
+ def getTests(test):
+ '''Return all individual tests executed when running the suite.'''
+ # Unfortunately unittest does not have an API for this, so we have
+ # to rely on implementation details. This only needs to work
+ # for TestSuite containing TestCase.
+ method = getattr(test, '_testMethodName', None)
+ if method:
+ # leaf case: a TestCase
+ yield test
+ else:
+ # Look into TestSuite.
+ tests = getattr(test, '_tests', [])
+ for t1 in tests:
+ for t2 in getTests(t1):
+ yield t2
+
+ # Determine dependencies between suites by looking for @skipUnlessPassed
+ # method annotations. Suite A depends on suite B if any method in A
+ # depends on a method on B.
+ for suite in suites:
+ suite.dependencies = []
+ suite.depth = 0
+ for test in getTests(suite):
+ methodname = getattr(test, '_testMethodName', None)
+ if methodname:
+ method = getattr(test, methodname)
+ depends_on = getattr(method, '_depends_on', None)
+ if depends_on:
+ for dep_suite in suites:
+ if depends_on in [getattr(t, '_testMethodName', None) for t in getTests(dep_suite)]:
+ if dep_suite not in suite.dependencies and \
+ dep_suite is not suite:
+ suite.dependencies.append(dep_suite)
+ break
+ else:
+ logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
+ (test, depends_on))
+
+ # Use brute-force topological sort to determine ordering. Sort by
+ # depth (higher depth = must run later), with original ordering to
+ # break ties.
+ def set_suite_depth(suite):
+ for dep in suite.dependencies:
+ new_depth = set_suite_depth(dep) + 1
+ if new_depth > suite.depth:
+ suite.depth = new_depth
+ return suite.depth
+
+ for index, suite in enumerate(suites):
+ set_suite_depth(suite)
+ suite.index = index
+ suites.sort(cmp=lambda a,b: cmp((a.depth, a.index), (b.depth, b.index)))
+
+ self.suite = testloader.suiteClass(suites)
+
+ return self.suite
+
+ def runTests(self):
+ logger.info("Test modules %s" % self.testslist)
+ if hasattr(self, "tagexp") and self.tagexp:
+ logger.info("Filter test cases by tags: %s" % self.tagexp)
+ logger.info("Found %s tests" % self.suite.countTestCases())
+ runner = unittest.TextTestRunner(verbosity=2)
+ if 'bb' in sys.modules:
+ runner.stream.write = custom_verbose
+
+ return runner.run(self.suite)
+
+class ImageTestContext(TestContext):
+ def __init__(self, d, target, host_dumper):
+ super(ImageTestContext, self).__init__(d)
+
+ self.tagexp = d.getVar("TEST_SUITES_TAGS", True)
+
+ self.target = target
+ self.host_dumper = host_dumper
+
+ manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE", True),
+ d.getVar("IMAGE_LINK_NAME", True) + ".manifest")
+ nomanifest = d.getVar("IMAGE_NO_MANIFEST", True)
+ if nomanifest is None or nomanifest != "1":
+ try:
+ with open(manifest) as f:
+ self.pkgmanifest = f.read()
+ except IOError as e:
+ bb.fatal("No package manifest file found. Did you build the image?\n%s" % e)
+ else:
+ self.pkgmanifest = ""
+
+ self.sigterm = False
+ self.origsigtermhandler = signal.getsignal(signal.SIGTERM)
+ signal.signal(signal.SIGTERM, self._sigterm_exception)
+
+ def _sigterm_exception(self, signum, stackframe):
+ bb.warn("TestImage received SIGTERM, shutting down...")
+ self.sigterm = True
+ self.target.stop()
+
+ def _get_test_namespace(self):
+ return "runtime"
+
+ def _get_test_suites(self):
+ testsuites = []
+
+ manifests = (self.d.getVar("TEST_SUITES_MANIFEST", True) or '').split()
+ if manifests:
+ for manifest in manifests:
+ testsuites.extend(self._read_testlist(manifest,
+ self.d.getVar("TOPDIR", True)).split())
+
+ else:
+ testsuites = self.d.getVar("TEST_SUITES", True).split()
+
+ return testsuites
+
+ def _get_test_suites_required(self):
+ return [t for t in self.d.getVar("TEST_SUITES", True).split() if t != "auto"]
+
+ def loadTests(self):
+ super(ImageTestContext, self).loadTests()
+ setattr(oeRuntimeTest, "pscmd", "ps -ef" if oeTest.hasPackage("procps") else "ps")
+
+class SDKTestContext(TestContext):
+ def __init__(self, d, sdktestdir, sdkenv, tcname, *args):
+ super(SDKTestContext, self).__init__(d)
+
+ self.sdktestdir = sdktestdir
+ self.sdkenv = sdkenv
+ self.tcname = tcname
+
+ if not hasattr(self, 'target_manifest'):
+ self.target_manifest = d.getVar("SDK_TARGET_MANIFEST", True)
+ try:
+ with open(self.target_manifest) as f:
+ self.pkgmanifest = f.read()
+ except IOError as e:
+ bb.fatal("No package manifest file found. Did you build the sdk image?\n%s" % e)
+
+ if not hasattr(self, 'host_manifest'):
+ self.host_manifest = d.getVar("SDK_HOST_MANIFEST", True)
+ try:
+ with open(self.host_manifest) as f:
+ self.hostpkgmanifest = f.read()
+ except IOError as e:
+ bb.fatal("No host package manifest file found. Did you build the sdk image?\n%s" % e)
+
+ def _get_test_namespace(self):
+ return "sdk"
+
+ def _get_test_suites(self):
+ return (self.d.getVar("TEST_SUITES_SDK", True) or "auto").split()
+
+ def _get_test_suites_required(self):
+ return [t for t in (self.d.getVar("TEST_SUITES_SDK", True) or \
+ "auto").split() if t != "auto"]
+
+class SDKExtTestContext(SDKTestContext):
+ def __init__(self, d, sdktestdir, sdkenv, tcname, *args):
+ self.target_manifest = d.getVar("SDK_EXT_TARGET_MANIFEST", True)
+ self.host_manifest = d.getVar("SDK_EXT_HOST_MANIFEST", True)
+ if args:
+ self.cm = args[0] # Compatibility mode for run SDK tests
+ else:
+ self.cm = False
+
+ super(SDKExtTestContext, self).__init__(d, sdktestdir, sdkenv, tcname)
+
+ self.sdkextfilesdir = os.path.join(os.path.dirname(os.path.abspath(
+ oeqa.sdkext.__file__)), "files")
+
+ def _get_test_namespace(self):
+ if self.cm:
+ return "sdk"
+ else:
+ return "sdkext"
+
+ def _get_test_suites(self):
+ return (self.d.getVar("TEST_SUITES_SDK_EXT", True) or "auto").split()
+
+ def _get_test_suites_required(self):
+ return [t for t in (self.d.getVar("TEST_SUITES_SDK_EXT", True) or \
+ "auto").split() if t != "auto"]