diff options
author | Brad Bishop <bradleyb@fuzziesquirrel.com> | 2018-02-01 18:27:11 +0300 |
---|---|---|
committer | Brad Bishop <bradleyb@fuzziesquirrel.com> | 2018-03-13 05:51:39 +0300 |
commit | 6e60e8b2b2bab889379b380a28a167a0edd9d1d3 (patch) | |
tree | f12f54d5ba8e74e67e5fad3651a1e125bb8f4191 /import-layers/yocto-poky/meta/lib/oeqa/core | |
parent | 509842add85b53e13164c1569a1fd43d5b8d91c5 (diff) | |
download | openbmc-6e60e8b2b2bab889379b380a28a167a0edd9d1d3.tar.xz |
Yocto 2.3
Move OpenBMC to Yocto 2.3(pyro).
Tested: Built and verified Witherspoon and Palmetto images
Change-Id: I50744030e771f4850afc2a93a10d3507e76d36bc
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
Resolves: openbmc/openbmc#2461
Diffstat (limited to 'import-layers/yocto-poky/meta/lib/oeqa/core')
36 files changed, 2019 insertions, 0 deletions
diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/README b/import-layers/yocto-poky/meta/lib/oeqa/core/README new file mode 100644 index 000000000..0c859fd78 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/README @@ -0,0 +1,38 @@ += OEQA Framework = + +== Introduction == + +This is the new OEQA framework the base clases of the framework +are in this module oeqa/core the subsequent components needs to +extend this classes. + +A new/unique runner was created called oe-test and is under scripts/ +oe-test, this new runner scans over oeqa module searching for test +components that supports OETestContextExecutor implemented in context +module (i.e. oeqa/core/context.py). + +For execute an example: + +$ source oe-init-build-env +$ oe-test core + +For list supported components: + +$ oe-test -h + +== Create new Test component == + +Usally for add a new Test component the developer needs to extend +OETestContext/OETestContextExecutor in context.py and OETestCase in +case.py. + +== How to run the testing of the OEQA framework == + +Run all tests: + +$ PATH=$PATH:../../ python3 -m unittest discover -s tests + +Run some test: + +$ cd tests/ +$ ./test_data.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/__init__.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/case.py b/import-layers/yocto-poky/meta/lib/oeqa/core/case.py new file mode 100644 index 000000000..d2dbf20f9 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/case.py @@ -0,0 +1,46 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import unittest + +from oeqa.core.exception import OEQAMissingVariable + +def _validate_td_vars(td, td_vars, type_msg): + if td_vars: + for v in td_vars: + if not v in td: + raise OEQAMissingVariable("Test %s need %s variable but"\ + " isn't into td" % (type_msg, v)) + +class OETestCase(unittest.TestCase): + # TestContext and Logger instance set by OETestLoader. + tc = None + logger = None + + # td has all the variables needed by the test cases + # is the same across all the test cases. + td = None + + # td_vars has the variables needed by a test class + # or test case instance, if some var isn't into td a + # OEMissingVariable exception is raised + td_vars = None + + @classmethod + def _oeSetUpClass(clss): + _validate_td_vars(clss.td, clss.td_vars, "class") + clss.setUpClassMethod() + + @classmethod + def _oeTearDownClass(clss): + clss.tearDownClassMethod() + + def _oeSetUp(self): + for d in self.decorators: + d.setUpDecorator() + self.setUpMethod() + + def _oeTearDown(self): + for d in self.decorators: + d.tearDownDecorator() + self.tearDownMethod() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/__init__.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json new file mode 100644 index 000000000..21d6b16d1 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/data.json @@ -0,0 +1 @@ +{"ARCH": "x86", "IMAGE": "core-image-minimal"}
\ No newline at end of file diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py new file mode 100644 index 000000000..11cf3800c --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/cases/example/test_basic.py @@ -0,0 +1,20 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.depends import OETestDepends + +class OETestExample(OETestCase): + def test_example(self): + self.logger.info('IMAGE: %s' % self.td.get('IMAGE')) + self.assertEqual('core-image-minimal', self.td.get('IMAGE')) + self.logger.info('ARCH: %s' % self.td.get('ARCH')) + self.assertEqual('x86', self.td.get('ARCH')) + +class OETestExampleDepend(OETestCase): + @OETestDepends(['OETestExample.test_example']) + def test_example_depends(self): + pass + + def test_example_no_depends(self): + pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/context.py b/import-layers/yocto-poky/meta/lib/oeqa/core/context.py new file mode 100644 index 000000000..4476750a3 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/context.py @@ -0,0 +1,243 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import sys +import json +import time +import logging +import collections +import re + +from oeqa.core.loader import OETestLoader +from oeqa.core.runner import OETestRunner, OEStreamLogger, xmlEnabled + +class OETestContext(object): + loaderClass = OETestLoader + runnerClass = OETestRunner + streamLoggerClass = OEStreamLogger + + files_dir = os.path.abspath(os.path.join(os.path.dirname( + os.path.abspath(__file__)), "../files")) + + def __init__(self, td=None, logger=None): + if not type(td) is dict: + raise TypeError("td isn't dictionary type") + + self.td = td + self.logger = logger + self._registry = {} + self._registry['cases'] = collections.OrderedDict() + self._results = {} + + def _read_modules_from_manifest(self, manifest): + if not os.path.exists(manifest): + raise + + modules = [] + for line in open(manifest).readlines(): + line = line.strip() + if line and not line.startswith("#"): + modules.append(line) + + return modules + + def loadTests(self, module_paths, modules=[], tests=[], + modules_manifest="", modules_required=[], filters={}): + if modules_manifest: + modules = self._read_modules_from_manifest(modules_manifest) + + self.loader = self.loaderClass(self, module_paths, modules, tests, + modules_required, filters) + self.suites = self.loader.discover() + + def runTests(self): + streamLogger = self.streamLoggerClass(self.logger) + self.runner = self.runnerClass(self, stream=streamLogger, verbosity=2) + + self._run_start_time = time.time() + result = self.runner.run(self.suites) + self._run_end_time = time.time() + + return result + + def logSummary(self, result, component, context_msg=''): + self.logger.info("SUMMARY:") + self.logger.info("%s (%s) - Ran %d test%s in %.3fs" % (component, + context_msg, result.testsRun, result.testsRun != 1 and "s" or "", + (self._run_end_time - self._run_start_time))) + + if result.wasSuccessful(): + msg = "%s - OK - All required tests passed" % component + else: + msg = "%s - FAIL - Required tests failed" % component + skipped = len(self._results['skipped']) + if skipped: + msg += " (skipped=%d)" % skipped + self.logger.info(msg) + + def _getDetailsNotPassed(self, case, type, desc): + found = False + + for (scase, msg) in self._results[type]: + # XXX: When XML reporting is enabled scase is + # xmlrunner.result._TestInfo instance instead of + # string. + if xmlEnabled: + if case.id() == scase.test_id: + found = True + break + scase_str = scase.test_id + else: + if case == scase: + found = True + break + scase_str = str(scase) + + # When fails at module or class level the class name is passed as string + # so figure out to see if match + m = re.search("^setUpModule \((?P<module_name>.*)\)$", scase_str) + if m: + if case.__class__.__module__ == m.group('module_name'): + found = True + break + + m = re.search("^setUpClass \((?P<class_name>.*)\)$", scase_str) + if m: + class_name = "%s.%s" % (case.__class__.__module__, + case.__class__.__name__) + + if class_name == m.group('class_name'): + found = True + break + + if found: + return (found, msg) + + return (found, None) + + def logDetails(self): + self.logger.info("RESULTS:") + for case_name in self._registry['cases']: + case = self._registry['cases'][case_name] + + result_types = ['failures', 'errors', 'skipped', 'expectedFailures'] + result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL'] + + fail = False + desc = None + for idx, name in enumerate(result_types): + (fail, msg) = self._getDetailsNotPassed(case, result_types[idx], + result_desc[idx]) + if fail: + desc = result_desc[idx] + break + + oeid = -1 + for d in case.decorators: + if hasattr(d, 'oeid'): + oeid = d.oeid + + if fail: + self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), + oeid, desc)) + if msg: + self.logger.info(msg) + else: + self.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), + oeid, 'PASSED')) + +class OETestContextExecutor(object): + _context_class = OETestContext + + name = 'core' + help = 'core test component example' + description = 'executes core test suite example' + + default_cases = [os.path.join(os.path.abspath(os.path.dirname(__file__)), + 'cases/example')] + default_test_data = os.path.join(default_cases[0], 'data.json') + default_tests = None + + def register_commands(self, logger, subparsers): + self.parser = subparsers.add_parser(self.name, help=self.help, + description=self.description, group='components') + + self.default_output_log = '%s-results-%s.log' % (self.name, + time.strftime("%Y%m%d%H%M%S")) + self.parser.add_argument('--output-log', action='store', + default=self.default_output_log, + help="results output log, default: %s" % self.default_output_log) + self.parser.add_argument('--run-tests', action='store', + default=self.default_tests, + help="tests to run in <module>[.<class>[.<name>]] format. Just works for modules now") + + if self.default_test_data: + self.parser.add_argument('--test-data-file', action='store', + default=self.default_test_data, + help="data file to load, default: %s" % self.default_test_data) + else: + self.parser.add_argument('--test-data-file', action='store', + help="data file to load") + + if self.default_cases: + self.parser.add_argument('CASES_PATHS', action='store', + default=self.default_cases, nargs='*', + help="paths to directories with test cases, default: %s"\ + % self.default_cases) + else: + self.parser.add_argument('CASES_PATHS', action='store', + nargs='+', help="paths to directories with test cases") + + self.parser.set_defaults(func=self.run) + + def _setup_logger(self, logger, args): + formatter = logging.Formatter('%(asctime)s - ' + self.name + \ + ' - %(levelname)s - %(message)s') + sh = logger.handlers[0] + sh.setFormatter(formatter) + fh = logging.FileHandler(args.output_log) + fh.setFormatter(formatter) + logger.addHandler(fh) + + return logger + + def _process_args(self, logger, args): + self.tc_kwargs = {} + self.tc_kwargs['init'] = {} + self.tc_kwargs['load'] = {} + self.tc_kwargs['run'] = {} + + self.tc_kwargs['init']['logger'] = self._setup_logger(logger, args) + if args.test_data_file: + self.tc_kwargs['init']['td'] = json.load( + open(args.test_data_file, "r")) + else: + self.tc_kwargs['init']['td'] = {} + + + if args.run_tests: + self.tc_kwargs['load']['modules'] = args.run_tests.split() + else: + self.tc_kwargs['load']['modules'] = None + + self.module_paths = args.CASES_PATHS + + def run(self, logger, args): + self._process_args(logger, args) + + self.tc = self._context_class(**self.tc_kwargs['init']) + self.tc.loadTests(self.module_paths, **self.tc_kwargs['load']) + rc = self.tc.runTests(**self.tc_kwargs['run']) + self.tc.logSummary(rc, self.name) + self.tc.logDetails() + + output_link = os.path.join(os.path.dirname(args.output_log), + "%s-results.log" % self.name) + if os.path.exists(output_link): + os.remove(output_link) + os.symlink(args.output_log, output_link) + + return rc + +_executor_class = OETestContextExecutor diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py new file mode 100644 index 000000000..855b6b9d2 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/__init__.py @@ -0,0 +1,71 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from functools import wraps +from abc import abstractmethod + +decoratorClasses = set() + +def registerDecorator(obj): + decoratorClasses.add(obj) + return obj + +class OETestDecorator(object): + case = None # Reference of OETestCase decorated + attrs = None # Attributes to be loaded by decorator implementation + + def __init__(self, *args, **kwargs): + if not self.attrs: + return + + for idx, attr in enumerate(self.attrs): + if attr in kwargs: + value = kwargs[attr] + else: + value = args[idx] + setattr(self, attr, value) + + def __call__(self, func): + @wraps(func) + def wrapped_f(*args, **kwargs): + self.attrs = self.attrs # XXX: Enables OETestLoader discover + return func(*args, **kwargs) + return wrapped_f + + # OETestLoader call it when is loading test cases. + # XXX: Most methods would change the registry for later + # processing; be aware that filtrate method needs to + # run later than bind, so there could be data (in the + # registry) of a cases that were filtered. + def bind(self, registry, case): + self.case = case + self.logger = case.tc.logger + self.case.decorators.append(self) + + # OETestRunner call this method when tries to run + # the test case. + def setUpDecorator(self): + pass + + # OETestRunner call it after a test method has been + # called even if the method raised an exception. + def tearDownDecorator(self): + pass + +class OETestDiscover(OETestDecorator): + + # OETestLoader call it after discover test cases + # needs to return the cases to be run. + @staticmethod + def discover(registry): + return registry['cases'] + +class OETestFilter(OETestDecorator): + + # OETestLoader call it while loading the tests + # in loadTestsFromTestCase method, it needs to + # return a bool, True if needs to be filtered. + # This method must consume the filter used. + @abstractmethod + def filtrate(self, filters): + return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py new file mode 100644 index 000000000..ff7bdd98b --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/data.py @@ -0,0 +1,98 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.exception import OEQAMissingVariable + +from . import OETestDecorator, registerDecorator + +def has_feature(td, feature): + """ + Checks for feature in DISTRO_FEATURES or IMAGE_FEATURES. + """ + + if (feature in td.get('DISTRO_FEATURES', '') or + feature in td.get('IMAGE_FEATURES', '')): + return True + return False + +@registerDecorator +class skipIfDataVar(OETestDecorator): + """ + Skip test based on value of a data store's variable. + + It will get the info of var from the data store and will + check it against value; if are equal it will skip the test + with msg as the reason. + """ + + attrs = ('var', 'value', 'msg') + + def setUpDecorator(self): + msg = ('Checking if %r value is %r to skip test' % + (self.var, self.value)) + self.logger.debug(msg) + if self.case.td.get(self.var) == self.value: + self.case.skipTest(self.msg) + +@registerDecorator +class skipIfNotDataVar(OETestDecorator): + """ + Skip test based on value of a data store's variable. + + It will get the info of var from the data store and will + check it against value; if are not equal it will skip the + test with msg as the reason. + """ + + attrs = ('var', 'value', 'msg') + + def setUpDecorator(self): + msg = ('Checking if %r value is not %r to skip test' % + (self.var, self.value)) + self.logger.debug(msg) + if not self.case.td.get(self.var) == self.value: + self.case.skipTest(self.msg) + +@registerDecorator +class skipIfNotInDataVar(OETestDecorator): + """ + Skip test if value is not in data store's variable. + """ + + attrs = ('var', 'value', 'msg') + def setUpDecorator(self): + msg = ('Checking if %r value is in %r to run ' + 'the test' % (self.var, self.value)) + self.logger.debug(msg) + if not self.value in self.case.td.get(self.var): + self.case.skipTest(self.msg) + +@registerDecorator +class OETestDataDepends(OETestDecorator): + attrs = ('td_depends',) + + def setUpDecorator(self): + for v in self.td_depends: + try: + value = self.case.td[v] + except KeyError: + raise OEQAMissingVariable("Test case need %s variable but"\ + " isn't into td" % v) + +@registerDecorator +class skipIfNotFeature(OETestDecorator): + """ + Skip test based on DISTRO_FEATURES. + + value must be in distro features or it will skip the test + with msg as the reason. + """ + + attrs = ('value', 'msg') + + def setUpDecorator(self): + msg = ('Checking if %s is in DISTRO_FEATURES ' + 'or IMAGE_FEATURES' % (self.value)) + self.logger.debug(msg) + if not has_feature(self.case.td, self.value): + self.case.skipTest(self.msg) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py new file mode 100644 index 000000000..195711cf1 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/depends.py @@ -0,0 +1,94 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from unittest import SkipTest + +from oeqa.core.exception import OEQADependency + +from . import OETestDiscover, registerDecorator + +def _add_depends(registry, case, depends): + module_name = case.__module__ + class_name = case.__class__.__name__ + + case_id = case.id() + + for depend in depends: + dparts = depend.split('.') + + if len(dparts) == 1: + depend_id = ".".join((module_name, class_name, dparts[0])) + elif len(dparts) == 2: + depend_id = ".".join((module_name, dparts[0], dparts[1])) + else: + depend_id = depend + + if not case_id in registry: + registry[case_id] = [] + if not depend_id in registry[case_id]: + registry[case_id].append(depend_id) + +def _validate_test_case_depends(cases, depends): + for case in depends: + if not case in cases: + continue + for dep in depends[case]: + if not dep in cases: + raise OEQADependency("TestCase %s depends on %s and isn't available"\ + ", cases available %s." % (case, dep, str(cases.keys()))) + +def _order_test_case_by_depends(cases, depends): + def _dep_resolve(graph, node, resolved, seen): + seen.append(node) + for edge in graph[node]: + if edge not in resolved: + if edge in seen: + raise OEQADependency("Test cases %s and %s have a circular" \ + " dependency." % (node, edge)) + _dep_resolve(graph, edge, resolved, seen) + resolved.append(node) + + dep_graph = {} + dep_graph['__root__'] = cases.keys() + for case in cases: + if case in depends: + dep_graph[case] = depends[case] + else: + dep_graph[case] = [] + + cases_ordered = [] + _dep_resolve(dep_graph, '__root__', cases_ordered, []) + cases_ordered.remove('__root__') + + return [cases[case_id] for case_id in cases_ordered] + +def _skipTestDependency(case, depends): + results = case.tc._results + skipReasons = ['errors', 'failures', 'skipped'] + + for reason in skipReasons: + for test, _ in results[reason]: + if test.id() in depends: + raise SkipTest("Test case %s depends on %s and was in %s." \ + % (case.id(), test.id(), reason)) + +@registerDecorator +class OETestDepends(OETestDiscover): + attrs = ('depends',) + + def bind(self, registry, case): + super(OETestDepends, self).bind(registry, case) + if not registry.get('depends'): + registry['depends'] = {} + _add_depends(registry['depends'], case, self.depends) + + @staticmethod + def discover(registry): + if registry.get('depends'): + _validate_test_case_depends(registry['cases'], registry['depends']) + return _order_test_case_by_depends(registry['cases'], registry['depends']) + else: + return [registry['cases'][case_id] for case_id in registry['cases']] + + def setUpDecorator(self): + _skipTestDependency(self.case, self.depends) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py new file mode 100644 index 000000000..ea8017a55 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oeid.py @@ -0,0 +1,23 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from . import OETestFilter, registerDecorator +from oeqa.core.utils.misc import intToList + +def _idFilter(oeid, filters): + return False if oeid in filters else True + +@registerDecorator +class OETestID(OETestFilter): + attrs = ('oeid',) + + def bind(self, registry, case): + super(OETestID, self).bind(registry, case) + + def filtrate(self, filters): + if filters.get('oeid'): + filterx = intToList(filters['oeid'], 'oeid') + del filters['oeid'] + if _idFilter(self.oeid, filterx): + return True + return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py new file mode 100644 index 000000000..ad38ab78a --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetag.py @@ -0,0 +1,24 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from . import OETestFilter, registerDecorator +from oeqa.core.utils.misc import strToList + +def _tagFilter(tags, filters): + return False if set(tags) & set(filters) else True + +@registerDecorator +class OETestTag(OETestFilter): + attrs = ('oetag',) + + def bind(self, registry, case): + super(OETestTag, self).bind(registry, case) + self.oetag = strToList(self.oetag, 'oetag') + + def filtrate(self, filters): + if filters.get('oetag'): + filterx = strToList(filters['oetag'], 'oetag') + del filters['oetag'] + if _tagFilter(self.oetag, filterx): + return True + return False diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py new file mode 100644 index 000000000..a247583f7 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/decorator/oetimeout.py @@ -0,0 +1,25 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import signal +from . import OETestDecorator, registerDecorator +from oeqa.core.exception import OEQATimeoutError + +@registerDecorator +class OETimeout(OETestDecorator): + attrs = ('oetimeout',) + + def setUpDecorator(self): + timeout = self.oetimeout + def _timeoutHandler(signum, frame): + raise OEQATimeoutError("Timed out after %s " + "seconds of execution" % timeout) + + self.logger.debug("Setting up a %d second(s) timeout" % self.oetimeout) + self.alarmSignal = signal.signal(signal.SIGALRM, _timeoutHandler) + signal.alarm(self.oetimeout) + + def tearDownDecorator(self): + signal.alarm(0) + signal.signal(signal.SIGALRM, self.alarmSignal) + self.logger.debug("Removed SIGALRM handler") diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py b/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py new file mode 100644 index 000000000..2dfd8402c --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/exception.py @@ -0,0 +1,14 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +class OEQAException(Exception): + pass + +class OEQATimeoutError(OEQAException): + pass + +class OEQAMissingVariable(OEQAException): + pass + +class OEQADependency(OEQAException): + pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py b/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py new file mode 100644 index 000000000..63a170353 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/loader.py @@ -0,0 +1,272 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import sys +import unittest + +from oeqa.core.utils.path import findFile +from oeqa.core.utils.test import getSuiteModules, getCaseID + +from oeqa.core.case import OETestCase +from oeqa.core.decorator import decoratorClasses, OETestDecorator, \ + OETestFilter, OETestDiscover + +def _make_failed_test(classname, methodname, exception, suiteClass): + """ + When loading tests unittest framework stores the exception in a new + class created for be displayed into run(). + + For our purposes will be better to raise the exception in loading + step instead of wait to run the test suite. + """ + raise exception +unittest.loader._make_failed_test = _make_failed_test + +def _find_duplicated_modules(suite, directory): + for module in getSuiteModules(suite): + path = findFile('%s.py' % module, directory) + if path: + raise ImportError("Duplicated %s module found in %s" % (module, path)) + +class OETestLoader(unittest.TestLoader): + caseClass = OETestCase + + kwargs_names = ['testMethodPrefix', 'sortTestMethodUsing', 'suiteClass', + '_top_level_dir'] + + def __init__(self, tc, module_paths, modules, tests, modules_required, + filters, *args, **kwargs): + self.tc = tc + + self.modules = modules + self.tests = tests + self.modules_required = modules_required + + self.filters = filters + self.decorator_filters = [d for d in decoratorClasses if \ + issubclass(d, OETestFilter)] + self._validateFilters(self.filters, self.decorator_filters) + self.used_filters = [d for d in self.decorator_filters + for f in self.filters + if f in d.attrs] + + if isinstance(module_paths, str): + module_paths = [module_paths] + elif not isinstance(module_paths, list): + raise TypeError('module_paths must be a str or a list of str') + self.module_paths = module_paths + + for kwname in self.kwargs_names: + if kwname in kwargs: + setattr(self, kwname, kwargs[kwname]) + + self._patchCaseClass(self.caseClass) + + def _patchCaseClass(self, testCaseClass): + # Adds custom attributes to the OETestCase class + setattr(testCaseClass, 'tc', self.tc) + setattr(testCaseClass, 'td', self.tc.td) + setattr(testCaseClass, 'logger', self.tc.logger) + + def _validateFilters(self, filters, decorator_filters): + # Validate if filter isn't empty + for key,value in filters.items(): + if not value: + raise TypeError("Filter %s specified is empty" % key) + + # Validate unique attributes + attr_filters = [attr for clss in decorator_filters \ + for attr in clss.attrs] + dup_attr = [attr for attr in attr_filters + if attr_filters.count(attr) > 1] + if dup_attr: + raise TypeError('Detected duplicated attribute(s) %s in filter' + ' decorators' % ' ,'.join(dup_attr)) + + # Validate if filter is supported + for f in filters: + if f not in attr_filters: + classes = ', '.join([d.__name__ for d in decorator_filters]) + raise TypeError('Found "%s" filter but not declared in any of ' + '%s decorators' % (f, classes)) + + def _registerTestCase(self, case): + case_id = case.id() + self.tc._registry['cases'][case_id] = case + + def _handleTestCaseDecorators(self, case): + def _handle(obj): + if isinstance(obj, OETestDecorator): + if not obj.__class__ in decoratorClasses: + raise Exception("Decorator %s isn't registered" \ + " in decoratorClasses." % obj.__name__) + obj.bind(self.tc._registry, case) + + def _walk_closure(obj): + if hasattr(obj, '__closure__') and obj.__closure__: + for f in obj.__closure__: + obj = f.cell_contents + _handle(obj) + _walk_closure(obj) + method = getattr(case, case._testMethodName, None) + _walk_closure(method) + + def _filterTest(self, case): + """ + Returns True if test case must be filtered, False otherwise. + """ + if self.filters: + filters = self.filters.copy() + case_decorators = [cd for cd in case.decorators + if cd.__class__ in self.used_filters] + + # Iterate over case decorators to check if needs to be filtered. + for cd in case_decorators: + if cd.filtrate(filters): + return True + + # Case is missing one or more decorators for all the filters + # being used, so filter test case. + if filters: + return True + + return False + + def _getTestCase(self, testCaseClass, tcName): + if not hasattr(testCaseClass, '__oeqa_loader'): + # In order to support data_vars validation + # monkey patch the default setUp/tearDown{Class} to use + # the ones provided by OETestCase + setattr(testCaseClass, 'setUpClassMethod', + getattr(testCaseClass, 'setUpClass')) + setattr(testCaseClass, 'tearDownClassMethod', + getattr(testCaseClass, 'tearDownClass')) + setattr(testCaseClass, 'setUpClass', + testCaseClass._oeSetUpClass) + setattr(testCaseClass, 'tearDownClass', + testCaseClass._oeTearDownClass) + + # In order to support decorators initialization + # monkey patch the default setUp/tearDown to use + # a setUpDecorators/tearDownDecorators that methods + # will call setUp/tearDown original methods. + setattr(testCaseClass, 'setUpMethod', + getattr(testCaseClass, 'setUp')) + setattr(testCaseClass, 'tearDownMethod', + getattr(testCaseClass, 'tearDown')) + setattr(testCaseClass, 'setUp', testCaseClass._oeSetUp) + setattr(testCaseClass, 'tearDown', testCaseClass._oeTearDown) + + setattr(testCaseClass, '__oeqa_loader', True) + + case = testCaseClass(tcName) + setattr(case, 'decorators', []) + + return case + + def loadTestsFromTestCase(self, testCaseClass): + """ + Returns a suite of all tests cases contained in testCaseClass. + """ + if issubclass(testCaseClass, unittest.suite.TestSuite): + raise TypeError("Test cases should not be derived from TestSuite." \ + " Maybe you meant to derive %s from TestCase?" \ + % testCaseClass.__name__) + if not issubclass(testCaseClass, self.caseClass): + raise TypeError("Test %s is not derived from %s" % \ + (testCaseClass.__name__, self.caseClass.__name__)) + + testCaseNames = self.getTestCaseNames(testCaseClass) + if not testCaseNames and hasattr(testCaseClass, 'runTest'): + testCaseNames = ['runTest'] + + suite = [] + for tcName in testCaseNames: + case = self._getTestCase(testCaseClass, tcName) + # Filer by case id + if not (self.tests and not 'all' in self.tests + and not getCaseID(case) in self.tests): + self._handleTestCaseDecorators(case) + + # Filter by decorators + if not self._filterTest(case): + self._registerTestCase(case) + suite.append(case) + + return self.suiteClass(suite) + + def discover(self): + big_suite = self.suiteClass() + for path in self.module_paths: + _find_duplicated_modules(big_suite, path) + suite = super(OETestLoader, self).discover(path, + pattern='*.py', top_level_dir=path) + big_suite.addTests(suite) + + cases = None + discover_classes = [clss for clss in decoratorClasses + if issubclass(clss, OETestDiscover)] + for clss in discover_classes: + cases = clss.discover(self.tc._registry) + + return self.suiteClass(cases) if cases else big_suite + + # XXX After Python 3.5, remove backward compatibility hacks for + # use_load_tests deprecation via *args and **kws. See issue 16662. + if sys.version_info >= (3,5): + def loadTestsFromModule(self, module, *args, pattern=None, **kws): + """ + Returns a suite of all tests cases contained in module. + """ + if module.__name__ in sys.builtin_module_names: + msg = 'Tried to import %s test module but is a built-in' + raise ImportError(msg % module.__name__) + + # Normal test modules are loaded if no modules were specified, + # if module is in the specified module list or if 'all' is in + # module list. + # Underscore modules are loaded only if specified in module list. + load_module = True if not module.__name__.startswith('_') \ + and (not self.modules \ + or module.__name__ in self.modules \ + or 'all' in self.modules) \ + else False + + load_underscore = True if module.__name__.startswith('_') \ + and module.__name__ in self.modules \ + else False + + if load_module or load_underscore: + return super(OETestLoader, self).loadTestsFromModule( + module, *args, pattern=pattern, **kws) + else: + return self.suiteClass() + else: + def loadTestsFromModule(self, module, use_load_tests=True): + """ + Returns a suite of all tests cases contained in module. + """ + if module.__name__ in sys.builtin_module_names: + msg = 'Tried to import %s test module but is a built-in' + raise ImportError(msg % module.__name__) + + # Normal test modules are loaded if no modules were specified, + # if module is in the specified module list or if 'all' is in + # module list. + # Underscore modules are loaded only if specified in module list. + load_module = True if not module.__name__.startswith('_') \ + and (not self.modules \ + or module.__name__ in self.modules \ + or 'all' in self.modules) \ + else False + + load_underscore = True if module.__name__.startswith('_') \ + and module.__name__ in self.modules \ + else False + + if load_module or load_underscore: + return super(OETestLoader, self).loadTestsFromModule( + module, use_load_tests) + else: + return self.suiteClass() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py b/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py new file mode 100644 index 000000000..44ffecb0c --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/runner.py @@ -0,0 +1,76 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import time +import unittest +import logging + +xmlEnabled = False +try: + import xmlrunner + from xmlrunner.result import _XMLTestResult as _TestResult + from xmlrunner.runner import XMLTestRunner as _TestRunner + xmlEnabled = True +except ImportError: + # use the base runner instead + from unittest import TextTestResult as _TestResult + from unittest import TextTestRunner as _TestRunner + +class OEStreamLogger(object): + def __init__(self, logger): + self.logger = logger + self.buffer = "" + + def write(self, msg): + if len(msg) > 1 and msg[0] != '\n': + self.buffer += msg + else: + self.logger.log(logging.INFO, self.buffer.rstrip("\n")) + self.buffer = "" + + def flush(self): + for handler in self.logger.handlers: + handler.flush() + +class OETestResult(_TestResult): + def __init__(self, tc, *args, **kwargs): + super(OETestResult, self).__init__(*args, **kwargs) + + self.tc = tc + + self.tc._results['failures'] = self.failures + self.tc._results['errors'] = self.errors + self.tc._results['skipped'] = self.skipped + self.tc._results['expectedFailures'] = self.expectedFailures + + def startTest(self, test): + super(OETestResult, self).startTest(test) + +class OETestRunner(_TestRunner): + def __init__(self, tc, *args, **kwargs): + if xmlEnabled: + if not kwargs.get('output'): + kwargs['output'] = os.path.join(os.getcwd(), + 'TestResults_%s_%s' % (time.strftime("%Y%m%d%H%M%S"), os.getpid())) + + super(OETestRunner, self).__init__(*args, **kwargs) + self.tc = tc + self.resultclass = OETestResult + + # XXX: The unittest-xml-reporting package defines _make_result method instead + # of _makeResult standard on unittest. + if xmlEnabled: + def _make_result(self): + """ + Creates a TestResult object which will be used to store + information about the executed tests. + """ + # override in subclasses if necessary. + return self.resultclass(self.tc, + self.stream, self.descriptions, self.verbosity, self.elapsed_times + ) + else: + def _makeResult(self): + return self.resultclass(self.tc, self.stream, self.descriptions, + self.verbosity) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py new file mode 100644 index 000000000..d2468bc25 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/target/__init__.py @@ -0,0 +1,33 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from abc import abstractmethod + +class OETarget(object): + + def __init__(self, logger, *args, **kwargs): + self.logger = logger + + @abstractmethod + def start(self): + pass + + @abstractmethod + def stop(self): + pass + + @abstractmethod + def run(self, cmd, timeout=None): + pass + + @abstractmethod + def copyTo(self, localSrc, remoteDst): + pass + + @abstractmethod + def copyFrom(self, remoteSrc, localDst): + pass + + @abstractmethod + def copyDirTo(self, localSrc, remoteDst): + pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py new file mode 100644 index 000000000..2dc521c21 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/target/qemu.py @@ -0,0 +1,45 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import sys +import signal +import time + +from .ssh import OESSHTarget +from oeqa.utils.qemurunner import QemuRunner + +supported_fstypes = ['ext3', 'ext4', 'cpio.gz', 'wic', 'elf'] + +class OEQemuTarget(OESSHTarget): + def __init__(self, logger, ip, server_ip, timeout=300, user='root', + port=None, machine='', rootfs='', kernel='', kvm=False, + dump_dir='', dump_host_cmds='', display='', bootlog='', + tmpdir='', dir_image='', boottime=60, **kwargs): + + super(OEQemuTarget, self).__init__(logger, ip, server_ip, timeout, + user, port) + + self.ip = ip + self.server_ip = server_ip + self.machine = machine + self.rootfs = rootfs + self.kernel = kernel + self.kvm = kvm + + self.runner = QemuRunner(machine=machine, rootfs=rootfs, tmpdir=tmpdir, + deploy_dir_image=dir_image, display=display, + logfile=bootlog, boottime=boottime, + use_kvm=kvm, dump_dir=dump_dir, + dump_host_cmds=dump_host_cmds) + + def start(self, params=None, extra_bootparams=None): + if self.runner.start(params, extra_bootparams=extra_bootparams): + self.ip = self.runner.ip + self.server_ip = self.runner.server_ip + else: + self.stop() + raise RuntimeError("FAILED to start qemu - check the task log and the boot log") + + def stop(self): + self.runner.stop() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py b/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py new file mode 100644 index 000000000..b80939c0e --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/target/ssh.py @@ -0,0 +1,266 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import time +import select +import logging +import subprocess + +from . import OETarget + +class OESSHTarget(OETarget): + def __init__(self, logger, ip, server_ip, timeout=300, user='root', + port=None, **kwargs): + if not logger: + logger = logging.getLogger('target') + logger.setLevel(logging.INFO) + filePath = os.path.join(os.getcwd(), 'remoteTarget.log') + fileHandler = logging.FileHandler(filePath, 'w', 'utf-8') + formatter = logging.Formatter( + '%(asctime)s.%(msecs)03d %(levelname)s: %(message)s', + '%H:%M:%S') + fileHandler.setFormatter(formatter) + logger.addHandler(fileHandler) + + super(OESSHTarget, self).__init__(logger) + self.ip = ip + self.server_ip = server_ip + self.timeout = timeout + self.user = user + ssh_options = [ + '-o', 'UserKnownHostsFile=/dev/null', + '-o', 'StrictHostKeyChecking=no', + '-o', 'LogLevel=ERROR' + ] + self.ssh = ['ssh', '-l', self.user ] + ssh_options + self.scp = ['scp'] + ssh_options + if port: + self.ssh = self.ssh + [ '-p', port ] + self.scp = self.scp + [ '-P', port ] + + def start(self, **kwargs): + pass + + def stop(self, **kwargs): + pass + + def _run(self, command, timeout=None, ignore_status=True): + """ + Runs command in target using SSHProcess. + """ + self.logger.debug("[Running]$ %s" % " ".join(command)) + + starttime = time.time() + status, output = SSHCall(command, self.logger, timeout) + self.logger.debug("[Command returned '%d' after %.2f seconds]" + "" % (status, time.time() - starttime)) + + if status and not ignore_status: + raise AssertionError("Command '%s' returned non-zero exit " + "status %d:\n%s" % (command, status, output)) + + return (status, output) + + def run(self, command, timeout=None): + """ + Runs command in target. + + command: Command to run on target. + timeout: <value>: Kill command after <val> seconds. + None: Kill command default value seconds. + 0: No timeout, runs until return. + """ + targetCmd = 'export PATH=/usr/sbin:/sbin:/usr/bin:/bin; %s' % command + sshCmd = self.ssh + [self.ip, targetCmd] + + if timeout: + processTimeout = timeout + elif timeout==0: + processTimeout = None + else: + processTimeout = self.timeout + + status, output = self._run(sshCmd, processTimeout, True) + self.logger.info('\nCommand: %s\nOutput: %s\n' % (command, output)) + return (status, output) + + def copyTo(self, localSrc, remoteDst): + """ + Copy file to target. + + If local file is symlink, recreate symlink in target. + """ + if os.path.islink(localSrc): + link = os.readlink(localSrc) + dstDir, dstBase = os.path.split(remoteDst) + sshCmd = 'cd %s; ln -s %s %s' % (dstDir, link, dstBase) + return self.run(sshCmd) + + else: + remotePath = '%s@%s:%s' % (self.user, self.ip, remoteDst) + scpCmd = self.scp + [localSrc, remotePath] + return self._run(scpCmd, ignore_status=False) + + def copyFrom(self, remoteSrc, localDst): + """ + Copy file from target. + """ + remotePath = '%s@%s:%s' % (self.user, self.ip, remoteSrc) + scpCmd = self.scp + [remotePath, localDst] + return self._run(scpCmd, ignore_status=False) + + def copyDirTo(self, localSrc, remoteDst): + """ + Copy recursively localSrc directory to remoteDst in target. + """ + + for root, dirs, files in os.walk(localSrc): + # Create directories in the target as needed + for d in dirs: + tmpDir = os.path.join(root, d).replace(localSrc, "") + newDir = os.path.join(remoteDst, tmpDir.lstrip("/")) + cmd = "mkdir -p %s" % newDir + self.run(cmd) + + # Copy files into the target + for f in files: + tmpFile = os.path.join(root, f).replace(localSrc, "") + dstFile = os.path.join(remoteDst, tmpFile.lstrip("/")) + srcFile = os.path.join(root, f) + self.copyTo(srcFile, dstFile) + + def deleteFiles(self, remotePath, files): + """ + Deletes files in target's remotePath. + """ + + cmd = "rm" + if not isinstance(files, list): + files = [files] + + for f in files: + cmd = "%s %s" % (cmd, os.path.join(remotePath, f)) + + self.run(cmd) + + + def deleteDir(self, remotePath): + """ + Deletes target's remotePath directory. + """ + + cmd = "rmdir %s" % remotePath + self.run(cmd) + + + def deleteDirStructure(self, localPath, remotePath): + """ + Delete recursively localPath structure directory in target's remotePath. + + This function is very usefult to delete a package that is installed in + the DUT and the host running the test has such package extracted in tmp + directory. + + Example: + pwd: /home/user/tmp + tree: . + └── work + ├── dir1 + │ └── file1 + └── dir2 + + localpath = "/home/user/tmp" and remotepath = "/home/user" + + With the above variables this function will try to delete the + directory in the DUT in this order: + /home/user/work/dir1/file1 + /home/user/work/dir1 (if dir is empty) + /home/user/work/dir2 (if dir is empty) + /home/user/work (if dir is empty) + """ + + for root, dirs, files in os.walk(localPath, topdown=False): + # Delete files first + tmpDir = os.path.join(root).replace(localPath, "") + remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) + self.deleteFiles(remoteDir, files) + + # Remove dirs if empty + for d in dirs: + tmpDir = os.path.join(root, d).replace(localPath, "") + remoteDir = os.path.join(remotePath, tmpDir.lstrip("/")) + self.deleteDir(remoteDir) + +def SSHCall(command, logger, timeout=None, **opts): + + def run(): + nonlocal output + nonlocal process + starttime = time.time() + process = subprocess.Popen(command, **options) + if timeout: + endtime = starttime + timeout + eof = False + while time.time() < endtime and not eof: + logger.debug('time: %s, endtime: %s' % (time.time(), endtime)) + try: + if select.select([process.stdout], [], [], 5)[0] != []: + data = os.read(process.stdout.fileno(), 1024) + if not data: + process.stdout.close() + eof = True + else: + data = data.decode("utf-8") + output += data + logger.debug('Partial data from SSH call: %s' % data) + endtime = time.time() + timeout + except InterruptedError: + continue + + # process hasn't returned yet + if not eof: + process.terminate() + time.sleep(5) + try: + process.kill() + except OSError: + pass + endtime = time.time() - starttime + lastline = ("\nProcess killed - no output for %d seconds. Total" + " running time: %d seconds." % (timeout, endtime)) + logger.debug('Received data from SSH call %s ' % lastline) + output += lastline + + else: + output = process.communicate()[0].decode("utf-8") + logger.debug('Data from SSH call: %s' % output.rstrip()) + + options = { + "stdout": subprocess.PIPE, + "stderr": subprocess.STDOUT, + "stdin": None, + "shell": False, + "bufsize": -1, + "preexec_fn": os.setsid, + } + options.update(opts) + output = '' + process = None + + # Unset DISPLAY which means we won't trigger SSH_ASKPASS + env = os.environ.copy() + if "DISPLAY" in env: + del env['DISPLAY'] + options['env'] = env + + try: + run() + except: + # Need to guard against a SystemExit or other exception ocurring + # whilst running and ensure we don't leave a process behind. + if process.poll() is None: + process.kill() + logger.debug('Something went wrong, killing SSH process') + raise + return (process.wait(), output.rstrip()) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/__init__.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py new file mode 100644 index 000000000..88003a6ad --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/data.py @@ -0,0 +1,20 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.oetag import OETestTag +from oeqa.core.decorator.data import OETestDataDepends + +class DataTest(OETestCase): + data_vars = ['IMAGE', 'ARCH'] + + @OETestDataDepends(['MACHINE',]) + @OETestTag('dataTestOk') + def testDataOk(self): + self.assertEqual(self.td.get('IMAGE'), 'core-image-minimal') + self.assertEqual(self.td.get('ARCH'), 'x86') + self.assertEqual(self.td.get('MACHINE'), 'qemuarm') + + @OETestTag('dataTestFail') + def testDataFail(self): + pass diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py new file mode 100644 index 000000000..17cdd90b1 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/depends.py @@ -0,0 +1,38 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.depends import OETestDepends + +class DependsTest(OETestCase): + + def testDependsFirst(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsFirst']) + def testDependsSecond(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsSecond']) + def testDependsThird(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsSecond']) + def testDependsFourth(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsThird', 'testDependsFourth']) + def testDependsFifth(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsCircular3']) + def testDependsCircular1(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsCircular1']) + def testDependsCircular2(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestDepends(['testDependsCircular2']) + def testDependsCircular3(self): + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py new file mode 100644 index 000000000..038d44593 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/invalid/oeid.py @@ -0,0 +1,15 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase + +class AnotherIDTest(OETestCase): + + def testAnotherIdGood(self): + self.assertTrue(True, msg='How is this possible?') + + def testAnotherIdOther(self): + self.assertTrue(True, msg='How is this possible?') + + def testAnotherIdNone(self): + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py new file mode 100644 index 000000000..c9ffd1777 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/loader/valid/another.py @@ -0,0 +1,9 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase + +class AnotherTest(OETestCase): + + def testAnother(self): + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py new file mode 100644 index 000000000..c2d3d32f2 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oeid.py @@ -0,0 +1,18 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.oeid import OETestID + +class IDTest(OETestCase): + + @OETestID(101) + def testIdGood(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestID(102) + def testIdOther(self): + self.assertTrue(True, msg='How is this possible?') + + def testIdNone(self): + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py new file mode 100644 index 000000000..0cae02e75 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/oetag.py @@ -0,0 +1,18 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.oetag import OETestTag + +class TagTest(OETestCase): + + @OETestTag('goodTag') + def testTagGood(self): + self.assertTrue(True, msg='How is this possible?') + + @OETestTag('otherTag') + def testTagOther(self): + self.assertTrue(True, msg='How is this possible?') + + def testTagNone(self): + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py new file mode 100644 index 000000000..870c3157f --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/cases/timeout.py @@ -0,0 +1,18 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +from time import sleep + +from oeqa.core.case import OETestCase +from oeqa.core.decorator.oetimeout import OETimeout + +class TimeoutTest(OETestCase): + + @OETimeout(1) + def testTimeoutPass(self): + self.assertTrue(True, msg='How is this possible?') + + @OETimeout(1) + def testTimeoutFail(self): + sleep(2) + self.assertTrue(True, msg='How is this possible?') diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py new file mode 100644 index 000000000..52b18a1c3 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/common.py @@ -0,0 +1,35 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import sys +import os + +import unittest +import logging +import os + +logger = logging.getLogger("oeqa") +logger.setLevel(logging.INFO) +consoleHandler = logging.StreamHandler() +formatter = logging.Formatter('OEQATest: %(message)s') +consoleHandler.setFormatter(formatter) +logger.addHandler(consoleHandler) + +def setup_sys_path(): + directory = os.path.dirname(os.path.abspath(__file__)) + oeqa_lib = os.path.realpath(os.path.join(directory, '../../../')) + if not oeqa_lib in sys.path: + sys.path.insert(0, oeqa_lib) + +class TestBase(unittest.TestCase): + def setUp(self): + self.logger = logger + directory = os.path.dirname(os.path.abspath(__file__)) + self.cases_path = os.path.join(directory, 'cases') + + def _testLoader(self, d={}, modules=[], tests=[], filters={}): + from oeqa.core.context import OETestContext + tc = OETestContext(d, self.logger) + tc.loadTests(self.cases_path, modules=modules, tests=tests, + filters=filters) + return tc diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py new file mode 100755 index 000000000..320468cbe --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_data.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import unittest +import logging +import os + +from common import setup_sys_path, TestBase +setup_sys_path() + +from oeqa.core.exception import OEQAMissingVariable +from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames + +class TestData(TestBase): + modules = ['data'] + + def test_data_fail_missing_variable(self): + expectedException = "oeqa.core.exception.OEQAMissingVariable" + + tc = self._testLoader(modules=self.modules) + self.assertEqual(False, tc.runTests().wasSuccessful()) + for test, data in tc._results['errors']: + expect = False + if expectedException in data: + expect = True + + self.assertTrue(expect) + + def test_data_fail_wrong_variable(self): + expectedError = 'AssertionError' + d = {'IMAGE' : 'core-image-sato', 'ARCH' : 'arm'} + + tc = self._testLoader(d=d, modules=self.modules) + self.assertEqual(False, tc.runTests().wasSuccessful()) + for test, data in tc._results['failures']: + expect = False + if expectedError in data: + expect = True + + self.assertTrue(expect) + + def test_data_ok(self): + d = {'IMAGE' : 'core-image-minimal', 'ARCH' : 'x86', 'MACHINE' : 'qemuarm'} + + tc = self._testLoader(d=d, modules=self.modules) + self.assertEqual(True, tc.runTests().wasSuccessful()) + +if __name__ == '__main__': + unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py new file mode 100755 index 000000000..f7d11e885 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_decorators.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import signal +import unittest + +from common import setup_sys_path, TestBase +setup_sys_path() + +from oeqa.core.exception import OEQADependency +from oeqa.core.utils.test import getCaseMethod, getSuiteCasesNames, getSuiteCasesIDs + +class TestFilterDecorator(TestBase): + + def _runFilterTest(self, modules, filters, expect, msg): + tc = self._testLoader(modules=modules, filters=filters) + test_loaded = set(getSuiteCasesNames(tc.suites)) + self.assertEqual(expect, test_loaded, msg=msg) + + def test_oetag(self): + # Get all cases without filtering. + filter_all = {} + test_all = {'testTagGood', 'testTagOther', 'testTagNone'} + msg_all = 'Failed to get all oetag cases without filtering.' + + # Get cases with 'goodTag'. + filter_good = {'oetag':'goodTag'} + test_good = {'testTagGood'} + msg_good = 'Failed to get just one test filtering with "goodTag" oetag.' + + # Get cases with an invalid tag. + filter_invalid = {'oetag':'invalidTag'} + test_invalid = set() + msg_invalid = 'Failed to filter all test using an invalid oetag.' + + tests = ((filter_all, test_all, msg_all), + (filter_good, test_good, msg_good), + (filter_invalid, test_invalid, msg_invalid)) + + for test in tests: + self._runFilterTest(['oetag'], test[0], test[1], test[2]) + + def test_oeid(self): + # Get all cases without filtering. + filter_all = {} + test_all = {'testIdGood', 'testIdOther', 'testIdNone'} + msg_all = 'Failed to get all oeid cases without filtering.' + + # Get cases with '101' oeid. + filter_good = {'oeid': 101} + test_good = {'testIdGood'} + msg_good = 'Failed to get just one tes filtering with "101" oeid.' + + # Get cases with an invalid id. + filter_invalid = {'oeid':999} + test_invalid = set() + msg_invalid = 'Failed to filter all test using an invalid oeid.' + + tests = ((filter_all, test_all, msg_all), + (filter_good, test_good, msg_good), + (filter_invalid, test_invalid, msg_invalid)) + + for test in tests: + self._runFilterTest(['oeid'], test[0], test[1], test[2]) + +class TestDependsDecorator(TestBase): + modules = ['depends'] + + def test_depends_order(self): + tests = ['depends.DependsTest.testDependsFirst', + 'depends.DependsTest.testDependsSecond', + 'depends.DependsTest.testDependsThird', + 'depends.DependsTest.testDependsFourth', + 'depends.DependsTest.testDependsFifth'] + tests2 = list(tests) + tests2[2], tests2[3] = tests[3], tests[2] + tc = self._testLoader(modules=self.modules, tests=tests) + test_loaded = getSuiteCasesIDs(tc.suites) + result = True if test_loaded == tests or test_loaded == tests2 else False + msg = 'Failed to order tests using OETestDepends decorator.\nTest order:'\ + ' %s.\nExpected: %s\nOr: %s' % (test_loaded, tests, tests2) + self.assertTrue(result, msg=msg) + + def test_depends_fail_missing_dependency(self): + expect = "TestCase depends.DependsTest.testDependsSecond depends on "\ + "depends.DependsTest.testDependsFirst and isn't available" + tests = ['depends.DependsTest.testDependsSecond'] + try: + # Must throw OEQADependency because missing 'testDependsFirst' + tc = self._testLoader(modules=self.modules, tests=tests) + self.fail('Expected OEQADependency exception') + except OEQADependency as e: + result = True if expect in str(e) else False + msg = 'Expected OEQADependency exception missing testDependsFirst test' + self.assertTrue(result, msg=msg) + + def test_depends_fail_circular_dependency(self): + expect = 'have a circular dependency' + tests = ['depends.DependsTest.testDependsCircular1', + 'depends.DependsTest.testDependsCircular2', + 'depends.DependsTest.testDependsCircular3'] + try: + # Must throw OEQADependency because circular dependency + tc = self._testLoader(modules=self.modules, tests=tests) + self.fail('Expected OEQADependency exception') + except OEQADependency as e: + result = True if expect in str(e) else False + msg = 'Expected OEQADependency exception having a circular dependency' + self.assertTrue(result, msg=msg) + +class TestTimeoutDecorator(TestBase): + modules = ['timeout'] + + def test_timeout(self): + tests = ['timeout.TimeoutTest.testTimeoutPass'] + msg = 'Failed to run test using OETestTimeout' + alarm_signal = signal.getsignal(signal.SIGALRM) + tc = self._testLoader(modules=self.modules, tests=tests) + self.assertTrue(tc.runTests().wasSuccessful(), msg=msg) + msg = "OETestTimeout didn't restore SIGALRM" + self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg) + + def test_timeout_fail(self): + tests = ['timeout.TimeoutTest.testTimeoutFail'] + msg = "OETestTimeout test didn't timeout as expected" + alarm_signal = signal.getsignal(signal.SIGALRM) + tc = self._testLoader(modules=self.modules, tests=tests) + self.assertFalse(tc.runTests().wasSuccessful(), msg=msg) + msg = "OETestTimeout didn't restore SIGALRM" + self.assertIs(alarm_signal, signal.getsignal(signal.SIGALRM), msg=msg) + +if __name__ == '__main__': + unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py new file mode 100755 index 000000000..b79b8bad4 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_loader.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import unittest + +from common import setup_sys_path, TestBase +setup_sys_path() + +from oeqa.core.exception import OEQADependency +from oeqa.core.utils.test import getSuiteModules, getSuiteCasesIDs + +class TestLoader(TestBase): + + def test_fail_empty_filter(self): + filters = {'oetag' : ''} + expect = 'Filter oetag specified is empty' + msg = 'Expected TypeError exception for having invalid filter' + try: + # Must throw TypeError because empty filter + tc = self._testLoader(filters=filters) + self.fail(msg) + except TypeError as e: + result = True if expect in str(e) else False + self.assertTrue(result, msg=msg) + + def test_fail_invalid_filter(self): + filters = {'invalid' : 'good'} + expect = 'filter but not declared in any of' + msg = 'Expected TypeError exception for having invalid filter' + try: + # Must throw TypeError because invalid filter + tc = self._testLoader(filters=filters) + self.fail(msg) + except TypeError as e: + result = True if expect in str(e) else False + self.assertTrue(result, msg=msg) + + def test_fail_duplicated_module(self): + cases_path = self.cases_path + invalid_path = os.path.join(cases_path, 'loader', 'invalid') + self.cases_path = [self.cases_path, invalid_path] + expect = 'Duplicated oeid module found in' + msg = 'Expected ImportError exception for having duplicated module' + try: + # Must throw ImportEror because duplicated module + tc = self._testLoader() + self.fail(msg) + except ImportError as e: + result = True if expect in str(e) else False + self.assertTrue(result, msg=msg) + finally: + self.cases_path = cases_path + + def test_filter_modules(self): + expected_modules = {'oeid', 'oetag'} + tc = self._testLoader(modules=expected_modules) + modules = getSuiteModules(tc.suites) + msg = 'Expected just %s modules' % ', '.join(expected_modules) + self.assertEqual(modules, expected_modules, msg=msg) + + def test_filter_cases(self): + modules = ['oeid', 'oetag', 'data'] + expected_cases = {'data.DataTest.testDataOk', + 'oetag.TagTest.testTagGood', + 'oeid.IDTest.testIdGood'} + tc = self._testLoader(modules=modules, tests=expected_cases) + cases = set(getSuiteCasesIDs(tc.suites)) + msg = 'Expected just %s cases' % ', '.join(expected_cases) + self.assertEqual(cases, expected_cases, msg=msg) + + def test_import_from_paths(self): + cases_path = self.cases_path + cases2_path = os.path.join(cases_path, 'loader', 'valid') + expected_modules = {'oeid', 'another'} + self.cases_path = [self.cases_path, cases2_path] + tc = self._testLoader(modules=expected_modules) + modules = getSuiteModules(tc.suites) + self.cases_path = cases_path + msg = 'Expected modules from two different paths' + self.assertEqual(modules, expected_modules, msg=msg) + +if __name__ == '__main__': + unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py new file mode 100755 index 000000000..a3f3861fe --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/tests/test_runner.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import unittest +import logging +import tempfile + +from common import setup_sys_path, TestBase +setup_sys_path() + +from oeqa.core.runner import OEStreamLogger + +class TestRunner(TestBase): + def test_stream_logger(self): + fp = tempfile.TemporaryFile(mode='w+') + + logging.basicConfig(format='%(message)s', stream=fp) + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + oeSL = OEStreamLogger(logger) + + lines = ['init', 'bigline_' * 65535, 'morebigline_' * 65535 * 4, 'end'] + for line in lines: + oeSL.write(line) + + fp.seek(0) + fp_lines = fp.readlines() + for i, fp_line in enumerate(fp_lines): + fp_line = fp_line.strip() + self.assertEqual(lines[i], fp_line) + + fp.close() + +if __name__ == '__main__': + unittest.main() diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/__init__.py diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py new file mode 100644 index 000000000..0b223b5d0 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/misc.py @@ -0,0 +1,44 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +def toList(obj, obj_type, obj_name="Object"): + if isinstance(obj, obj_type): + return [obj] + elif isinstance(obj, list): + return obj + else: + raise TypeError("%s must be %s or list" % (obj_name, obj_type)) + +def toSet(obj, obj_type, obj_name="Object"): + if isinstance(obj, obj_type): + return {obj} + elif isinstance(obj, list): + return set(obj) + elif isinstance(obj, set): + return obj + else: + raise TypeError("%s must be %s or set" % (obj_name, obj_type)) + +def strToList(obj, obj_name="Object"): + return toList(obj, str, obj_name) + +def strToSet(obj, obj_name="Object"): + return toSet(obj, str, obj_name) + +def intToList(obj, obj_name="Object"): + return toList(obj, int, obj_name) + +def dataStoteToDict(d, variables): + data = {} + + for v in variables: + data[v] = d.getVar(v) + + return data + +def updateTestData(d, td, variables): + """ + Updates variables with values of data store to test data. + """ + for var in variables: + td[var] = d.getVar(var) diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py new file mode 100644 index 000000000..a21caad5c --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/path.py @@ -0,0 +1,19 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import sys + +def findFile(file_name, directory): + """ + Search for a file in directory and returns its complete path. + """ + for r, d, f in os.walk(directory): + if file_name in f: + return os.path.join(r, file_name) + return None + +def remove_safe(path): + if os.path.exists(path): + os.remove(path) + diff --git a/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py new file mode 100644 index 000000000..88d5d1398 --- /dev/null +++ b/import-layers/yocto-poky/meta/lib/oeqa/core/utils/test.py @@ -0,0 +1,86 @@ +# Copyright (C) 2016 Intel Corporation +# Released under the MIT license (see COPYING.MIT) + +import os +import inspect +import unittest + +def getSuiteCases(suite): + """ + Returns individual test from a test suite. + """ + tests = [] + + if isinstance(suite, unittest.TestCase): + tests.append(suite) + elif isinstance(suite, unittest.suite.TestSuite): + for item in suite: + tests.extend(getSuiteCases(item)) + + return tests + +def getSuiteModules(suite): + """ + Returns modules in a test suite. + """ + modules = set() + for test in getSuiteCases(suite): + modules.add(getCaseModule(test)) + return modules + +def getSuiteCasesInfo(suite, func): + """ + Returns test case info from suite. Info is fetched from func. + """ + tests = [] + for test in getSuiteCases(suite): + tests.append(func(test)) + return tests + +def getSuiteCasesNames(suite): + """ + Returns test case names from suite. + """ + return getSuiteCasesInfo(suite, getCaseMethod) + +def getSuiteCasesIDs(suite): + """ + Returns test case ids from suite. + """ + return getSuiteCasesInfo(suite, getCaseID) + +def getSuiteCasesFiles(suite): + """ + Returns test case files paths from suite. + """ + return getSuiteCasesInfo(suite, getCaseFile) + +def getCaseModule(test_case): + """ + Returns test case module name. + """ + return test_case.__module__ + +def getCaseClass(test_case): + """ + Returns test case class name. + """ + return test_case.__class__.__name__ + +def getCaseID(test_case): + """ + Returns test case complete id. + """ + return test_case.id() + +def getCaseFile(test_case): + """ + Returns test case file path. + """ + return inspect.getsourcefile(test_case.__class__) + +def getCaseMethod(test_case): + """ + Returns test case method name. + """ + return getCaseID(test_case).split('.')[-1] |