diff options
author | Dave Cobbley <david.j.cobbley@linux.intel.com> | 2018-08-14 20:05:37 +0300 |
---|---|---|
committer | Brad Bishop <bradleyb@fuzziesquirrel.com> | 2018-08-23 04:26:31 +0300 |
commit | eb8dc40360f0cfef56fb6947cc817a547d6d9bc6 (patch) | |
tree | de291a73dc37168da6370e2cf16c347d1eba9df8 /poky/bitbake/lib/bb/tests | |
parent | 9c3cf826d853102535ead04cebc2d6023eff3032 (diff) | |
download | openbmc-eb8dc40360f0cfef56fb6947cc817a547d6d9bc6.tar.xz |
[Subtree] Removing import-layers directory
As part of the move to subtrees, need to bring all the import layers
content to the top level.
Change-Id: I4a163d10898cbc6e11c27f776f60e1a470049d8f
Signed-off-by: Dave Cobbley <david.j.cobbley@linux.intel.com>
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
Diffstat (limited to 'poky/bitbake/lib/bb/tests')
-rw-r--r-- | poky/bitbake/lib/bb/tests/__init__.py | 0 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/codeparser.py | 428 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/cow.py | 136 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/data.py | 607 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/event.py | 986 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/fetch.py | 1573 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/parse.py | 185 | ||||
-rw-r--r-- | poky/bitbake/lib/bb/tests/utils.py | 603 |
8 files changed, 4518 insertions, 0 deletions
diff --git a/poky/bitbake/lib/bb/tests/__init__.py b/poky/bitbake/lib/bb/tests/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/poky/bitbake/lib/bb/tests/__init__.py diff --git a/poky/bitbake/lib/bb/tests/codeparser.py b/poky/bitbake/lib/bb/tests/codeparser.py new file mode 100644 index 000000000..e30e78c15 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/codeparser.py @@ -0,0 +1,428 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Test for codeparser.py +# +# Copyright (C) 2010 Chris Larson +# Copyright (C) 2012 Richard Purdie +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import logging +import bb + +logger = logging.getLogger('BitBake.TestCodeParser') + +# bb.data references bb.parse but can't directly import due to circular dependencies. +# Hack around it for now :( +import bb.parse +import bb.data + +class ReferenceTest(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + + def setEmptyVars(self, varlist): + for k in varlist: + self.d.setVar(k, "") + + def setValues(self, values): + for k, v in values.items(): + self.d.setVar(k, v) + + def assertReferences(self, refs): + self.assertEqual(self.references, refs) + + def assertExecs(self, execs): + self.assertEqual(self.execs, execs) + + def assertContains(self, contains): + self.assertEqual(self.contains, contains) + +class VariableReferenceTest(ReferenceTest): + + def parseExpression(self, exp): + parsedvar = self.d.expandWithRefs(exp, None) + self.references = parsedvar.references + + def test_simple_reference(self): + self.setEmptyVars(["FOO"]) + self.parseExpression("${FOO}") + self.assertReferences(set(["FOO"])) + + def test_nested_reference(self): + self.setEmptyVars(["BAR"]) + self.d.setVar("FOO", "BAR") + self.parseExpression("${${FOO}}") + self.assertReferences(set(["FOO", "BAR"])) + + def test_python_reference(self): + self.setEmptyVars(["BAR"]) + self.parseExpression("${@d.getVar('BAR') + 'foo'}") + self.assertReferences(set(["BAR"])) + +class ShellReferenceTest(ReferenceTest): + + def parseExpression(self, exp): + parsedvar = self.d.expandWithRefs(exp, None) + parser = bb.codeparser.ShellParser("ParserTest", logger) + parser.parse_shell(parsedvar.value) + + self.references = parsedvar.references + self.execs = parser.execs + + def test_quotes_inside_assign(self): + self.parseExpression('foo=foo"bar"baz') + self.assertReferences(set([])) + + def test_quotes_inside_arg(self): + self.parseExpression('sed s#"bar baz"#"alpha beta"#g') + self.assertExecs(set(["sed"])) + + def test_arg_continuation(self): + self.parseExpression("sed -i -e s,foo,bar,g \\\n *.pc") + self.assertExecs(set(["sed"])) + + def test_dollar_in_quoted(self): + self.parseExpression('sed -i -e "foo$" *.pc') + self.assertExecs(set(["sed"])) + + def test_quotes_inside_arg_continuation(self): + self.setEmptyVars(["bindir", "D", "libdir"]) + self.parseExpression(""" +sed -i -e s#"moc_location=.*$"#"moc_location=${bindir}/moc4"# \\ +-e s#"uic_location=.*$"#"uic_location=${bindir}/uic4"# \\ +${D}${libdir}/pkgconfig/*.pc +""") + self.assertReferences(set(["bindir", "D", "libdir"])) + + def test_assign_subshell_expansion(self): + self.parseExpression("foo=$(echo bar)") + self.assertExecs(set(["echo"])) + + def test_shell_unexpanded(self): + self.setEmptyVars(["QT_BASE_NAME"]) + self.parseExpression('echo "${QT_BASE_NAME}"') + self.assertExecs(set(["echo"])) + self.assertReferences(set(["QT_BASE_NAME"])) + + def test_incomplete_varexp_single_quotes(self): + self.parseExpression("sed -i -e 's:IP{:I${:g' $pc") + self.assertExecs(set(["sed"])) + + + def test_until(self): + self.parseExpression("until false; do echo true; done") + self.assertExecs(set(["false", "echo"])) + self.assertReferences(set()) + + def test_case(self): + self.parseExpression(""" +case $foo in +*) +bar +;; +esac +""") + self.assertExecs(set(["bar"])) + self.assertReferences(set()) + + def test_assign_exec(self): + self.parseExpression("a=b c='foo bar' alpha 1 2 3") + self.assertExecs(set(["alpha"])) + + def test_redirect_to_file(self): + self.setEmptyVars(["foo"]) + self.parseExpression("echo foo >${foo}/bar") + self.assertExecs(set(["echo"])) + self.assertReferences(set(["foo"])) + + def test_heredoc(self): + self.setEmptyVars(["theta"]) + self.parseExpression(""" +cat <<END +alpha +beta +${theta} +END +""") + self.assertReferences(set(["theta"])) + + def test_redirect_from_heredoc(self): + v = ["B", "SHADOW_MAILDIR", "SHADOW_MAILFILE", "SHADOW_UTMPDIR", "SHADOW_LOGDIR", "bindir"] + self.setEmptyVars(v) + self.parseExpression(""" +cat <<END >${B}/cachedpaths +shadow_cv_maildir=${SHADOW_MAILDIR} +shadow_cv_mailfile=${SHADOW_MAILFILE} +shadow_cv_utmpdir=${SHADOW_UTMPDIR} +shadow_cv_logdir=${SHADOW_LOGDIR} +shadow_cv_passwd_dir=${bindir} +END +""") + self.assertReferences(set(v)) + self.assertExecs(set(["cat"])) + +# def test_incomplete_command_expansion(self): +# self.assertRaises(reftracker.ShellSyntaxError, reftracker.execs, +# bbvalue.shparse("cp foo`", self.d), self.d) + +# def test_rogue_dollarsign(self): +# self.setValues({"D" : "/tmp"}) +# self.parseExpression("install -d ${D}$") +# self.assertReferences(set(["D"])) +# self.assertExecs(set(["install"])) + + +class PythonReferenceTest(ReferenceTest): + + def setUp(self): + self.d = bb.data.init() + if hasattr(bb.utils, "_context"): + self.context = bb.utils._context + else: + import builtins + self.context = builtins.__dict__ + + def parseExpression(self, exp): + parsedvar = self.d.expandWithRefs(exp, None) + parser = bb.codeparser.PythonParser("ParserTest", logger) + parser.parse_python(parsedvar.value) + + self.references = parsedvar.references | parser.references + self.execs = parser.execs + self.contains = parser.contains + + @staticmethod + def indent(value): + """Python Snippets have to be indented, python values don't have to +be. These unit tests are testing snippets.""" + return " " + value + + def test_getvar_reference(self): + self.parseExpression("d.getVar('foo')") + self.assertReferences(set(["foo"])) + self.assertExecs(set()) + + def test_getvar_computed_reference(self): + self.parseExpression("d.getVar('f' + 'o' + 'o')") + self.assertReferences(set()) + self.assertExecs(set()) + + def test_getvar_exec_reference(self): + self.parseExpression("eval('d.getVar(\"foo\")')") + self.assertReferences(set()) + self.assertExecs(set(["eval"])) + + def test_var_reference(self): + self.context["foo"] = lambda x: x + self.setEmptyVars(["FOO"]) + self.parseExpression("foo('${FOO}')") + self.assertReferences(set(["FOO"])) + self.assertExecs(set(["foo"])) + del self.context["foo"] + + def test_var_exec(self): + for etype in ("func", "task"): + self.d.setVar("do_something", "echo 'hi mom! ${FOO}'") + self.d.setVarFlag("do_something", etype, True) + self.parseExpression("bb.build.exec_func('do_something', d)") + self.assertReferences(set([])) + self.assertExecs(set(["do_something"])) + + def test_function_reference(self): + self.context["testfunc"] = lambda msg: bb.msg.note(1, None, msg) + self.d.setVar("FOO", "Hello, World!") + self.parseExpression("testfunc('${FOO}')") + self.assertReferences(set(["FOO"])) + self.assertExecs(set(["testfunc"])) + del self.context["testfunc"] + + def test_qualified_function_reference(self): + self.parseExpression("time.time()") + self.assertExecs(set(["time.time"])) + + def test_qualified_function_reference_2(self): + self.parseExpression("os.path.dirname('/foo/bar')") + self.assertExecs(set(["os.path.dirname"])) + + def test_qualified_function_reference_nested(self): + self.parseExpression("time.strftime('%Y%m%d',time.gmtime())") + self.assertExecs(set(["time.strftime", "time.gmtime"])) + + def test_function_reference_chained(self): + self.context["testget"] = lambda: "\tstrip me " + self.parseExpression("testget().strip()") + self.assertExecs(set(["testget"])) + del self.context["testget"] + + def test_contains(self): + self.parseExpression('bb.utils.contains("TESTVAR", "one", "true", "false", d)') + self.assertContains({'TESTVAR': {'one'}}) + + def test_contains_multi(self): + self.parseExpression('bb.utils.contains("TESTVAR", "one two", "true", "false", d)') + self.assertContains({'TESTVAR': {'one two'}}) + + def test_contains_any(self): + self.parseExpression('bb.utils.contains_any("TESTVAR", "hello", "true", "false", d)') + self.assertContains({'TESTVAR': {'hello'}}) + + def test_contains_any_multi(self): + self.parseExpression('bb.utils.contains_any("TESTVAR", "one two three", "true", "false", d)') + self.assertContains({'TESTVAR': {'one', 'two', 'three'}}) + + def test_contains_filter(self): + self.parseExpression('bb.utils.filter("TESTVAR", "hello there world", d)') + self.assertContains({'TESTVAR': {'hello', 'there', 'world'}}) + + +class DependencyReferenceTest(ReferenceTest): + + pydata = """ +d.getVar('somevar') +def test(d): + foo = 'bar %s' % 'foo' +def test2(d): + d.getVar(foo) + d.getVar('bar', False) + test2(d) + +def a(): + \"\"\"some + stuff + \"\"\" + return "heh" + +test(d) + +d.expand(d.getVar("something", False)) +d.expand("${inexpand} somethingelse") +d.getVar(a(), False) +""" + + def test_python(self): + self.d.setVar("FOO", self.pydata) + self.setEmptyVars(["inexpand", "a", "test2", "test"]) + self.d.setVarFlags("FOO", { + "func": True, + "python": True, + "lineno": 1, + "filename": "example.bb", + }) + + deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d) + + self.assertEqual(deps, set(["somevar", "bar", "something", "inexpand", "test", "test2", "a"])) + + + shelldata = """ +foo () { +bar +} +{ +echo baz +$(heh) +eval `moo` +} +a=b +c=d +( +true && false +test -f foo +testval=something +$testval +) || aiee +! inverted +echo ${somevar} + +case foo in +bar) +echo bar +;; +baz) +echo baz +;; +foo*) +echo foo +;; +esac +""" + + def test_shell(self): + execs = ["bar", "echo", "heh", "moo", "true", "aiee"] + self.d.setVar("somevar", "heh") + self.d.setVar("inverted", "echo inverted...") + self.d.setVarFlag("inverted", "func", True) + self.d.setVar("FOO", self.shelldata) + self.d.setVarFlags("FOO", {"func": True}) + self.setEmptyVars(execs) + + deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d) + + self.assertEqual(deps, set(["somevar", "inverted"] + execs)) + + + def test_vardeps(self): + self.d.setVar("oe_libinstall", "echo test") + self.d.setVar("FOO", "foo=oe_libinstall; eval $foo") + self.d.setVarFlag("FOO", "vardeps", "oe_libinstall") + + deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d) + + self.assertEqual(deps, set(["oe_libinstall"])) + + def test_vardeps_expand(self): + self.d.setVar("oe_libinstall", "echo test") + self.d.setVar("FOO", "foo=oe_libinstall; eval $foo") + self.d.setVarFlag("FOO", "vardeps", "${@'oe_libinstall'}") + + deps, values = bb.data.build_dependencies("FOO", set(self.d.keys()), set(), set(), self.d) + + self.assertEqual(deps, set(["oe_libinstall"])) + + def test_contains_vardeps(self): + expr = '${@bb.utils.filter("TESTVAR", "somevalue anothervalue", d)} \ + ${@bb.utils.contains("TESTVAR", "testval testval2", "yetanothervalue", "", d)} \ + ${@bb.utils.contains("TESTVAR", "testval2 testval3", "blah", "", d)} \ + ${@bb.utils.contains_any("TESTVAR", "testval2 testval3", "lastone", "", d)}' + parsedvar = self.d.expandWithRefs(expr, None) + # Check contains + self.assertEqual(parsedvar.contains, {'TESTVAR': {'testval2 testval3', 'anothervalue', 'somevalue', 'testval testval2', 'testval2', 'testval3'}}) + # Check dependencies + self.d.setVar('ANOTHERVAR', expr) + self.d.setVar('TESTVAR', 'anothervalue testval testval2') + deps, values = bb.data.build_dependencies("ANOTHERVAR", set(self.d.keys()), set(), set(), self.d) + self.assertEqual(sorted(values.splitlines()), + sorted([expr, + 'TESTVAR{anothervalue} = Set', + 'TESTVAR{somevalue} = Unset', + 'TESTVAR{testval testval2} = Set', + 'TESTVAR{testval2 testval3} = Unset', + 'TESTVAR{testval2} = Set', + 'TESTVAR{testval3} = Unset' + ])) + # Check final value + self.assertEqual(self.d.getVar('ANOTHERVAR').split(), ['anothervalue', 'yetanothervalue', 'lastone']) + + #Currently no wildcard support + #def test_vardeps_wildcards(self): + # self.d.setVar("oe_libinstall", "echo test") + # self.d.setVar("FOO", "foo=oe_libinstall; eval $foo") + # self.d.setVarFlag("FOO", "vardeps", "oe_*") + # self.assertEquals(deps, set(["oe_libinstall"])) + + diff --git a/poky/bitbake/lib/bb/tests/cow.py b/poky/bitbake/lib/bb/tests/cow.py new file mode 100644 index 000000000..d149d84d0 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/cow.py @@ -0,0 +1,136 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Tests for Copy-on-Write (cow.py) +# +# Copyright 2006 Holger Freyther <freyther@handhelds.org> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import os + +class COWTestCase(unittest.TestCase): + """ + Test case for the COW module from mithro + """ + + def testGetSet(self): + """ + Test and set + """ + from bb.COW import COWDictBase + a = COWDictBase.copy() + + self.assertEqual(False, 'a' in a) + + a['a'] = 'a' + a['b'] = 'b' + self.assertEqual(True, 'a' in a) + self.assertEqual(True, 'b' in a) + self.assertEqual('a', a['a'] ) + self.assertEqual('b', a['b'] ) + + def testCopyCopy(self): + """ + Test the copy of copies + """ + + from bb.COW import COWDictBase + + # create two COW dict 'instances' + b = COWDictBase.copy() + c = COWDictBase.copy() + + # assign some keys to one instance, some keys to another + b['a'] = 10 + b['c'] = 20 + c['a'] = 30 + + # test separation of the two instances + self.assertEqual(False, 'c' in c) + self.assertEqual(30, c['a']) + self.assertEqual(10, b['a']) + + # test copy + b_2 = b.copy() + c_2 = c.copy() + + self.assertEqual(False, 'c' in c_2) + self.assertEqual(10, b_2['a']) + + b_2['d'] = 40 + self.assertEqual(False, 'd' in c_2) + self.assertEqual(True, 'd' in b_2) + self.assertEqual(40, b_2['d']) + self.assertEqual(False, 'd' in b) + self.assertEqual(False, 'd' in c) + + c_2['d'] = 30 + self.assertEqual(True, 'd' in c_2) + self.assertEqual(True, 'd' in b_2) + self.assertEqual(30, c_2['d']) + self.assertEqual(40, b_2['d']) + self.assertEqual(False, 'd' in b) + self.assertEqual(False, 'd' in c) + + # test copy of the copy + c_3 = c_2.copy() + b_3 = b_2.copy() + b_3_2 = b_2.copy() + + c_3['e'] = 4711 + self.assertEqual(4711, c_3['e']) + self.assertEqual(False, 'e' in c_2) + self.assertEqual(False, 'e' in b_3) + self.assertEqual(False, 'e' in b_3_2) + self.assertEqual(False, 'e' in b_2) + + b_3['e'] = 'viel' + self.assertEqual('viel', b_3['e']) + self.assertEqual(4711, c_3['e']) + self.assertEqual(False, 'e' in c_2) + self.assertEqual(True, 'e' in b_3) + self.assertEqual(False, 'e' in b_3_2) + self.assertEqual(False, 'e' in b_2) + + def testCow(self): + from bb.COW import COWDictBase + c = COWDictBase.copy() + c['123'] = 1027 + c['other'] = 4711 + c['d'] = { 'abc' : 10, 'bcd' : 20 } + + copy = c.copy() + + self.assertEqual(1027, c['123']) + self.assertEqual(4711, c['other']) + self.assertEqual({'abc':10, 'bcd':20}, c['d']) + self.assertEqual(1027, copy['123']) + self.assertEqual(4711, copy['other']) + self.assertEqual({'abc':10, 'bcd':20}, copy['d']) + + # cow it now + copy['123'] = 1028 + copy['other'] = 4712 + copy['d']['abc'] = 20 + + + self.assertEqual(1027, c['123']) + self.assertEqual(4711, c['other']) + self.assertEqual({'abc':10, 'bcd':20}, c['d']) + self.assertEqual(1028, copy['123']) + self.assertEqual(4712, copy['other']) + self.assertEqual({'abc':20, 'bcd':20}, copy['d']) diff --git a/poky/bitbake/lib/bb/tests/data.py b/poky/bitbake/lib/bb/tests/data.py new file mode 100644 index 000000000..a4a9dd30f --- /dev/null +++ b/poky/bitbake/lib/bb/tests/data.py @@ -0,0 +1,607 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Tests for the Data Store (data.py/data_smart.py) +# +# Copyright (C) 2010 Chris Larson +# Copyright (C) 2012 Richard Purdie +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import bb +import bb.data +import bb.parse +import logging + +class LogRecord(): + def __enter__(self): + logs = [] + class LogHandler(logging.Handler): + def emit(self, record): + logs.append(record) + logger = logging.getLogger("BitBake") + handler = LogHandler() + self.handler = handler + logger.addHandler(handler) + return logs + def __exit__(self, type, value, traceback): + logger = logging.getLogger("BitBake") + logger.removeHandler(self.handler) + return + +def logContains(item, logs): + for l in logs: + m = l.getMessage() + if item in m: + return True + return False + +class DataExpansions(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d["foo"] = "value_of_foo" + self.d["bar"] = "value_of_bar" + self.d["value_of_foo"] = "value_of_'value_of_foo'" + + def test_one_var(self): + val = self.d.expand("${foo}") + self.assertEqual(str(val), "value_of_foo") + + def test_indirect_one_var(self): + val = self.d.expand("${${foo}}") + self.assertEqual(str(val), "value_of_'value_of_foo'") + + def test_indirect_and_another(self): + val = self.d.expand("${${foo}} ${bar}") + self.assertEqual(str(val), "value_of_'value_of_foo' value_of_bar") + + def test_python_snippet(self): + val = self.d.expand("${@5*12}") + self.assertEqual(str(val), "60") + + def test_expand_in_python_snippet(self): + val = self.d.expand("${@'boo ' + '${foo}'}") + self.assertEqual(str(val), "boo value_of_foo") + + def test_python_snippet_getvar(self): + val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}") + self.assertEqual(str(val), "value_of_foo value_of_bar") + + def test_python_unexpanded(self): + self.d.setVar("bar", "${unsetvar}") + val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}") + self.assertEqual(str(val), "${@d.getVar('foo') + ' ${unsetvar}'}") + + def test_python_snippet_syntax_error(self): + self.d.setVar("FOO", "${@foo = 5}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_python_snippet_runtime_error(self): + self.d.setVar("FOO", "${@int('test')}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_python_snippet_error_path(self): + self.d.setVar("FOO", "foo value ${BAR}") + self.d.setVar("BAR", "bar value ${@int('test')}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_value_containing_value(self): + val = self.d.expand("${@d.getVar('foo') + ' ${bar}'}") + self.assertEqual(str(val), "value_of_foo value_of_bar") + + def test_reference_undefined_var(self): + val = self.d.expand("${undefinedvar} meh") + self.assertEqual(str(val), "${undefinedvar} meh") + + def test_double_reference(self): + self.d.setVar("BAR", "bar value") + self.d.setVar("FOO", "${BAR} foo ${BAR}") + val = self.d.getVar("FOO") + self.assertEqual(str(val), "bar value foo bar value") + + def test_direct_recursion(self): + self.d.setVar("FOO", "${FOO}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_indirect_recursion(self): + self.d.setVar("FOO", "${BAR}") + self.d.setVar("BAR", "${BAZ}") + self.d.setVar("BAZ", "${FOO}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_recursion_exception(self): + self.d.setVar("FOO", "${BAR}") + self.d.setVar("BAR", "${${@'FOO'}}") + self.assertRaises(bb.data_smart.ExpansionError, self.d.getVar, "FOO", True) + + def test_incomplete_varexp_single_quotes(self): + self.d.setVar("FOO", "sed -i -e 's:IP{:I${:g' $pc") + val = self.d.getVar("FOO") + self.assertEqual(str(val), "sed -i -e 's:IP{:I${:g' $pc") + + def test_nonstring(self): + self.d.setVar("TEST", 5) + val = self.d.getVar("TEST") + self.assertEqual(str(val), "5") + + def test_rename(self): + self.d.renameVar("foo", "newfoo") + self.assertEqual(self.d.getVar("newfoo", False), "value_of_foo") + self.assertEqual(self.d.getVar("foo", False), None) + + def test_deletion(self): + self.d.delVar("foo") + self.assertEqual(self.d.getVar("foo", False), None) + + def test_keys(self): + keys = list(self.d.keys()) + self.assertCountEqual(keys, ['value_of_foo', 'foo', 'bar']) + + def test_keys_deletion(self): + newd = bb.data.createCopy(self.d) + newd.delVar("bar") + keys = list(newd.keys()) + self.assertCountEqual(keys, ['value_of_foo', 'foo']) + +class TestNestedExpansions(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d["foo"] = "foo" + self.d["bar"] = "bar" + self.d["value_of_foobar"] = "187" + + def test_refs(self): + val = self.d.expand("${value_of_${foo}${bar}}") + self.assertEqual(str(val), "187") + + #def test_python_refs(self): + # val = self.d.expand("${@${@3}**2 + ${@4}**2}") + # self.assertEqual(str(val), "25") + + def test_ref_in_python_ref(self): + val = self.d.expand("${@'${foo}' + 'bar'}") + self.assertEqual(str(val), "foobar") + + def test_python_ref_in_ref(self): + val = self.d.expand("${${@'f'+'o'+'o'}}") + self.assertEqual(str(val), "foo") + + def test_deep_nesting(self): + depth = 100 + val = self.d.expand("${" * depth + "foo" + "}" * depth) + self.assertEqual(str(val), "foo") + + #def test_deep_python_nesting(self): + # depth = 50 + # val = self.d.expand("${@" * depth + "1" + "+1}" * depth) + # self.assertEqual(str(val), str(depth + 1)) + + def test_mixed(self): + val = self.d.expand("${value_of_${@('${foo}'+'bar')[0:3]}${${@'BAR'.lower()}}}") + self.assertEqual(str(val), "187") + + def test_runtime(self): + val = self.d.expand("${${@'value_of' + '_f'+'o'+'o'+'b'+'a'+'r'}}") + self.assertEqual(str(val), "187") + +class TestMemoize(unittest.TestCase): + def test_memoized(self): + d = bb.data.init() + d.setVar("FOO", "bar") + self.assertTrue(d.getVar("FOO", False) is d.getVar("FOO", False)) + + def test_not_memoized(self): + d1 = bb.data.init() + d2 = bb.data.init() + d1.setVar("FOO", "bar") + d2.setVar("FOO", "bar2") + self.assertTrue(d1.getVar("FOO", False) is not d2.getVar("FOO", False)) + + def test_changed_after_memoized(self): + d = bb.data.init() + d.setVar("foo", "value of foo") + self.assertEqual(str(d.getVar("foo", False)), "value of foo") + d.setVar("foo", "second value of foo") + self.assertEqual(str(d.getVar("foo", False)), "second value of foo") + + def test_same_value(self): + d = bb.data.init() + d.setVar("foo", "value of") + d.setVar("bar", "value of") + self.assertEqual(d.getVar("foo", False), + d.getVar("bar", False)) + +class TestConcat(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("FOO", "foo") + self.d.setVar("VAL", "val") + self.d.setVar("BAR", "bar") + + def test_prepend(self): + self.d.setVar("TEST", "${VAL}") + self.d.prependVar("TEST", "${FOO}:") + self.assertEqual(self.d.getVar("TEST"), "foo:val") + + def test_append(self): + self.d.setVar("TEST", "${VAL}") + self.d.appendVar("TEST", ":${BAR}") + self.assertEqual(self.d.getVar("TEST"), "val:bar") + + def test_multiple_append(self): + self.d.setVar("TEST", "${VAL}") + self.d.prependVar("TEST", "${FOO}:") + self.d.appendVar("TEST", ":val2") + self.d.appendVar("TEST", ":${BAR}") + self.assertEqual(self.d.getVar("TEST"), "foo:val:val2:bar") + +class TestConcatOverride(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("FOO", "foo") + self.d.setVar("VAL", "val") + self.d.setVar("BAR", "bar") + + def test_prepend(self): + self.d.setVar("TEST", "${VAL}") + self.d.setVar("TEST_prepend", "${FOO}:") + self.assertEqual(self.d.getVar("TEST"), "foo:val") + + def test_append(self): + self.d.setVar("TEST", "${VAL}") + self.d.setVar("TEST_append", ":${BAR}") + self.assertEqual(self.d.getVar("TEST"), "val:bar") + + def test_multiple_append(self): + self.d.setVar("TEST", "${VAL}") + self.d.setVar("TEST_prepend", "${FOO}:") + self.d.setVar("TEST_append", ":val2") + self.d.setVar("TEST_append", ":${BAR}") + self.assertEqual(self.d.getVar("TEST"), "foo:val:val2:bar") + + def test_append_unset(self): + self.d.setVar("TEST_prepend", "${FOO}:") + self.d.setVar("TEST_append", ":val2") + self.d.setVar("TEST_append", ":${BAR}") + self.assertEqual(self.d.getVar("TEST"), "foo::val2:bar") + + def test_remove(self): + self.d.setVar("TEST", "${VAL} ${BAR}") + self.d.setVar("TEST_remove", "val") + self.assertEqual(self.d.getVar("TEST"), "bar") + + def test_remove_cleared(self): + self.d.setVar("TEST", "${VAL} ${BAR}") + self.d.setVar("TEST_remove", "val") + self.d.setVar("TEST", "${VAL} ${BAR}") + self.assertEqual(self.d.getVar("TEST"), "val bar") + + # Ensure the value is unchanged if we have an inactive remove override + # (including that whitespace is preserved) + def test_remove_inactive_override(self): + self.d.setVar("TEST", "${VAL} ${BAR} 123") + self.d.setVar("TEST_remove_inactiveoverride", "val") + self.assertEqual(self.d.getVar("TEST"), "val bar 123") + + def test_doubleref_remove(self): + self.d.setVar("TEST", "${VAL} ${BAR}") + self.d.setVar("TEST_remove", "val") + self.d.setVar("TEST_TEST", "${TEST} ${TEST}") + self.assertEqual(self.d.getVar("TEST_TEST"), "bar bar") + + def test_empty_remove(self): + self.d.setVar("TEST", "") + self.d.setVar("TEST_remove", "val") + self.assertEqual(self.d.getVar("TEST"), "") + + def test_remove_expansion(self): + self.d.setVar("BAR", "Z") + self.d.setVar("TEST", "${BAR}/X Y") + self.d.setVar("TEST_remove", "${BAR}/X") + self.assertEqual(self.d.getVar("TEST"), "Y") + + def test_remove_expansion_items(self): + self.d.setVar("TEST", "A B C D") + self.d.setVar("BAR", "B D") + self.d.setVar("TEST_remove", "${BAR}") + self.assertEqual(self.d.getVar("TEST"), "A C") + +class TestOverrides(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("OVERRIDES", "foo:bar:local") + self.d.setVar("TEST", "testvalue") + + def test_no_override(self): + self.assertEqual(self.d.getVar("TEST"), "testvalue") + + def test_one_override(self): + self.d.setVar("TEST_bar", "testvalue2") + self.assertEqual(self.d.getVar("TEST"), "testvalue2") + + def test_one_override_unset(self): + self.d.setVar("TEST2_bar", "testvalue2") + + self.assertEqual(self.d.getVar("TEST2"), "testvalue2") + self.assertCountEqual(list(self.d.keys()), ['TEST', 'TEST2', 'OVERRIDES', 'TEST2_bar']) + + def test_multiple_override(self): + self.d.setVar("TEST_bar", "testvalue2") + self.d.setVar("TEST_local", "testvalue3") + self.d.setVar("TEST_foo", "testvalue4") + self.assertEqual(self.d.getVar("TEST"), "testvalue3") + self.assertCountEqual(list(self.d.keys()), ['TEST', 'TEST_foo', 'OVERRIDES', 'TEST_bar', 'TEST_local']) + + def test_multiple_combined_overrides(self): + self.d.setVar("TEST_local_foo_bar", "testvalue3") + self.assertEqual(self.d.getVar("TEST"), "testvalue3") + + def test_multiple_overrides_unset(self): + self.d.setVar("TEST2_local_foo_bar", "testvalue3") + self.assertEqual(self.d.getVar("TEST2"), "testvalue3") + + def test_keyexpansion_override(self): + self.d.setVar("LOCAL", "local") + self.d.setVar("TEST_bar", "testvalue2") + self.d.setVar("TEST_${LOCAL}", "testvalue3") + self.d.setVar("TEST_foo", "testvalue4") + bb.data.expandKeys(self.d) + self.assertEqual(self.d.getVar("TEST"), "testvalue3") + + def test_rename_override(self): + self.d.setVar("ALTERNATIVE_ncurses-tools_class-target", "a") + self.d.setVar("OVERRIDES", "class-target") + self.d.renameVar("ALTERNATIVE_ncurses-tools", "ALTERNATIVE_lib32-ncurses-tools") + self.assertEqual(self.d.getVar("ALTERNATIVE_lib32-ncurses-tools"), "a") + + def test_underscore_override(self): + self.d.setVar("TEST_bar", "testvalue2") + self.d.setVar("TEST_some_val", "testvalue3") + self.d.setVar("TEST_foo", "testvalue4") + self.d.setVar("OVERRIDES", "foo:bar:some_val") + self.assertEqual(self.d.getVar("TEST"), "testvalue3") + +class TestKeyExpansion(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("FOO", "foo") + self.d.setVar("BAR", "foo") + + def test_keyexpand(self): + self.d.setVar("VAL_${FOO}", "A") + self.d.setVar("VAL_${BAR}", "B") + with LogRecord() as logs: + bb.data.expandKeys(self.d) + self.assertTrue(logContains("Variable key VAL_${FOO} (A) replaces original key VAL_foo (B)", logs)) + self.assertEqual(self.d.getVar("VAL_foo"), "A") + +class TestFlags(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("foo", "value of foo") + self.d.setVarFlag("foo", "flag1", "value of flag1") + self.d.setVarFlag("foo", "flag2", "value of flag2") + + def test_setflag(self): + self.assertEqual(self.d.getVarFlag("foo", "flag1", False), "value of flag1") + self.assertEqual(self.d.getVarFlag("foo", "flag2", False), "value of flag2") + + def test_delflag(self): + self.d.delVarFlag("foo", "flag2") + self.assertEqual(self.d.getVarFlag("foo", "flag1", False), "value of flag1") + self.assertEqual(self.d.getVarFlag("foo", "flag2", False), None) + + +class Contains(unittest.TestCase): + def setUp(self): + self.d = bb.data.init() + self.d.setVar("SOMEFLAG", "a b c") + + def test_contains(self): + self.assertTrue(bb.utils.contains("SOMEFLAG", "a", True, False, self.d)) + self.assertTrue(bb.utils.contains("SOMEFLAG", "b", True, False, self.d)) + self.assertTrue(bb.utils.contains("SOMEFLAG", "c", True, False, self.d)) + + self.assertTrue(bb.utils.contains("SOMEFLAG", "a b", True, False, self.d)) + self.assertTrue(bb.utils.contains("SOMEFLAG", "b c", True, False, self.d)) + self.assertTrue(bb.utils.contains("SOMEFLAG", "c a", True, False, self.d)) + + self.assertTrue(bb.utils.contains("SOMEFLAG", "a b c", True, False, self.d)) + self.assertTrue(bb.utils.contains("SOMEFLAG", "c b a", True, False, self.d)) + + self.assertFalse(bb.utils.contains("SOMEFLAG", "x", True, False, self.d)) + self.assertFalse(bb.utils.contains("SOMEFLAG", "a x", True, False, self.d)) + self.assertFalse(bb.utils.contains("SOMEFLAG", "x c b", True, False, self.d)) + self.assertFalse(bb.utils.contains("SOMEFLAG", "x c b a", True, False, self.d)) + + def test_contains_any(self): + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a", True, False, self.d)) + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "b", True, False, self.d)) + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "c", True, False, self.d)) + + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a b", True, False, self.d)) + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "b c", True, False, self.d)) + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "c a", True, False, self.d)) + + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "a x", True, False, self.d)) + self.assertTrue(bb.utils.contains_any("SOMEFLAG", "x c", True, False, self.d)) + + self.assertFalse(bb.utils.contains_any("SOMEFLAG", "x", True, False, self.d)) + self.assertFalse(bb.utils.contains_any("SOMEFLAG", "x y z", True, False, self.d)) + + +class Serialize(unittest.TestCase): + + def test_serialize(self): + import tempfile + import pickle + d = bb.data.init() + d.enableTracking() + d.setVar('HELLO', 'world') + d.setVarFlag('HELLO', 'other', 'planet') + with tempfile.NamedTemporaryFile(delete=False) as tmpfile: + tmpfilename = tmpfile.name + pickle.dump(d, tmpfile) + + with open(tmpfilename, 'rb') as f: + newd = pickle.load(f) + + os.remove(tmpfilename) + + self.assertEqual(d, newd) + self.assertEqual(newd.getVar('HELLO'), 'world') + self.assertEqual(newd.getVarFlag('HELLO', 'other'), 'planet') + + +# Remote datastore tests +# These really only test the interface, since in actual usage we have a +# tinfoil connector that does everything over RPC, and this doesn't test +# that. + +class TestConnector: + d = None + def __init__(self, d): + self.d = d + def getVar(self, name): + return self.d._findVar(name) + def getKeys(self): + return set(self.d.keys()) + def getVarHistory(self, name): + return self.d.varhistory.variable(name) + def expandPythonRef(self, varname, expr, d): + localdata = self.d.createCopy() + for key in d.localkeys(): + localdata.setVar(d.getVar(key)) + varparse = bb.data_smart.VariableParse(varname, localdata) + return varparse.python_sub(expr) + def setVar(self, name, value): + self.d.setVar(name, value) + def setVarFlag(self, name, flag, value): + self.d.setVarFlag(name, flag, value) + def delVar(self, name): + self.d.delVar(name) + return False + def delVarFlag(self, name, flag): + self.d.delVarFlag(name, flag) + return False + def renameVar(self, name, newname): + self.d.renameVar(name, newname) + return False + +class Remote(unittest.TestCase): + def test_remote(self): + + d1 = bb.data.init() + d1.enableTracking() + d2 = bb.data.init() + d2.enableTracking() + connector = TestConnector(d1) + + d2.setVar('_remote_data', connector) + + d1.setVar('HELLO', 'world') + d1.setVarFlag('OTHER', 'flagname', 'flagvalue') + self.assertEqual(d2.getVar('HELLO'), 'world') + self.assertEqual(d2.expand('${HELLO}'), 'world') + self.assertEqual(d2.expand('${@d.getVar("HELLO")}'), 'world') + self.assertIn('flagname', d2.getVarFlags('OTHER')) + self.assertEqual(d2.getVarFlag('OTHER', 'flagname'), 'flagvalue') + self.assertEqual(d1.varhistory.variable('HELLO'), d2.varhistory.variable('HELLO')) + # Test setVar on client side affects server + d2.setVar('HELLO', 'other-world') + self.assertEqual(d1.getVar('HELLO'), 'other-world') + # Test setVarFlag on client side affects server + d2.setVarFlag('HELLO', 'flagname', 'flagvalue') + self.assertEqual(d1.getVarFlag('HELLO', 'flagname'), 'flagvalue') + # Test client side data is incorporated in python expansion (which is done on server) + d2.setVar('FOO', 'bar') + self.assertEqual(d2.expand('${@d.getVar("FOO")}'), 'bar') + # Test overrides work + d1.setVar('FOO_test', 'baz') + d1.appendVar('OVERRIDES', ':test') + self.assertEqual(d2.getVar('FOO'), 'baz') + + +# Remote equivalents of local test classes +# Note that these aren't perfect since we only test in one direction + +class RemoteDataExpansions(DataExpansions): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1["foo"] = "value_of_foo" + self.d1["bar"] = "value_of_bar" + self.d1["value_of_foo"] = "value_of_'value_of_foo'" + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteNestedExpansions(TestNestedExpansions): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1["foo"] = "foo" + self.d1["bar"] = "bar" + self.d1["value_of_foobar"] = "187" + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteConcat(TestConcat): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1.setVar("FOO", "foo") + self.d1.setVar("VAL", "val") + self.d1.setVar("BAR", "bar") + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteConcatOverride(TestConcatOverride): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1.setVar("FOO", "foo") + self.d1.setVar("VAL", "val") + self.d1.setVar("BAR", "bar") + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteOverrides(TestOverrides): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1.setVar("OVERRIDES", "foo:bar:local") + self.d1.setVar("TEST", "testvalue") + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteKeyExpansion(TestKeyExpansion): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1.setVar("FOO", "foo") + self.d1.setVar("BAR", "foo") + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) + +class TestRemoteFlags(TestFlags): + def setUp(self): + self.d1 = bb.data.init() + self.d = bb.data.init() + self.d1.setVar("foo", "value of foo") + self.d1.setVarFlag("foo", "flag1", "value of flag1") + self.d1.setVarFlag("foo", "flag2", "value of flag2") + connector = TestConnector(self.d1) + self.d.setVar('_remote_data', connector) diff --git a/poky/bitbake/lib/bb/tests/event.py b/poky/bitbake/lib/bb/tests/event.py new file mode 100644 index 000000000..d3a5f6269 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/event.py @@ -0,0 +1,986 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Tests for the Event implementation (event.py) +# +# Copyright (C) 2017 Intel Corporation +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import bb +import logging +import bb.compat +import bb.event +import importlib +import threading +import time +import pickle +from unittest.mock import Mock +from unittest.mock import call +from bb.msg import BBLogFormatter + + +class EventQueueStubBase(object): + """ Base class for EventQueueStub classes """ + def __init__(self): + self.event_calls = [] + return + + def _store_event_data_string(self, event): + if isinstance(event, logging.LogRecord): + formatter = BBLogFormatter("%(levelname)s: %(message)s") + self.event_calls.append(formatter.format(event)) + else: + self.event_calls.append(bb.event.getName(event)) + return + + +class EventQueueStub(EventQueueStubBase): + """ Class used as specification for UI event handler queue stub objects """ + def __init__(self): + super(EventQueueStub, self).__init__() + + def send(self, event): + super(EventQueueStub, self)._store_event_data_string(event) + + +class PickleEventQueueStub(EventQueueStubBase): + """ Class used as specification for UI event handler queue stub objects + with sendpickle method """ + def __init__(self): + super(PickleEventQueueStub, self).__init__() + + def sendpickle(self, pickled_event): + event = pickle.loads(pickled_event) + super(PickleEventQueueStub, self)._store_event_data_string(event) + + +class UIClientStub(object): + """ Class used as specification for UI event handler stub objects """ + def __init__(self): + self.event = None + + +class EventHandlingTest(unittest.TestCase): + """ Event handling test class """ + + + def setUp(self): + self._test_process = Mock() + ui_client1 = UIClientStub() + ui_client2 = UIClientStub() + self._test_ui1 = Mock(wraps=ui_client1) + self._test_ui2 = Mock(wraps=ui_client2) + importlib.reload(bb.event) + + def _create_test_handlers(self): + """ Method used to create a test handler ordered dictionary """ + test_handlers = bb.compat.OrderedDict() + test_handlers["handler1"] = self._test_process.handler1 + test_handlers["handler2"] = self._test_process.handler2 + return test_handlers + + def test_class_handlers(self): + """ Test set_class_handlers and get_class_handlers methods """ + test_handlers = self._create_test_handlers() + bb.event.set_class_handlers(test_handlers) + self.assertEqual(test_handlers, + bb.event.get_class_handlers()) + + def test_handlers(self): + """ Test set_handlers and get_handlers """ + test_handlers = self._create_test_handlers() + bb.event.set_handlers(test_handlers) + self.assertEqual(test_handlers, + bb.event.get_handlers()) + + def test_clean_class_handlers(self): + """ Test clean_class_handlers method """ + cleanDict = bb.compat.OrderedDict() + self.assertEqual(cleanDict, + bb.event.clean_class_handlers()) + + def test_register(self): + """ Test register method for class handlers """ + result = bb.event.register("handler", self._test_process.handler) + self.assertEqual(result, bb.event.Registered) + handlers_dict = bb.event.get_class_handlers() + self.assertIn("handler", handlers_dict) + + def test_already_registered(self): + """ Test detection of an already registed class handler """ + bb.event.register("handler", self._test_process.handler) + handlers_dict = bb.event.get_class_handlers() + self.assertIn("handler", handlers_dict) + result = bb.event.register("handler", self._test_process.handler) + self.assertEqual(result, bb.event.AlreadyRegistered) + + def test_register_from_string(self): + """ Test register method receiving code in string """ + result = bb.event.register("string_handler", " return True") + self.assertEqual(result, bb.event.Registered) + handlers_dict = bb.event.get_class_handlers() + self.assertIn("string_handler", handlers_dict) + + def test_register_with_mask(self): + """ Test register method with event masking """ + mask = ["bb.event.OperationStarted", + "bb.event.OperationCompleted"] + result = bb.event.register("event_handler", + self._test_process.event_handler, + mask) + self.assertEqual(result, bb.event.Registered) + handlers_dict = bb.event.get_class_handlers() + self.assertIn("event_handler", handlers_dict) + + def test_remove(self): + """ Test remove method for class handlers """ + test_handlers = self._create_test_handlers() + bb.event.set_class_handlers(test_handlers) + count = len(test_handlers) + bb.event.remove("handler1", None) + test_handlers = bb.event.get_class_handlers() + self.assertEqual(len(test_handlers), count - 1) + with self.assertRaises(KeyError): + bb.event.remove("handler1", None) + + def test_execute_handler(self): + """ Test execute_handler method for class handlers """ + mask = ["bb.event.OperationProgress"] + result = bb.event.register("event_handler", + self._test_process.event_handler, + mask) + self.assertEqual(result, bb.event.Registered) + event = bb.event.OperationProgress(current=10, total=100) + bb.event.execute_handler("event_handler", + self._test_process.event_handler, + event, + None) + self._test_process.event_handler.assert_called_once_with(event) + + def test_fire_class_handlers(self): + """ Test fire_class_handlers method """ + mask = ["bb.event.OperationStarted"] + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + mask) + self.assertEqual(result, bb.event.Registered) + result = bb.event.register("event_handler2", + self._test_process.event_handler2, + "*") + self.assertEqual(result, bb.event.Registered) + event1 = bb.event.OperationStarted() + event2 = bb.event.OperationCompleted(total=123) + bb.event.fire_class_handlers(event1, None) + bb.event.fire_class_handlers(event2, None) + bb.event.fire_class_handlers(event2, None) + expected_event_handler1 = [call(event1)] + expected_event_handler2 = [call(event1), + call(event2), + call(event2)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected_event_handler1) + self.assertEqual(self._test_process.event_handler2.call_args_list, + expected_event_handler2) + + def test_class_handler_filters(self): + """ Test filters for class handlers """ + mask = ["bb.event.OperationStarted"] + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + mask) + self.assertEqual(result, bb.event.Registered) + result = bb.event.register("event_handler2", + self._test_process.event_handler2, + "*") + self.assertEqual(result, bb.event.Registered) + bb.event.set_eventfilter( + lambda name, handler, event, d : + name == 'event_handler2' and + bb.event.getName(event) == "OperationStarted") + event1 = bb.event.OperationStarted() + event2 = bb.event.OperationCompleted(total=123) + bb.event.fire_class_handlers(event1, None) + bb.event.fire_class_handlers(event2, None) + bb.event.fire_class_handlers(event2, None) + expected_event_handler1 = [] + expected_event_handler2 = [call(event1)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected_event_handler1) + self.assertEqual(self._test_process.event_handler2.call_args_list, + expected_event_handler2) + + def test_change_handler_event_mapping(self): + """ Test changing the event mapping for class handlers """ + event1 = bb.event.OperationStarted() + event2 = bb.event.OperationCompleted(total=123) + + # register handler for all events + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + "*") + self.assertEqual(result, bb.event.Registered) + bb.event.fire_class_handlers(event1, None) + bb.event.fire_class_handlers(event2, None) + expected = [call(event1), call(event2)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected) + + # unregister handler and register it only for OperationStarted + bb.event.remove("event_handler1", + self._test_process.event_handler1) + mask = ["bb.event.OperationStarted"] + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + mask) + self.assertEqual(result, bb.event.Registered) + bb.event.fire_class_handlers(event1, None) + bb.event.fire_class_handlers(event2, None) + expected = [call(event1), call(event2), call(event1)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected) + + # unregister handler and register it only for OperationCompleted + bb.event.remove("event_handler1", + self._test_process.event_handler1) + mask = ["bb.event.OperationCompleted"] + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + mask) + self.assertEqual(result, bb.event.Registered) + bb.event.fire_class_handlers(event1, None) + bb.event.fire_class_handlers(event2, None) + expected = [call(event1), call(event2), call(event1), call(event2)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected) + + def test_register_UIHhandler(self): + """ Test register_UIHhandler method """ + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + + def test_UIHhandler_already_registered(self): + """ Test registering an UIHhandler already existing """ + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 2) + + def test_unregister_UIHhandler(self): + """ Test unregister_UIHhandler method """ + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + result = bb.event.unregister_UIHhandler(1) + self.assertIs(result, None) + + def test_fire_ui_handlers(self): + """ Test fire_ui_handlers method """ + self._test_ui1.event = Mock(spec_set=EventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + self._test_ui2.event = Mock(spec_set=PickleEventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) + self.assertEqual(result, 2) + event1 = bb.event.OperationStarted() + bb.event.fire_ui_handlers(event1, None) + expected = [call(event1)] + self.assertEqual(self._test_ui1.event.send.call_args_list, + expected) + expected = [call(pickle.dumps(event1))] + self.assertEqual(self._test_ui2.event.sendpickle.call_args_list, + expected) + + def test_ui_handler_mask_filter(self): + """ Test filters for UI handlers """ + mask = ["bb.event.OperationStarted"] + debug_domains = {} + self._test_ui1.event = Mock(spec_set=EventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask) + self._test_ui2.event = Mock(spec_set=PickleEventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) + bb.event.set_UIHmask(result, logging.INFO, debug_domains, mask) + + event1 = bb.event.OperationStarted() + event2 = bb.event.OperationCompleted(total=1) + + bb.event.fire_ui_handlers(event1, None) + bb.event.fire_ui_handlers(event2, None) + expected = [call(event1)] + self.assertEqual(self._test_ui1.event.send.call_args_list, + expected) + expected = [call(pickle.dumps(event1))] + self.assertEqual(self._test_ui2.event.sendpickle.call_args_list, + expected) + + def test_ui_handler_log_filter(self): + """ Test log filters for UI handlers """ + mask = ["*"] + debug_domains = {'BitBake.Foo': logging.WARNING} + + self._test_ui1.event = EventQueueStub() + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask) + self._test_ui2.event = PickleEventQueueStub() + result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) + bb.event.set_UIHmask(result, logging.ERROR, debug_domains, mask) + + event1 = bb.event.OperationStarted() + bb.event.fire_ui_handlers(event1, None) # All events match + + event_log_handler = bb.event.LogHandler() + logger = logging.getLogger("BitBake") + logger.addHandler(event_log_handler) + logger1 = logging.getLogger("BitBake.Foo") + logger1.warning("Test warning LogRecord1") # Matches debug_domains level + logger1.info("Test info LogRecord") # Filtered out + logger2 = logging.getLogger("BitBake.Bar") + logger2.error("Test error LogRecord") # Matches filter base level + logger2.warning("Test warning LogRecord2") # Filtered out + logger.removeHandler(event_log_handler) + + expected = ['OperationStarted', + 'WARNING: Test warning LogRecord1', + 'ERROR: Test error LogRecord'] + self.assertEqual(self._test_ui1.event.event_calls, expected) + self.assertEqual(self._test_ui2.event.event_calls, expected) + + def test_fire(self): + """ Test fire method used to trigger class and ui event handlers """ + mask = ["bb.event.ConfigParsed"] + result = bb.event.register("event_handler1", + self._test_process.event_handler1, + mask) + + self._test_ui1.event = Mock(spec_set=EventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + + event1 = bb.event.ConfigParsed() + bb.event.fire(event1, None) + expected = [call(event1)] + self.assertEqual(self._test_process.event_handler1.call_args_list, + expected) + self.assertEqual(self._test_ui1.event.send.call_args_list, + expected) + + def test_fire_from_worker(self): + """ Test fire_from_worker method """ + self._test_ui1.event = Mock(spec_set=EventQueueStub) + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + event1 = bb.event.ConfigParsed() + bb.event.fire_from_worker(event1, None) + expected = [call(event1)] + self.assertEqual(self._test_ui1.event.send.call_args_list, + expected) + + def test_worker_fire(self): + """ Test the triggering of bb.event.worker_fire callback """ + bb.event.worker_fire = Mock() + event = bb.event.Event() + bb.event.fire(event, None) + expected = [call(event, None)] + self.assertEqual(bb.event.worker_fire.call_args_list, expected) + + def test_print_ui_queue(self): + """ Test print_ui_queue method """ + event1 = bb.event.OperationStarted() + event2 = bb.event.OperationCompleted(total=123) + bb.event.fire(event1, None) + bb.event.fire(event2, None) + event_log_handler = bb.event.LogHandler() + logger = logging.getLogger("BitBake") + logger.addHandler(event_log_handler) + logger.info("Test info LogRecord") + logger.warning("Test warning LogRecord") + with self.assertLogs("BitBake", level="INFO") as cm: + bb.event.print_ui_queue() + logger.removeHandler(event_log_handler) + self.assertEqual(cm.output, + ["INFO:BitBake:Test info LogRecord", + "WARNING:BitBake:Test warning LogRecord"]) + + def _set_threadlock_test_mockups(self): + """ Create UI event handler mockups used in enable and disable + threadlock tests """ + def ui1_event_send(event): + if type(event) is bb.event.ConfigParsed: + self._threadlock_test_calls.append("w1_ui1") + if type(event) is bb.event.OperationStarted: + self._threadlock_test_calls.append("w2_ui1") + time.sleep(2) + + def ui2_event_send(event): + if type(event) is bb.event.ConfigParsed: + self._threadlock_test_calls.append("w1_ui2") + if type(event) is bb.event.OperationStarted: + self._threadlock_test_calls.append("w2_ui2") + time.sleep(2) + + self._threadlock_test_calls = [] + self._test_ui1.event = EventQueueStub() + self._test_ui1.event.send = ui1_event_send + result = bb.event.register_UIHhandler(self._test_ui1, mainui=True) + self.assertEqual(result, 1) + self._test_ui2.event = EventQueueStub() + self._test_ui2.event.send = ui2_event_send + result = bb.event.register_UIHhandler(self._test_ui2, mainui=True) + self.assertEqual(result, 2) + + def _set_and_run_threadlock_test_workers(self): + """ Create and run the workers used to trigger events in enable and + disable threadlock tests """ + worker1 = threading.Thread(target=self._thread_lock_test_worker1) + worker2 = threading.Thread(target=self._thread_lock_test_worker2) + worker1.start() + time.sleep(1) + worker2.start() + worker1.join() + worker2.join() + + def _thread_lock_test_worker1(self): + """ First worker used to fire the ConfigParsed event for enable and + disable threadlocks tests """ + bb.event.fire(bb.event.ConfigParsed(), None) + + def _thread_lock_test_worker2(self): + """ Second worker used to fire the OperationStarted event for enable + and disable threadlocks tests """ + bb.event.fire(bb.event.OperationStarted(), None) + + def test_enable_threadlock(self): + """ Test enable_threadlock method """ + self._set_threadlock_test_mockups() + bb.event.enable_threadlock() + self._set_and_run_threadlock_test_workers() + # Calls to UI handlers should be in order as all the registered + # handlers for the event coming from the first worker should be + # called before processing the event from the second worker. + self.assertEqual(self._threadlock_test_calls, + ["w1_ui1", "w1_ui2", "w2_ui1", "w2_ui2"]) + + + def test_disable_threadlock(self): + """ Test disable_threadlock method """ + self._set_threadlock_test_mockups() + bb.event.disable_threadlock() + self._set_and_run_threadlock_test_workers() + # Calls to UI handlers should be intertwined together. Thanks to the + # delay in the registered handlers for the event coming from the first + # worker, the event coming from the second worker starts being + # processed before finishing handling the first worker event. + self.assertEqual(self._threadlock_test_calls, + ["w1_ui1", "w2_ui1", "w1_ui2", "w2_ui2"]) + + +class EventClassesTest(unittest.TestCase): + """ Event classes test class """ + + _worker_pid = 54321 + + def setUp(self): + bb.event.worker_pid = EventClassesTest._worker_pid + + def test_Event(self): + """ Test the Event base class """ + event = bb.event.Event() + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_HeartbeatEvent(self): + """ Test the HeartbeatEvent class """ + time = 10 + event = bb.event.HeartbeatEvent(time) + self.assertEqual(event.time, time) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_OperationStarted(self): + """ Test OperationStarted event class """ + msg = "Foo Bar" + event = bb.event.OperationStarted(msg) + self.assertEqual(event.msg, msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_OperationCompleted(self): + """ Test OperationCompleted event class """ + msg = "Foo Bar" + total = 123 + event = bb.event.OperationCompleted(total, msg) + self.assertEqual(event.msg, msg) + self.assertEqual(event.total, total) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_OperationProgress(self): + """ Test OperationProgress event class """ + msg = "Foo Bar" + total = 123 + current = 111 + event = bb.event.OperationProgress(current, total, msg) + self.assertEqual(event.msg, msg + ": %s/%s" % (current, total)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ConfigParsed(self): + """ Test the ConfigParsed class """ + event = bb.event.ConfigParsed() + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_MultiConfigParsed(self): + """ Test MultiConfigParsed event class """ + mcdata = {"foobar": "Foo Bar"} + event = bb.event.MultiConfigParsed(mcdata) + self.assertEqual(event.mcdata, mcdata) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_RecipeEvent(self): + """ Test RecipeEvent event base class """ + callback = lambda a: 2 * a + event = bb.event.RecipeEvent(callback) + self.assertEqual(event.fn(1), callback(1)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_RecipePreFinalise(self): + """ Test RecipePreFinalise event class """ + callback = lambda a: 2 * a + event = bb.event.RecipePreFinalise(callback) + self.assertEqual(event.fn(1), callback(1)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_RecipeTaskPreProcess(self): + """ Test RecipeTaskPreProcess event class """ + callback = lambda a: 2 * a + tasklist = [("foobar", callback)] + event = bb.event.RecipeTaskPreProcess(callback, tasklist) + self.assertEqual(event.fn(1), callback(1)) + self.assertEqual(event.tasklist, tasklist) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_RecipeParsed(self): + """ Test RecipeParsed event base class """ + callback = lambda a: 2 * a + event = bb.event.RecipeParsed(callback) + self.assertEqual(event.fn(1), callback(1)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_StampUpdate(self): + targets = ["foo", "bar"] + stampfns = [lambda:"foobar"] + event = bb.event.StampUpdate(targets, stampfns) + self.assertEqual(event.targets, targets) + self.assertEqual(event.stampPrefix, stampfns) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_BuildBase(self): + """ Test base class for bitbake build events """ + name = "foo" + pkgs = ["bar"] + failures = 123 + event = bb.event.BuildBase(name, pkgs, failures) + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), failures) + name = event.name = "bar" + pkgs = event.pkgs = ["foo"] + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), failures) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_BuildInit(self): + """ Test class for bitbake build invocation events """ + event = bb.event.BuildInit() + self.assertEqual(event.name, None) + self.assertEqual(event.pkgs, []) + self.assertEqual(event.getFailures(), 0) + name = event.name = "bar" + pkgs = event.pkgs = ["foo"] + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), 0) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_BuildStarted(self): + """ Test class for build started events """ + name = "foo" + pkgs = ["bar"] + failures = 123 + event = bb.event.BuildStarted(name, pkgs, failures) + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), failures) + self.assertEqual(event.msg, "Building Started") + name = event.name = "bar" + pkgs = event.pkgs = ["foo"] + msg = event.msg = "foobar" + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), failures) + self.assertEqual(event.msg, msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_BuildCompleted(self): + """ Test class for build completed events """ + total = 1000 + name = "foo" + pkgs = ["bar"] + failures = 123 + interrupted = 1 + event = bb.event.BuildCompleted(total, name, pkgs, failures, + interrupted) + self.assertEqual(event.name, name) + self.assertEqual(event.pkgs, pkgs) + self.assertEqual(event.getFailures(), failures) + self.assertEqual(event.msg, "Building Failed") + event2 = bb.event.BuildCompleted(total, name, pkgs) + self.assertEqual(event2.name, name) + self.assertEqual(event2.pkgs, pkgs) + self.assertEqual(event2.getFailures(), 0) + self.assertEqual(event2.msg, "Building Succeeded") + self.assertEqual(event2.pid, EventClassesTest._worker_pid) + + def test_DiskFull(self): + """ Test DiskFull event class """ + dev = "/dev/foo" + type = "ext4" + freespace = "104M" + mountpoint = "/" + event = bb.event.DiskFull(dev, type, freespace, mountpoint) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_MonitorDiskEvent(self): + """ Test MonitorDiskEvent class """ + available_bytes = 10000000 + free_bytes = 90000000 + total_bytes = 1000000000 + du = bb.event.DiskUsageSample(available_bytes, free_bytes, + total_bytes) + event = bb.event.MonitorDiskEvent(du) + self.assertEqual(event.disk_usage.available_bytes, available_bytes) + self.assertEqual(event.disk_usage.free_bytes, free_bytes) + self.assertEqual(event.disk_usage.total_bytes, total_bytes) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_NoProvider(self): + """ Test NoProvider event class """ + item = "foobar" + event1 = bb.event.NoProvider(item) + self.assertEqual(event1.getItem(), item) + self.assertEqual(event1.isRuntime(), False) + self.assertEqual(str(event1), "Nothing PROVIDES 'foobar'") + runtime = True + dependees = ["foo", "bar"] + reasons = None + close_matches = ["foibar", "footbar"] + event2 = bb.event.NoProvider(item, runtime, dependees, reasons, + close_matches) + self.assertEqual(event2.isRuntime(), True) + expected = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS" + " on or otherwise requires it). Close matches:\n" + " foibar\n" + " footbar") + self.assertEqual(str(event2), expected) + reasons = ["Item does not exist on database"] + close_matches = ["foibar", "footbar"] + event3 = bb.event.NoProvider(item, runtime, dependees, reasons, + close_matches) + expected = ("Nothing RPROVIDES 'foobar' (but foo, bar RDEPENDS" + " on or otherwise requires it)\n" + "Item does not exist on database") + self.assertEqual(str(event3), expected) + self.assertEqual(event3.pid, EventClassesTest._worker_pid) + + def test_MultipleProviders(self): + """ Test MultipleProviders event class """ + item = "foobar" + candidates = ["foobarv1", "foobars"] + event1 = bb.event.MultipleProviders(item, candidates) + self.assertEqual(event1.isRuntime(), False) + self.assertEqual(event1.getItem(), item) + self.assertEqual(event1.getCandidates(), candidates) + expected = ("Multiple providers are available for foobar (foobarv1," + " foobars)\n" + "Consider defining a PREFERRED_PROVIDER entry to match " + "foobar") + self.assertEqual(str(event1), expected) + runtime = True + event2 = bb.event.MultipleProviders(item, candidates, runtime) + self.assertEqual(event2.isRuntime(), runtime) + expected = ("Multiple providers are available for runtime foobar " + "(foobarv1, foobars)\n" + "Consider defining a PREFERRED_RPROVIDER entry to match " + "foobar") + self.assertEqual(str(event2), expected) + self.assertEqual(event2.pid, EventClassesTest._worker_pid) + + def test_ParseStarted(self): + """ Test ParseStarted event class """ + total = 123 + event = bb.event.ParseStarted(total) + self.assertEqual(event.msg, "Recipe parsing Started") + self.assertEqual(event.total, total) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ParseCompleted(self): + """ Test ParseCompleted event class """ + cached = 10 + parsed = 13 + skipped = 7 + virtuals = 2 + masked = 1 + errors = 0 + total = 23 + event = bb.event.ParseCompleted(cached, parsed, skipped, masked, + virtuals, errors, total) + self.assertEqual(event.msg, "Recipe parsing Completed") + expected = [cached, parsed, skipped, virtuals, masked, errors, + cached + parsed, total] + actual = [event.cached, event.parsed, event.skipped, event.virtuals, + event.masked, event.errors, event.sofar, event.total] + self.assertEqual(str(actual), str(expected)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ParseProgress(self): + """ Test ParseProgress event class """ + current = 10 + total = 100 + event = bb.event.ParseProgress(current, total) + self.assertEqual(event.msg, + "Recipe parsing" + ": %s/%s" % (current, total)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_CacheLoadStarted(self): + """ Test CacheLoadStarted event class """ + total = 123 + event = bb.event.CacheLoadStarted(total) + self.assertEqual(event.msg, "Loading cache Started") + self.assertEqual(event.total, total) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_CacheLoadProgress(self): + """ Test CacheLoadProgress event class """ + current = 10 + total = 100 + event = bb.event.CacheLoadProgress(current, total) + self.assertEqual(event.msg, + "Loading cache" + ": %s/%s" % (current, total)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_CacheLoadCompleted(self): + """ Test CacheLoadCompleted event class """ + total = 23 + num_entries = 12 + event = bb.event.CacheLoadCompleted(total, num_entries) + self.assertEqual(event.msg, "Loading cache Completed") + expected = [total, num_entries] + actual = [event.total, event.num_entries] + self.assertEqual(str(actual), str(expected)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_TreeDataPreparationStarted(self): + """ Test TreeDataPreparationStarted event class """ + event = bb.event.TreeDataPreparationStarted() + self.assertEqual(event.msg, "Preparing tree data Started") + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_TreeDataPreparationProgress(self): + """ Test TreeDataPreparationProgress event class """ + current = 10 + total = 100 + event = bb.event.TreeDataPreparationProgress(current, total) + self.assertEqual(event.msg, + "Preparing tree data" + ": %s/%s" % (current, total)) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_TreeDataPreparationCompleted(self): + """ Test TreeDataPreparationCompleted event class """ + total = 23 + event = bb.event.TreeDataPreparationCompleted(total) + self.assertEqual(event.msg, "Preparing tree data Completed") + self.assertEqual(event.total, total) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_DepTreeGenerated(self): + """ Test DepTreeGenerated event class """ + depgraph = Mock() + event = bb.event.DepTreeGenerated(depgraph) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_TargetsTreeGenerated(self): + """ Test TargetsTreeGenerated event class """ + model = Mock() + event = bb.event.TargetsTreeGenerated(model) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ReachableStamps(self): + """ Test ReachableStamps event class """ + stamps = [Mock(), Mock()] + event = bb.event.ReachableStamps(stamps) + self.assertEqual(event.stamps, stamps) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_FilesMatchingFound(self): + """ Test FilesMatchingFound event class """ + pattern = "foo.*bar" + matches = ["foobar"] + event = bb.event.FilesMatchingFound(pattern, matches) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ConfigFilesFound(self): + """ Test ConfigFilesFound event class """ + variable = "FOO_BAR" + values = ["foo", "bar"] + event = bb.event.ConfigFilesFound(variable, values) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ConfigFilePathFound(self): + """ Test ConfigFilePathFound event class """ + path = "/foo/bar" + event = bb.event.ConfigFilePathFound(path) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_message_classes(self): + """ Test message event classes """ + msg = "foobar foo bar" + event = bb.event.MsgBase(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgDebug(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgNote(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgWarn(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgError(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgFatal(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + event = bb.event.MsgPlain(msg) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_LogExecTTY(self): + """ Test LogExecTTY event class """ + msg = "foo bar" + prog = "foo.sh" + sleep_delay = 10 + retries = 3 + event = bb.event.LogExecTTY(msg, prog, sleep_delay, retries) + self.assertEqual(event.msg, msg) + self.assertEqual(event.prog, prog) + self.assertEqual(event.sleep_delay, sleep_delay) + self.assertEqual(event.retries, retries) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def _throw_zero_division_exception(self): + a = 1 / 0 + return + + def _worker_handler(self, event, d): + self._returned_event = event + return + + def test_LogHandler(self): + """ Test LogHandler class """ + logger = logging.getLogger("TestEventClasses") + logger.propagate = False + handler = bb.event.LogHandler(logging.INFO) + logger.addHandler(handler) + bb.event.worker_fire = self._worker_handler + try: + self._throw_zero_division_exception() + except ZeroDivisionError as ex: + logger.exception(ex) + event = self._returned_event + try: + pe = pickle.dumps(event) + newevent = pickle.loads(pe) + except: + self.fail('Logged event is not serializable') + self.assertEqual(event.taskpid, EventClassesTest._worker_pid) + + def test_MetadataEvent(self): + """ Test MetadataEvent class """ + eventtype = "footype" + eventdata = {"foo": "bar"} + event = bb.event.MetadataEvent(eventtype, eventdata) + self.assertEqual(event.type, eventtype) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ProcessStarted(self): + """ Test ProcessStarted class """ + processname = "foo" + total = 9783128974 + event = bb.event.ProcessStarted(processname, total) + self.assertEqual(event.processname, processname) + self.assertEqual(event.total, total) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ProcessProgress(self): + """ Test ProcessProgress class """ + processname = "foo" + progress = 243224 + event = bb.event.ProcessProgress(processname, progress) + self.assertEqual(event.processname, processname) + self.assertEqual(event.progress, progress) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_ProcessFinished(self): + """ Test ProcessFinished class """ + processname = "foo" + total = 1242342344 + event = bb.event.ProcessFinished(processname) + self.assertEqual(event.processname, processname) + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_SanityCheck(self): + """ Test SanityCheck class """ + event1 = bb.event.SanityCheck() + self.assertEqual(event1.generateevents, True) + self.assertEqual(event1.pid, EventClassesTest._worker_pid) + generateevents = False + event2 = bb.event.SanityCheck(generateevents) + self.assertEqual(event2.generateevents, generateevents) + self.assertEqual(event2.pid, EventClassesTest._worker_pid) + + def test_SanityCheckPassed(self): + """ Test SanityCheckPassed class """ + event = bb.event.SanityCheckPassed() + self.assertEqual(event.pid, EventClassesTest._worker_pid) + + def test_SanityCheckFailed(self): + """ Test SanityCheckFailed class """ + msg = "The sanity test failed." + event1 = bb.event.SanityCheckFailed(msg) + self.assertEqual(event1.pid, EventClassesTest._worker_pid) + network_error = True + event2 = bb.event.SanityCheckFailed(msg, network_error) + self.assertEqual(event2.pid, EventClassesTest._worker_pid) + + def test_network_event_classes(self): + """ Test network event classes """ + event1 = bb.event.NetworkTest() + generateevents = False + self.assertEqual(event1.pid, EventClassesTest._worker_pid) + event2 = bb.event.NetworkTest(generateevents) + self.assertEqual(event2.pid, EventClassesTest._worker_pid) + event3 = bb.event.NetworkTestPassed() + self.assertEqual(event3.pid, EventClassesTest._worker_pid) + event4 = bb.event.NetworkTestFailed() + self.assertEqual(event4.pid, EventClassesTest._worker_pid) + + def test_FindSigInfoResult(self): + """ Test FindSigInfoResult event class """ + result = [Mock()] + event = bb.event.FindSigInfoResult(result) + self.assertEqual(event.result, result) + self.assertEqual(event.pid, EventClassesTest._worker_pid) diff --git a/poky/bitbake/lib/bb/tests/fetch.py b/poky/bitbake/lib/bb/tests/fetch.py new file mode 100644 index 000000000..74859f9d3 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/fetch.py @@ -0,0 +1,1573 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Tests for the Fetcher (fetch2/) +# +# Copyright (C) 2012 Richard Purdie +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import hashlib +import tempfile +import subprocess +import collections +import os +from bb.fetch2 import URI +from bb.fetch2 import FetchMethod +import bb + +def skipIfNoNetwork(): + if os.environ.get("BB_SKIP_NETTESTS") == "yes": + return unittest.skip("Network tests being skipped") + return lambda f: f + +class URITest(unittest.TestCase): + test_uris = { + "http://www.google.com/index.html" : { + 'uri': 'http://www.google.com/index.html', + 'scheme': 'http', + 'hostname': 'www.google.com', + 'port': None, + 'hostport': 'www.google.com', + 'path': '/index.html', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': False + }, + "http://www.google.com/index.html;param1=value1" : { + 'uri': 'http://www.google.com/index.html;param1=value1', + 'scheme': 'http', + 'hostname': 'www.google.com', + 'port': None, + 'hostport': 'www.google.com', + 'path': '/index.html', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': { + 'param1': 'value1' + }, + 'query': {}, + 'relative': False + }, + "http://www.example.org/index.html?param1=value1" : { + 'uri': 'http://www.example.org/index.html?param1=value1', + 'scheme': 'http', + 'hostname': 'www.example.org', + 'port': None, + 'hostport': 'www.example.org', + 'path': '/index.html', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': { + 'param1': 'value1' + }, + 'relative': False + }, + "http://www.example.org/index.html?qparam1=qvalue1;param2=value2" : { + 'uri': 'http://www.example.org/index.html?qparam1=qvalue1;param2=value2', + 'scheme': 'http', + 'hostname': 'www.example.org', + 'port': None, + 'hostport': 'www.example.org', + 'path': '/index.html', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': { + 'param2': 'value2' + }, + 'query': { + 'qparam1': 'qvalue1' + }, + 'relative': False + }, + "http://www.example.com:8080/index.html" : { + 'uri': 'http://www.example.com:8080/index.html', + 'scheme': 'http', + 'hostname': 'www.example.com', + 'port': 8080, + 'hostport': 'www.example.com:8080', + 'path': '/index.html', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': False + }, + "cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg" : { + 'uri': 'cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg', + 'scheme': 'cvs', + 'hostname': 'cvs.handhelds.org', + 'port': None, + 'hostport': 'cvs.handhelds.org', + 'path': '/cvs', + 'userinfo': 'anoncvs', + 'username': 'anoncvs', + 'password': '', + 'params': { + 'module': 'familiar/dist/ipkg' + }, + 'query': {}, + 'relative': False + }, + "cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg": { + 'uri': 'cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg', + 'scheme': 'cvs', + 'hostname': 'cvs.handhelds.org', + 'port': None, + 'hostport': 'cvs.handhelds.org', + 'path': '/cvs', + 'userinfo': 'anoncvs:anonymous', + 'username': 'anoncvs', + 'password': 'anonymous', + 'params': collections.OrderedDict([ + ('tag', 'V0-99-81'), + ('module', 'familiar/dist/ipkg') + ]), + 'query': {}, + 'relative': False + }, + "file://example.diff": { # NOTE: Not RFC compliant! + 'uri': 'file:example.diff', + 'scheme': 'file', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': 'example.diff', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': True + }, + "file:example.diff": { # NOTE: RFC compliant version of the former + 'uri': 'file:example.diff', + 'scheme': 'file', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': 'example.diff', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': True + }, + "file:///tmp/example.diff": { + 'uri': 'file:///tmp/example.diff', + 'scheme': 'file', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': '/tmp/example.diff', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': False + }, + "git:///path/example.git": { + 'uri': 'git:///path/example.git', + 'scheme': 'git', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': '/path/example.git', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': False + }, + "git:path/example.git": { + 'uri': 'git:path/example.git', + 'scheme': 'git', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': 'path/example.git', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': True + }, + "git://example.net/path/example.git": { + 'uri': 'git://example.net/path/example.git', + 'scheme': 'git', + 'hostname': 'example.net', + 'port': None, + 'hostport': 'example.net', + 'path': '/path/example.git', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {}, + 'query': {}, + 'relative': False + }, + "http://somesite.net;someparam=1": { + 'uri': 'http://somesite.net;someparam=1', + 'scheme': 'http', + 'hostname': 'somesite.net', + 'port': None, + 'hostport': 'somesite.net', + 'path': '', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {"someparam" : "1"}, + 'query': {}, + 'relative': False + }, + "file://somelocation;someparam=1": { + 'uri': 'file:somelocation;someparam=1', + 'scheme': 'file', + 'hostname': '', + 'port': None, + 'hostport': '', + 'path': 'somelocation', + 'userinfo': '', + 'userinfo': '', + 'username': '', + 'password': '', + 'params': {"someparam" : "1"}, + 'query': {}, + 'relative': True + } + + } + + def test_uri(self): + for test_uri, ref in self.test_uris.items(): + uri = URI(test_uri) + + self.assertEqual(str(uri), ref['uri']) + + # expected attributes + self.assertEqual(uri.scheme, ref['scheme']) + + self.assertEqual(uri.userinfo, ref['userinfo']) + self.assertEqual(uri.username, ref['username']) + self.assertEqual(uri.password, ref['password']) + + self.assertEqual(uri.hostname, ref['hostname']) + self.assertEqual(uri.port, ref['port']) + self.assertEqual(uri.hostport, ref['hostport']) + + self.assertEqual(uri.path, ref['path']) + self.assertEqual(uri.params, ref['params']) + + self.assertEqual(uri.relative, ref['relative']) + + def test_dict(self): + for test in self.test_uris.values(): + uri = URI() + + self.assertEqual(uri.scheme, '') + self.assertEqual(uri.userinfo, '') + self.assertEqual(uri.username, '') + self.assertEqual(uri.password, '') + self.assertEqual(uri.hostname, '') + self.assertEqual(uri.port, None) + self.assertEqual(uri.path, '') + self.assertEqual(uri.params, {}) + + + uri.scheme = test['scheme'] + self.assertEqual(uri.scheme, test['scheme']) + + uri.userinfo = test['userinfo'] + self.assertEqual(uri.userinfo, test['userinfo']) + self.assertEqual(uri.username, test['username']) + self.assertEqual(uri.password, test['password']) + + # make sure changing the values doesn't do anything unexpected + uri.username = 'changeme' + self.assertEqual(uri.username, 'changeme') + self.assertEqual(uri.password, test['password']) + uri.password = 'insecure' + self.assertEqual(uri.username, 'changeme') + self.assertEqual(uri.password, 'insecure') + + # reset back after our trickery + uri.userinfo = test['userinfo'] + self.assertEqual(uri.userinfo, test['userinfo']) + self.assertEqual(uri.username, test['username']) + self.assertEqual(uri.password, test['password']) + + uri.hostname = test['hostname'] + self.assertEqual(uri.hostname, test['hostname']) + self.assertEqual(uri.hostport, test['hostname']) + + uri.port = test['port'] + self.assertEqual(uri.port, test['port']) + self.assertEqual(uri.hostport, test['hostport']) + + uri.path = test['path'] + self.assertEqual(uri.path, test['path']) + + uri.params = test['params'] + self.assertEqual(uri.params, test['params']) + + uri.query = test['query'] + self.assertEqual(uri.query, test['query']) + + self.assertEqual(str(uri), test['uri']) + + uri.params = {} + self.assertEqual(uri.params, {}) + self.assertEqual(str(uri), (str(uri).split(";"))[0]) + +class FetcherTest(unittest.TestCase): + + def setUp(self): + self.origdir = os.getcwd() + self.d = bb.data.init() + self.tempdir = tempfile.mkdtemp() + self.dldir = os.path.join(self.tempdir, "download") + os.mkdir(self.dldir) + self.d.setVar("DL_DIR", self.dldir) + self.unpackdir = os.path.join(self.tempdir, "unpacked") + os.mkdir(self.unpackdir) + persistdir = os.path.join(self.tempdir, "persistdata") + self.d.setVar("PERSISTENT_DIR", persistdir) + + def tearDown(self): + os.chdir(self.origdir) + if os.environ.get("BB_TMPDIR_NOCLEAN") == "yes": + print("Not cleaning up %s. Please remove manually." % self.tempdir) + else: + bb.utils.prunedir(self.tempdir) + +class MirrorUriTest(FetcherTest): + + replaceuris = { + ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "http://somewhere.org/somedir/") + : "http://somewhere.org/somedir/git2_git.invalid.infradead.org.mtd-utils.git.tar.gz", + ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/somedir/\\2;protocol=http") + : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/somedir/\\2;protocol=http") + : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/([^/]+/)*([^/]*)", "git://somewhere.org/\\2;protocol=http") + : "git://somewhere.org/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + ("git://someserver.org/bitbake;tag=1234567890123456789012345678901234567890", "git://someserver.org/bitbake", "git://git.openembedded.org/bitbake") + : "git://git.openembedded.org/bitbake;tag=1234567890123456789012345678901234567890", + ("file://sstate-xyz.tgz", "file://.*", "file:///somewhere/1234/sstate-cache") + : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz", + ("file://sstate-xyz.tgz", "file://.*", "file:///somewhere/1234/sstate-cache/") + : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz", + ("http://somewhere.org/somedir1/somedir2/somefile_1.2.3.tar.gz", "http://.*/.*", "http://somewhere2.org/somedir3") + : "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz", + ("http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz") + : "http://somewhere2.org/somedir3/somefile_1.2.3.tar.gz", + ("http://www.apache.org/dist/subversion/subversion-1.7.1.tar.bz2", "http://www.apache.org/dist", "http://archive.apache.org/dist") + : "http://archive.apache.org/dist/subversion/subversion-1.7.1.tar.bz2", + ("http://www.apache.org/dist/subversion/subversion-1.7.1.tar.bz2", "http://.*/.*", "file:///somepath/downloads/") + : "file:///somepath/downloads/subversion-1.7.1.tar.bz2", + ("git://git.invalid.infradead.org/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/BASENAME;protocol=http") + : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/BASENAME;protocol=http") + : "git://somewhere.org/somedir/mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + ("git://git.invalid.infradead.org/foo/mtd-utils.git;tag=1234567890123456789012345678901234567890", "git://.*/.*", "git://somewhere.org/somedir/MIRRORNAME;protocol=http") + : "git://somewhere.org/somedir/git.invalid.infradead.org.foo.mtd-utils.git;tag=1234567890123456789012345678901234567890;protocol=http", + + #Renaming files doesn't work + #("http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere.org/somedir1/somefile_1.2.3.tar.gz", "http://somewhere2.org/somedir3/somefile_2.3.4.tar.gz") : "http://somewhere2.org/somedir3/somefile_2.3.4.tar.gz" + #("file://sstate-xyz.tgz", "file://.*/.*", "file:///somewhere/1234/sstate-cache") : "file:///somewhere/1234/sstate-cache/sstate-xyz.tgz", + } + + mirrorvar = "http://.*/.* file:///somepath/downloads/ \n" \ + "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n" \ + "https://.*/.* file:///someotherpath/downloads/ \n" \ + "http://.*/.* file:///someotherpath/downloads/ \n" + + def test_urireplace(self): + for k, v in self.replaceuris.items(): + ud = bb.fetch.FetchData(k[0], self.d) + ud.setup_localpath(self.d) + mirrors = bb.fetch2.mirror_from_string("%s %s" % (k[1], k[2])) + newuris, uds = bb.fetch2.build_mirroruris(ud, mirrors, self.d) + self.assertEqual([v], newuris) + + def test_urilist1(self): + fetcher = bb.fetch.FetchData("http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d) + mirrors = bb.fetch2.mirror_from_string(self.mirrorvar) + uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d) + self.assertEqual(uris, ['file:///somepath/downloads/bitbake-1.0.tar.gz', 'file:///someotherpath/downloads/bitbake-1.0.tar.gz']) + + def test_urilist2(self): + # Catch https:// -> files:// bug + fetcher = bb.fetch.FetchData("https://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d) + mirrors = bb.fetch2.mirror_from_string(self.mirrorvar) + uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d) + self.assertEqual(uris, ['file:///someotherpath/downloads/bitbake-1.0.tar.gz']) + + def test_mirror_of_mirror(self): + # Test if mirror of a mirror works + mirrorvar = self.mirrorvar + " http://.*/.* http://otherdownloads.yoctoproject.org/downloads/ \n" + mirrorvar = mirrorvar + " http://otherdownloads.yoctoproject.org/.* http://downloads2.yoctoproject.org/downloads/ \n" + fetcher = bb.fetch.FetchData("http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d) + mirrors = bb.fetch2.mirror_from_string(mirrorvar) + uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d) + self.assertEqual(uris, ['file:///somepath/downloads/bitbake-1.0.tar.gz', + 'file:///someotherpath/downloads/bitbake-1.0.tar.gz', + 'http://otherdownloads.yoctoproject.org/downloads/bitbake-1.0.tar.gz', + 'http://downloads2.yoctoproject.org/downloads/bitbake-1.0.tar.gz']) + + recmirrorvar = "https://.*/[^/]* http://AAAA/A/A/A/ \n" \ + "https://.*/[^/]* https://BBBB/B/B/B/ \n" + + def test_recursive(self): + fetcher = bb.fetch.FetchData("https://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", self.d) + mirrors = bb.fetch2.mirror_from_string(self.recmirrorvar) + uris, uds = bb.fetch2.build_mirroruris(fetcher, mirrors, self.d) + self.assertEqual(uris, ['http://AAAA/A/A/A/bitbake/bitbake-1.0.tar.gz', + 'https://BBBB/B/B/B/bitbake/bitbake-1.0.tar.gz', + 'http://AAAA/A/A/A/B/B/bitbake/bitbake-1.0.tar.gz']) + +class FetcherLocalTest(FetcherTest): + def setUp(self): + def touch(fn): + with open(fn, 'a'): + os.utime(fn, None) + + super(FetcherLocalTest, self).setUp() + self.localsrcdir = os.path.join(self.tempdir, 'localsrc') + os.makedirs(self.localsrcdir) + touch(os.path.join(self.localsrcdir, 'a')) + touch(os.path.join(self.localsrcdir, 'b')) + os.makedirs(os.path.join(self.localsrcdir, 'dir')) + touch(os.path.join(self.localsrcdir, 'dir', 'c')) + touch(os.path.join(self.localsrcdir, 'dir', 'd')) + os.makedirs(os.path.join(self.localsrcdir, 'dir', 'subdir')) + touch(os.path.join(self.localsrcdir, 'dir', 'subdir', 'e')) + self.d.setVar("FILESPATH", self.localsrcdir) + + def fetchUnpack(self, uris): + fetcher = bb.fetch.Fetch(uris, self.d) + fetcher.download() + fetcher.unpack(self.unpackdir) + flst = [] + for root, dirs, files in os.walk(self.unpackdir): + for f in files: + flst.append(os.path.relpath(os.path.join(root, f), self.unpackdir)) + flst.sort() + return flst + + def test_local(self): + tree = self.fetchUnpack(['file://a', 'file://dir/c']) + self.assertEqual(tree, ['a', 'dir/c']) + + def test_local_wildcard(self): + tree = self.fetchUnpack(['file://a', 'file://dir/*']) + self.assertEqual(tree, ['a', 'dir/c', 'dir/d', 'dir/subdir/e']) + + def test_local_dir(self): + tree = self.fetchUnpack(['file://a', 'file://dir']) + self.assertEqual(tree, ['a', 'dir/c', 'dir/d', 'dir/subdir/e']) + + def test_local_subdir(self): + tree = self.fetchUnpack(['file://dir/subdir']) + self.assertEqual(tree, ['dir/subdir/e']) + + def test_local_subdir_file(self): + tree = self.fetchUnpack(['file://dir/subdir/e']) + self.assertEqual(tree, ['dir/subdir/e']) + + def test_local_subdirparam(self): + tree = self.fetchUnpack(['file://a;subdir=bar', 'file://dir;subdir=foo/moo']) + self.assertEqual(tree, ['bar/a', 'foo/moo/dir/c', 'foo/moo/dir/d', 'foo/moo/dir/subdir/e']) + + def test_local_deepsubdirparam(self): + tree = self.fetchUnpack(['file://dir/subdir/e;subdir=bar']) + self.assertEqual(tree, ['bar/dir/subdir/e']) + + def test_local_absolutedir(self): + # Unpacking to an absolute path that is a subdirectory of the root + # should work + tree = self.fetchUnpack(['file://a;subdir=%s' % os.path.join(self.unpackdir, 'bar')]) + + # Unpacking to an absolute path outside of the root should fail + with self.assertRaises(bb.fetch2.UnpackError): + self.fetchUnpack(['file://a;subdir=/bin/sh']) + +class FetcherNoNetworkTest(FetcherTest): + def setUp(self): + super().setUp() + # all test cases are based on not having network + self.d.setVar("BB_NO_NETWORK", "1") + + def test_missing(self): + string = "this is a test file\n".encode("utf-8") + self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest()) + self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest()) + + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d) + with self.assertRaises(bb.fetch2.NetworkAccess): + fetcher.download() + + def test_valid_missing_donestamp(self): + # create the file in the download directory with correct hash + string = "this is a test file\n".encode("utf-8") + with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb") as f: + f.write(string) + + self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest()) + self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest()) + + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d) + fetcher.download() + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + + def test_invalid_missing_donestamp(self): + # create an invalid file in the download directory with incorrect hash + string = "this is a test file\n".encode("utf-8") + with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"): + pass + + self.d.setVarFlag("SRC_URI", "md5sum", hashlib.md5(string).hexdigest()) + self.d.setVarFlag("SRC_URI", "sha256sum", hashlib.sha256(string).hexdigest()) + + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/test-file.tar.gz"], self.d) + with self.assertRaises(bb.fetch2.NetworkAccess): + fetcher.download() + # the existing file should not exist or should have be moved to "bad-checksum" + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + + def test_nochecksums_missing(self): + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + # ssh fetch does not support checksums + fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d) + # attempts to download with missing donestamp + with self.assertRaises(bb.fetch2.NetworkAccess): + fetcher.download() + + def test_nochecksums_missing_donestamp(self): + # create a file in the download directory + with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"): + pass + + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + # ssh fetch does not support checksums + fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d) + # attempts to download with missing donestamp + with self.assertRaises(bb.fetch2.NetworkAccess): + fetcher.download() + + def test_nochecksums_has_donestamp(self): + # create a file in the download directory with the donestamp + with open(os.path.join(self.dldir, "test-file.tar.gz"), "wb"): + pass + with open(os.path.join(self.dldir, "test-file.tar.gz.done"), "wb"): + pass + + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + # ssh fetch does not support checksums + fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d) + # should not fetch + fetcher.download() + # both files should still exist + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + + def test_nochecksums_missing_has_donestamp(self): + # create a file in the download directory with the donestamp + with open(os.path.join(self.dldir, "test-file.tar.gz.done"), "wb"): + pass + + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertTrue(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + # ssh fetch does not support checksums + fetcher = bb.fetch.Fetch(["ssh://invalid@invalid.yoctoproject.org/test-file.tar.gz"], self.d) + with self.assertRaises(bb.fetch2.NetworkAccess): + fetcher.download() + # both files should still exist + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz"))) + self.assertFalse(os.path.exists(os.path.join(self.dldir, "test-file.tar.gz.done"))) + +class FetcherNetworkTest(FetcherTest): + @skipIfNoNetwork() + def test_fetch(self): + fetcher = bb.fetch.Fetch(["http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", "http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.1.tar.gz"], self.d) + fetcher.download() + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749) + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.1.tar.gz"), 57892) + self.d.setVar("BB_NO_NETWORK", "1") + fetcher = bb.fetch.Fetch(["http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz", "http://downloads.yoctoproject.org/releases/bitbake/bitbake-1.1.tar.gz"], self.d) + fetcher.download() + fetcher.unpack(self.unpackdir) + self.assertEqual(len(os.listdir(self.unpackdir + "/bitbake-1.0/")), 9) + self.assertEqual(len(os.listdir(self.unpackdir + "/bitbake-1.1/")), 9) + + @skipIfNoNetwork() + def test_fetch_mirror(self): + self.d.setVar("MIRRORS", "http://.*/.* http://downloads.yoctoproject.org/releases/bitbake") + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d) + fetcher.download() + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749) + + @skipIfNoNetwork() + def test_fetch_mirror_of_mirror(self): + self.d.setVar("MIRRORS", "http://.*/.* http://invalid2.yoctoproject.org/ \n http://invalid2.yoctoproject.org/.* http://downloads.yoctoproject.org/releases/bitbake") + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d) + fetcher.download() + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749) + + @skipIfNoNetwork() + def test_fetch_file_mirror_of_mirror(self): + self.d.setVar("MIRRORS", "http://.*/.* file:///some1where/ \n file:///some1where/.* file://some2where/ \n file://some2where/.* http://downloads.yoctoproject.org/releases/bitbake") + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d) + os.mkdir(self.dldir + "/some2where") + fetcher.download() + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749) + + @skipIfNoNetwork() + def test_fetch_premirror(self): + self.d.setVar("PREMIRRORS", "http://.*/.* http://downloads.yoctoproject.org/releases/bitbake") + fetcher = bb.fetch.Fetch(["http://invalid.yoctoproject.org/releases/bitbake/bitbake-1.0.tar.gz"], self.d) + fetcher.download() + self.assertEqual(os.path.getsize(self.dldir + "/bitbake-1.0.tar.gz"), 57749) + + @skipIfNoNetwork() + def gitfetcher(self, url1, url2): + def checkrevision(self, fetcher): + fetcher.unpack(self.unpackdir) + revision = bb.process.run("git rev-parse HEAD", shell=True, cwd=self.unpackdir + "/git")[0].strip() + self.assertEqual(revision, "270a05b0b4ba0959fe0624d2a4885d7b70426da5") + + self.d.setVar("BB_GENERATE_MIRROR_TARBALLS", "1") + self.d.setVar("SRCREV", "270a05b0b4ba0959fe0624d2a4885d7b70426da5") + fetcher = bb.fetch.Fetch([url1], self.d) + fetcher.download() + checkrevision(self, fetcher) + # Wipe out the dldir clone and the unpacked source, turn off the network and check mirror tarball works + bb.utils.prunedir(self.dldir + "/git2/") + bb.utils.prunedir(self.unpackdir) + self.d.setVar("BB_NO_NETWORK", "1") + fetcher = bb.fetch.Fetch([url2], self.d) + fetcher.download() + checkrevision(self, fetcher) + + @skipIfNoNetwork() + def test_gitfetch(self): + url1 = url2 = "git://git.openembedded.org/bitbake" + self.gitfetcher(url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_goodsrcrev(self): + # SRCREV is set but matches rev= parameter + url1 = url2 = "git://git.openembedded.org/bitbake;rev=270a05b0b4ba0959fe0624d2a4885d7b70426da5" + self.gitfetcher(url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_badsrcrev(self): + # SRCREV is set but does not match rev= parameter + url1 = url2 = "git://git.openembedded.org/bitbake;rev=dead05b0b4ba0959fe0624d2a4885d7b70426da5" + self.assertRaises(bb.fetch.FetchError, self.gitfetcher, url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_tagandrev(self): + # SRCREV is set but does not match rev= parameter + url1 = url2 = "git://git.openembedded.org/bitbake;rev=270a05b0b4ba0959fe0624d2a4885d7b70426da5;tag=270a05b0b4ba0959fe0624d2a4885d7b70426da5" + self.assertRaises(bb.fetch.FetchError, self.gitfetcher, url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_localusehead(self): + # Create dummy local Git repo + src_dir = tempfile.mkdtemp(dir=self.tempdir, + prefix='gitfetch_localusehead_') + src_dir = os.path.abspath(src_dir) + bb.process.run("git init", cwd=src_dir) + bb.process.run("git commit --allow-empty -m'Dummy commit'", + cwd=src_dir) + # Use other branch than master + bb.process.run("git checkout -b my-devel", cwd=src_dir) + bb.process.run("git commit --allow-empty -m'Dummy commit 2'", + cwd=src_dir) + stdout = bb.process.run("git rev-parse HEAD", cwd=src_dir) + orig_rev = stdout[0].strip() + + # Fetch and check revision + self.d.setVar("SRCREV", "AUTOINC") + url = "git://" + src_dir + ";protocol=file;usehead=1" + fetcher = bb.fetch.Fetch([url], self.d) + fetcher.download() + fetcher.unpack(self.unpackdir) + stdout = bb.process.run("git rev-parse HEAD", + cwd=os.path.join(self.unpackdir, 'git')) + unpack_rev = stdout[0].strip() + self.assertEqual(orig_rev, unpack_rev) + + @skipIfNoNetwork() + def test_gitfetch_remoteusehead(self): + url = "git://git.openembedded.org/bitbake;usehead=1" + self.assertRaises(bb.fetch.ParameterError, self.gitfetcher, url, url) + + @skipIfNoNetwork() + def test_gitfetch_premirror(self): + url1 = "git://git.openembedded.org/bitbake" + url2 = "git://someserver.org/bitbake" + self.d.setVar("PREMIRRORS", "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n") + self.gitfetcher(url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_premirror2(self): + url1 = url2 = "git://someserver.org/bitbake" + self.d.setVar("PREMIRRORS", "git://someserver.org/bitbake git://git.openembedded.org/bitbake \n") + self.gitfetcher(url1, url2) + + @skipIfNoNetwork() + def test_gitfetch_premirror3(self): + realurl = "git://git.openembedded.org/bitbake" + dummyurl = "git://someserver.org/bitbake" + self.sourcedir = self.unpackdir.replace("unpacked", "sourcemirror.git") + os.chdir(self.tempdir) + bb.process.run("git clone %s %s 2> /dev/null" % (realurl, self.sourcedir), shell=True) + self.d.setVar("PREMIRRORS", "%s git://%s;protocol=file \n" % (dummyurl, self.sourcedir)) + self.gitfetcher(dummyurl, dummyurl) + + @skipIfNoNetwork() + def test_git_submodule(self): + fetcher = bb.fetch.Fetch(["gitsm://git.yoctoproject.org/git-submodule-test;rev=f12e57f2edf0aa534cf1616fa983d165a92b0842"], self.d) + fetcher.download() + # Previous cwd has been deleted + os.chdir(os.path.dirname(self.unpackdir)) + fetcher.unpack(self.unpackdir) + + +class TrustedNetworksTest(FetcherTest): + def test_trusted_network(self): + # Ensure trusted_network returns False when the host IS in the list. + url = "git://Someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org someserver.org server2.org server3.org") + self.assertTrue(bb.fetch.trusted_network(self.d, url)) + + def test_wild_trusted_network(self): + # Ensure trusted_network returns true when the *.host IS in the list. + url = "git://Someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org") + self.assertTrue(bb.fetch.trusted_network(self.d, url)) + + def test_prefix_wild_trusted_network(self): + # Ensure trusted_network returns true when the prefix matches *.host. + url = "git://git.Someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org") + self.assertTrue(bb.fetch.trusted_network(self.d, url)) + + def test_two_prefix_wild_trusted_network(self): + # Ensure trusted_network returns true when the prefix matches *.host. + url = "git://something.git.Someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org *.someserver.org server2.org server3.org") + self.assertTrue(bb.fetch.trusted_network(self.d, url)) + + def test_port_trusted_network(self): + # Ensure trusted_network returns True, even if the url specifies a port. + url = "git://someserver.org:8080/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "someserver.org") + self.assertTrue(bb.fetch.trusted_network(self.d, url)) + + def test_untrusted_network(self): + # Ensure trusted_network returns False when the host is NOT in the list. + url = "git://someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org server2.org server3.org") + self.assertFalse(bb.fetch.trusted_network(self.d, url)) + + def test_wild_untrusted_network(self): + # Ensure trusted_network returns False when the host is NOT in the list. + url = "git://*.someserver.org/foo;rev=1" + self.d.setVar("BB_ALLOWED_NETWORKS", "server1.org server2.org server3.org") + self.assertFalse(bb.fetch.trusted_network(self.d, url)) + +class URLHandle(unittest.TestCase): + + datatable = { + "http://www.google.com/index.html" : ('http', 'www.google.com', '/index.html', '', '', {}), + "cvs://anoncvs@cvs.handhelds.org/cvs;module=familiar/dist/ipkg" : ('cvs', 'cvs.handhelds.org', '/cvs', 'anoncvs', '', {'module': 'familiar/dist/ipkg'}), + "cvs://anoncvs:anonymous@cvs.handhelds.org/cvs;tag=V0-99-81;module=familiar/dist/ipkg" : ('cvs', 'cvs.handhelds.org', '/cvs', 'anoncvs', 'anonymous', collections.OrderedDict([('tag', 'V0-99-81'), ('module', 'familiar/dist/ipkg')])), + "git://git.openembedded.org/bitbake;branch=@foo" : ('git', 'git.openembedded.org', '/bitbake', '', '', {'branch': '@foo'}), + "file://somelocation;someparam=1": ('file', '', 'somelocation', '', '', {'someparam': '1'}), + } + # we require a pathname to encodeurl but users can still pass such urls to + # decodeurl and we need to handle them + decodedata = datatable.copy() + decodedata.update({ + "http://somesite.net;someparam=1": ('http', 'somesite.net', '', '', '', {'someparam': '1'}), + }) + + def test_decodeurl(self): + for k, v in self.decodedata.items(): + result = bb.fetch.decodeurl(k) + self.assertEqual(result, v) + + def test_encodeurl(self): + for k, v in self.datatable.items(): + result = bb.fetch.encodeurl(v) + self.assertEqual(result, k) + +class FetchLatestVersionTest(FetcherTest): + + test_git_uris = { + # version pattern "X.Y.Z" + ("mx-1.0", "git://github.com/clutter-project/mx.git;branch=mx-1.4", "9b1db6b8060bd00b121a692f942404a24ae2960f", "") + : "1.99.4", + # version pattern "vX.Y" + ("mtd-utils", "git://git.infradead.org/mtd-utils.git", "ca39eb1d98e736109c64ff9c1aa2a6ecca222d8f", "") + : "1.5.0", + # version pattern "pkg_name-X.Y" + ("presentproto", "git://anongit.freedesktop.org/git/xorg/proto/presentproto", "24f3a56e541b0a9e6c6ee76081f441221a120ef9", "") + : "1.0", + # version pattern "pkg_name-vX.Y.Z" + ("dtc", "git://git.qemu.org/dtc.git", "65cc4d2748a2c2e6f27f1cf39e07a5dbabd80ebf", "") + : "1.4.0", + # combination version pattern + ("sysprof", "git://git.gnome.org/sysprof", "cd44ee6644c3641507fb53b8a2a69137f2971219", "") + : "1.2.0", + ("u-boot-mkimage", "git://git.denx.de/u-boot.git;branch=master;protocol=git", "62c175fbb8a0f9a926c88294ea9f7e88eb898f6c", "") + : "2014.01", + # version pattern "yyyymmdd" + ("mobile-broadband-provider-info", "git://git.gnome.org/mobile-broadband-provider-info", "4ed19e11c2975105b71b956440acdb25d46a347d", "") + : "20120614", + # packages with a valid UPSTREAM_CHECK_GITTAGREGEX + ("xf86-video-omap", "git://anongit.freedesktop.org/xorg/driver/xf86-video-omap", "ae0394e687f1a77e966cf72f895da91840dffb8f", "(?P<pver>(\d+\.(\d\.?)*))") + : "0.4.3", + ("build-appliance-image", "git://git.yoctoproject.org/poky", "b37dd451a52622d5b570183a81583cc34c2ff555", "(?P<pver>(([0-9][\.|_]?)+[0-9]))") + : "11.0.0", + ("chkconfig-alternatives-native", "git://github.com/kergoth/chkconfig;branch=sysroot", "cd437ecbd8986c894442f8fce1e0061e20f04dee", "chkconfig\-(?P<pver>((\d+[\.\-_]*)+))") + : "1.3.59", + ("remake", "git://github.com/rocky/remake.git", "f05508e521987c8494c92d9c2871aec46307d51d", "(?P<pver>(\d+\.(\d+\.)*\d*(\+dbg\d+(\.\d+)*)*))") + : "3.82+dbg0.9", + } + + test_wget_uris = { + # packages with versions inside directory name + ("util-linux", "http://kernel.org/pub/linux/utils/util-linux/v2.23/util-linux-2.24.2.tar.bz2", "", "") + : "2.24.2", + ("enchant", "http://www.abisource.com/downloads/enchant/1.6.0/enchant-1.6.0.tar.gz", "", "") + : "1.6.0", + ("cmake", "http://www.cmake.org/files/v2.8/cmake-2.8.12.1.tar.gz", "", "") + : "2.8.12.1", + # packages with versions only in current directory + ("eglic", "http://downloads.yoctoproject.org/releases/eglibc/eglibc-2.18-svnr23787.tar.bz2", "", "") + : "2.19", + ("gnu-config", "http://downloads.yoctoproject.org/releases/gnu-config/gnu-config-20120814.tar.bz2", "", "") + : "20120814", + # packages with "99" in the name of possible version + ("pulseaudio", "http://freedesktop.org/software/pulseaudio/releases/pulseaudio-4.0.tar.xz", "", "") + : "5.0", + ("xserver-xorg", "http://xorg.freedesktop.org/releases/individual/xserver/xorg-server-1.15.1.tar.bz2", "", "") + : "1.15.1", + # packages with valid UPSTREAM_CHECK_URI and UPSTREAM_CHECK_REGEX + ("cups", "http://www.cups.org/software/1.7.2/cups-1.7.2-source.tar.bz2", "https://github.com/apple/cups/releases", "(?P<name>cups\-)(?P<pver>((\d+[\.\-_]*)+))\-source\.tar\.gz") + : "2.0.0", + ("db", "http://download.oracle.com/berkeley-db/db-5.3.21.tar.gz", "http://www.oracle.com/technetwork/products/berkeleydb/downloads/index-082944.html", "http://download.oracle.com/otn/berkeley-db/(?P<name>db-)(?P<pver>((\d+[\.\-_]*)+))\.tar\.gz") + : "6.1.19", + } + + @skipIfNoNetwork() + def test_git_latest_versionstring(self): + for k, v in self.test_git_uris.items(): + self.d.setVar("PN", k[0]) + self.d.setVar("SRCREV", k[2]) + self.d.setVar("UPSTREAM_CHECK_GITTAGREGEX", k[3]) + ud = bb.fetch2.FetchData(k[1], self.d) + pupver= ud.method.latest_versionstring(ud, self.d) + verstring = pupver[0] + self.assertTrue(verstring, msg="Could not find upstream version for %s" % k[0]) + r = bb.utils.vercmp_string(v, verstring) + self.assertTrue(r == -1 or r == 0, msg="Package %s, version: %s <= %s" % (k[0], v, verstring)) + + @skipIfNoNetwork() + def test_wget_latest_versionstring(self): + for k, v in self.test_wget_uris.items(): + self.d.setVar("PN", k[0]) + self.d.setVar("UPSTREAM_CHECK_URI", k[2]) + self.d.setVar("UPSTREAM_CHECK_REGEX", k[3]) + ud = bb.fetch2.FetchData(k[1], self.d) + pupver = ud.method.latest_versionstring(ud, self.d) + verstring = pupver[0] + self.assertTrue(verstring, msg="Could not find upstream version for %s" % k[0]) + r = bb.utils.vercmp_string(v, verstring) + self.assertTrue(r == -1 or r == 0, msg="Package %s, version: %s <= %s" % (k[0], v, verstring)) + + +class FetchCheckStatusTest(FetcherTest): + test_wget_uris = ["http://www.cups.org/software/1.7.2/cups-1.7.2-source.tar.bz2", + "http://www.cups.org/", + "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.1.tar.gz", + "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.2.tar.gz", + "http://downloads.yoctoproject.org/releases/sato/sato-engine-0.3.tar.gz", + "https://yoctoproject.org/", + "https://yoctoproject.org/documentation", + "http://downloads.yoctoproject.org/releases/opkg/opkg-0.1.7.tar.gz", + "http://downloads.yoctoproject.org/releases/opkg/opkg-0.3.0.tar.gz", + "ftp://sourceware.org/pub/libffi/libffi-1.20.tar.gz", + "http://ftp.gnu.org/gnu/autoconf/autoconf-2.60.tar.gz", + "https://ftp.gnu.org/gnu/chess/gnuchess-5.08.tar.gz", + "https://ftp.gnu.org/gnu/gmp/gmp-4.0.tar.gz", + # GitHub releases are hosted on Amazon S3, which doesn't support HEAD + "https://github.com/kergoth/tslib/releases/download/1.1/tslib-1.1.tar.xz" + ] + + @skipIfNoNetwork() + def test_wget_checkstatus(self): + fetch = bb.fetch2.Fetch(self.test_wget_uris, self.d) + for u in self.test_wget_uris: + with self.subTest(url=u): + ud = fetch.ud[u] + m = ud.method + ret = m.checkstatus(fetch, ud, self.d) + self.assertTrue(ret, msg="URI %s, can't check status" % (u)) + + @skipIfNoNetwork() + def test_wget_checkstatus_connection_cache(self): + from bb.fetch2 import FetchConnectionCache + + connection_cache = FetchConnectionCache() + fetch = bb.fetch2.Fetch(self.test_wget_uris, self.d, + connection_cache = connection_cache) + + for u in self.test_wget_uris: + with self.subTest(url=u): + ud = fetch.ud[u] + m = ud.method + ret = m.checkstatus(fetch, ud, self.d) + self.assertTrue(ret, msg="URI %s, can't check status" % (u)) + + connection_cache.close_connections() + + +class GitMakeShallowTest(FetcherTest): + def setUp(self): + FetcherTest.setUp(self) + self.gitdir = os.path.join(self.tempdir, 'gitshallow') + bb.utils.mkdirhier(self.gitdir) + bb.process.run('git init', cwd=self.gitdir) + + def assertRefs(self, expected_refs): + actual_refs = self.git(['for-each-ref', '--format=%(refname)']).splitlines() + full_expected = self.git(['rev-parse', '--symbolic-full-name'] + expected_refs).splitlines() + self.assertEqual(sorted(full_expected), sorted(actual_refs)) + + def assertRevCount(self, expected_count, args=None): + if args is None: + args = ['HEAD'] + revs = self.git(['rev-list'] + args) + actual_count = len(revs.splitlines()) + self.assertEqual(expected_count, actual_count, msg='Object count `%d` is not the expected `%d`' % (actual_count, expected_count)) + + def git(self, cmd): + if isinstance(cmd, str): + cmd = 'git ' + cmd + else: + cmd = ['git'] + cmd + return bb.process.run(cmd, cwd=self.gitdir)[0] + + def make_shallow(self, args=None): + if args is None: + args = ['HEAD'] + return bb.process.run([bb.fetch2.git.Git.make_shallow_path] + args, cwd=self.gitdir) + + def add_empty_file(self, path, msg=None): + if msg is None: + msg = path + open(os.path.join(self.gitdir, path), 'w').close() + self.git(['add', path]) + self.git(['commit', '-m', msg, path]) + + def test_make_shallow_single_branch_no_merge(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2) + self.make_shallow() + self.assertRevCount(1) + + def test_make_shallow_single_branch_one_merge(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('checkout -b a_branch') + self.add_empty_file('c') + self.git('checkout master') + self.add_empty_file('d') + self.git('merge --no-ff --no-edit a_branch') + self.git('branch -d a_branch') + self.add_empty_file('e') + self.assertRevCount(6) + self.make_shallow(['HEAD~2']) + self.assertRevCount(5) + + def test_make_shallow_at_merge(self): + self.add_empty_file('a') + self.git('checkout -b a_branch') + self.add_empty_file('b') + self.git('checkout master') + self.git('merge --no-ff --no-edit a_branch') + self.git('branch -d a_branch') + self.assertRevCount(3) + self.make_shallow() + self.assertRevCount(1) + + def test_make_shallow_annotated_tag(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('tag -a -m a_tag a_tag') + self.assertRevCount(2) + self.make_shallow(['a_tag']) + self.assertRevCount(1) + + def test_make_shallow_multi_ref(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('checkout -b a_branch') + self.add_empty_file('c') + self.git('checkout master') + self.add_empty_file('d') + self.git('checkout -b a_branch_2') + self.add_empty_file('a_tag') + self.git('tag a_tag') + self.git('checkout master') + self.git('branch -D a_branch_2') + self.add_empty_file('e') + self.assertRevCount(6, ['--all']) + self.make_shallow() + self.assertRevCount(5, ['--all']) + + def test_make_shallow_multi_ref_trim(self): + self.add_empty_file('a') + self.git('checkout -b a_branch') + self.add_empty_file('c') + self.git('checkout master') + self.assertRevCount(1) + self.assertRevCount(2, ['--all']) + self.assertRefs(['master', 'a_branch']) + self.make_shallow(['-r', 'master', 'HEAD']) + self.assertRevCount(1, ['--all']) + self.assertRefs(['master']) + + def test_make_shallow_noop(self): + self.add_empty_file('a') + self.assertRevCount(1) + self.make_shallow() + self.assertRevCount(1) + + @skipIfNoNetwork() + def test_make_shallow_bitbake(self): + self.git('remote add origin https://github.com/openembedded/bitbake') + self.git('fetch --tags origin') + orig_revs = len(self.git('rev-list --all').splitlines()) + self.make_shallow(['refs/tags/1.10.0']) + self.assertRevCount(orig_revs - 1746, ['--all']) + +class GitShallowTest(FetcherTest): + def setUp(self): + FetcherTest.setUp(self) + self.gitdir = os.path.join(self.tempdir, 'git') + self.srcdir = os.path.join(self.tempdir, 'gitsource') + + bb.utils.mkdirhier(self.srcdir) + self.git('init', cwd=self.srcdir) + self.d.setVar('WORKDIR', self.tempdir) + self.d.setVar('S', self.gitdir) + self.d.delVar('PREMIRRORS') + self.d.delVar('MIRRORS') + + uri = 'git://%s;protocol=file;subdir=${S}' % self.srcdir + self.d.setVar('SRC_URI', uri) + self.d.setVar('SRCREV', '${AUTOREV}') + self.d.setVar('AUTOREV', '${@bb.fetch2.get_autorev(d)}') + + self.d.setVar('BB_GIT_SHALLOW', '1') + self.d.setVar('BB_GENERATE_MIRROR_TARBALLS', '0') + self.d.setVar('BB_GENERATE_SHALLOW_TARBALLS', '1') + + def assertRefs(self, expected_refs, cwd=None): + if cwd is None: + cwd = self.gitdir + actual_refs = self.git(['for-each-ref', '--format=%(refname)'], cwd=cwd).splitlines() + full_expected = self.git(['rev-parse', '--symbolic-full-name'] + expected_refs, cwd=cwd).splitlines() + self.assertEqual(sorted(set(full_expected)), sorted(set(actual_refs))) + + def assertRevCount(self, expected_count, args=None, cwd=None): + if args is None: + args = ['HEAD'] + if cwd is None: + cwd = self.gitdir + revs = self.git(['rev-list'] + args, cwd=cwd) + actual_count = len(revs.splitlines()) + self.assertEqual(expected_count, actual_count, msg='Object count `%d` is not the expected `%d`' % (actual_count, expected_count)) + + def git(self, cmd, cwd=None): + if isinstance(cmd, str): + cmd = 'git ' + cmd + else: + cmd = ['git'] + cmd + if cwd is None: + cwd = self.gitdir + return bb.process.run(cmd, cwd=cwd)[0] + + def add_empty_file(self, path, cwd=None, msg=None): + if msg is None: + msg = path + if cwd is None: + cwd = self.srcdir + open(os.path.join(cwd, path), 'w').close() + self.git(['add', path], cwd) + self.git(['commit', '-m', msg, path], cwd) + + def fetch(self, uri=None): + if uri is None: + uris = self.d.getVar('SRC_URI', True).split() + uri = uris[0] + d = self.d + else: + d = self.d.createCopy() + d.setVar('SRC_URI', uri) + uri = d.expand(uri) + uris = [uri] + + fetcher = bb.fetch2.Fetch(uris, d) + fetcher.download() + ud = fetcher.ud[uri] + return fetcher, ud + + def fetch_and_unpack(self, uri=None): + fetcher, ud = self.fetch(uri) + fetcher.unpack(self.d.getVar('WORKDIR')) + assert os.path.exists(self.d.getVar('S')) + return fetcher, ud + + def fetch_shallow(self, uri=None, disabled=False, keepclone=False): + """Fetch a uri, generating a shallow tarball, then unpack using it""" + fetcher, ud = self.fetch_and_unpack(uri) + assert os.path.exists(ud.clonedir), 'Git clone in DLDIR (%s) does not exist for uri %s' % (ud.clonedir, uri) + + # Confirm that the unpacked repo is unshallow + if not disabled: + assert os.path.exists(os.path.join(self.dldir, ud.mirrortarballs[0])) + + # fetch and unpack, from the shallow tarball + bb.utils.remove(self.gitdir, recurse=True) + bb.utils.remove(ud.clonedir, recurse=True) + + # confirm that the unpacked repo is used when no git clone or git + # mirror tarball is available + fetcher, ud = self.fetch_and_unpack(uri) + if not disabled: + assert os.path.exists(os.path.join(self.gitdir, '.git', 'shallow')), 'Unpacked git repository at %s is not shallow' % self.gitdir + else: + assert not os.path.exists(os.path.join(self.gitdir, '.git', 'shallow')), 'Unpacked git repository at %s is shallow' % self.gitdir + return fetcher, ud + + def test_shallow_disabled(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW', '0') + self.fetch_shallow(disabled=True) + self.assertRevCount(2) + + def test_shallow_nobranch(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + srcrev = self.git('rev-parse HEAD', cwd=self.srcdir).strip() + self.d.setVar('SRCREV', srcrev) + uri = self.d.getVar('SRC_URI', True).split()[0] + uri = '%s;nobranch=1;bare=1' % uri + + self.fetch_shallow(uri) + self.assertRevCount(1) + + # shallow refs are used to ensure the srcrev sticks around when we + # have no other branches referencing it + self.assertRefs(['refs/shallow/default']) + + def test_shallow_default_depth_1(self): + # Create initial git repo + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.fetch_shallow() + self.assertRevCount(1) + + def test_shallow_depth_0_disables(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.fetch_shallow(disabled=True) + self.assertRevCount(2) + + def test_shallow_depth_default_override(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '2') + self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '1') + self.fetch_shallow() + self.assertRevCount(1) + + def test_shallow_depth_default_override_disable(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.add_empty_file('c') + self.assertRevCount(3, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '2') + self.fetch_shallow() + self.assertRevCount(2) + + def test_current_shallow_out_of_date_clone(self): + # Create initial git repo + self.add_empty_file('a') + self.add_empty_file('b') + self.add_empty_file('c') + self.assertRevCount(3, cwd=self.srcdir) + + # Clone and generate mirror tarball + fetcher, ud = self.fetch() + + # Ensure we have a current mirror tarball, but an out of date clone + self.git('update-ref refs/heads/master refs/heads/master~1', cwd=ud.clonedir) + self.assertRevCount(2, cwd=ud.clonedir) + + # Fetch and unpack, from the current tarball, not the out of date clone + bb.utils.remove(self.gitdir, recurse=True) + fetcher, ud = self.fetch() + fetcher.unpack(self.d.getVar('WORKDIR')) + self.assertRevCount(1) + + def test_shallow_single_branch_no_merge(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.fetch_shallow() + self.assertRevCount(1) + assert os.path.exists(os.path.join(self.gitdir, 'a')) + assert os.path.exists(os.path.join(self.gitdir, 'b')) + + def test_shallow_no_dangling(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.assertRevCount(2, cwd=self.srcdir) + + self.fetch_shallow() + self.assertRevCount(1) + assert not self.git('fsck --dangling') + + def test_shallow_srcrev_branch_truncation(self): + self.add_empty_file('a') + self.add_empty_file('b') + b_commit = self.git('rev-parse HEAD', cwd=self.srcdir).rstrip() + self.add_empty_file('c') + self.assertRevCount(3, cwd=self.srcdir) + + self.d.setVar('SRCREV', b_commit) + self.fetch_shallow() + + # The 'c' commit was removed entirely, and 'a' was removed from history + self.assertRevCount(1, ['--all']) + self.assertEqual(self.git('rev-parse HEAD').strip(), b_commit) + assert os.path.exists(os.path.join(self.gitdir, 'a')) + assert os.path.exists(os.path.join(self.gitdir, 'b')) + assert not os.path.exists(os.path.join(self.gitdir, 'c')) + + def test_shallow_ref_pruning(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('branch a_branch', cwd=self.srcdir) + self.assertRefs(['master', 'a_branch'], cwd=self.srcdir) + self.assertRevCount(2, cwd=self.srcdir) + + self.fetch_shallow() + + self.assertRefs(['master', 'origin/master']) + self.assertRevCount(1) + + def test_shallow_submodules(self): + self.add_empty_file('a') + self.add_empty_file('b') + + smdir = os.path.join(self.tempdir, 'gitsubmodule') + bb.utils.mkdirhier(smdir) + self.git('init', cwd=smdir) + self.add_empty_file('asub', cwd=smdir) + + self.git('submodule init', cwd=self.srcdir) + self.git('submodule add file://%s' % smdir, cwd=self.srcdir) + self.git('submodule update', cwd=self.srcdir) + self.git('commit -m submodule -a', cwd=self.srcdir) + + uri = 'gitsm://%s;protocol=file;subdir=${S}' % self.srcdir + fetcher, ud = self.fetch_shallow(uri) + + self.assertRevCount(1) + assert './.git/modules/' in bb.process.run('tar -tzf %s' % os.path.join(self.dldir, ud.mirrortarballs[0]))[0] + assert os.listdir(os.path.join(self.gitdir, 'gitsubmodule')) + + if any(os.path.exists(os.path.join(p, 'git-annex')) for p in os.environ.get('PATH').split(':')): + def test_shallow_annex(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('annex init', cwd=self.srcdir) + open(os.path.join(self.srcdir, 'c'), 'w').close() + self.git('annex add c', cwd=self.srcdir) + self.git('commit -m annex-c -a', cwd=self.srcdir) + bb.process.run('chmod u+w -R %s' % os.path.join(self.srcdir, '.git', 'annex')) + + uri = 'gitannex://%s;protocol=file;subdir=${S}' % self.srcdir + fetcher, ud = self.fetch_shallow(uri) + + self.assertRevCount(1) + assert './.git/annex/' in bb.process.run('tar -tzf %s' % os.path.join(self.dldir, ud.mirrortarballs[0]))[0] + assert os.path.exists(os.path.join(self.gitdir, 'c')) + + def test_shallow_multi_one_uri(self): + # Create initial git repo + self.add_empty_file('a') + self.add_empty_file('b') + self.git('checkout -b a_branch', cwd=self.srcdir) + self.add_empty_file('c') + self.add_empty_file('d') + self.git('checkout master', cwd=self.srcdir) + self.git('tag v0.0 a_branch', cwd=self.srcdir) + self.add_empty_file('e') + self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir) + self.add_empty_file('f') + self.assertRevCount(7, cwd=self.srcdir) + + uri = self.d.getVar('SRC_URI', True).split()[0] + uri = '%s;branch=master,a_branch;name=master,a_branch' % uri + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0') + self.d.setVar('SRCREV_master', '${AUTOREV}') + self.d.setVar('SRCREV_a_branch', '${AUTOREV}') + + self.fetch_shallow(uri) + + self.assertRevCount(5) + self.assertRefs(['master', 'origin/master', 'origin/a_branch']) + + def test_shallow_multi_one_uri_depths(self): + # Create initial git repo + self.add_empty_file('a') + self.add_empty_file('b') + self.git('checkout -b a_branch', cwd=self.srcdir) + self.add_empty_file('c') + self.add_empty_file('d') + self.git('checkout master', cwd=self.srcdir) + self.add_empty_file('e') + self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir) + self.add_empty_file('f') + self.assertRevCount(7, cwd=self.srcdir) + + uri = self.d.getVar('SRC_URI', True).split()[0] + uri = '%s;branch=master,a_branch;name=master,a_branch' % uri + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.d.setVar('BB_GIT_SHALLOW_DEPTH_master', '3') + self.d.setVar('BB_GIT_SHALLOW_DEPTH_a_branch', '1') + self.d.setVar('SRCREV_master', '${AUTOREV}') + self.d.setVar('SRCREV_a_branch', '${AUTOREV}') + + self.fetch_shallow(uri) + + self.assertRevCount(4, ['--all']) + self.assertRefs(['master', 'origin/master', 'origin/a_branch']) + + def test_shallow_clone_preferred_over_shallow(self): + self.add_empty_file('a') + self.add_empty_file('b') + + # Fetch once to generate the shallow tarball + fetcher, ud = self.fetch() + assert os.path.exists(os.path.join(self.dldir, ud.mirrortarballs[0])) + + # Fetch and unpack with both the clonedir and shallow tarball available + bb.utils.remove(self.gitdir, recurse=True) + fetcher, ud = self.fetch_and_unpack() + + # The unpacked tree should *not* be shallow + self.assertRevCount(2) + assert not os.path.exists(os.path.join(self.gitdir, '.git', 'shallow')) + + def test_shallow_mirrors(self): + self.add_empty_file('a') + self.add_empty_file('b') + + # Fetch once to generate the shallow tarball + fetcher, ud = self.fetch() + mirrortarball = ud.mirrortarballs[0] + assert os.path.exists(os.path.join(self.dldir, mirrortarball)) + + # Set up the mirror + mirrordir = os.path.join(self.tempdir, 'mirror') + bb.utils.mkdirhier(mirrordir) + self.d.setVar('PREMIRRORS', 'git://.*/.* file://%s/\n' % mirrordir) + + os.rename(os.path.join(self.dldir, mirrortarball), + os.path.join(mirrordir, mirrortarball)) + + # Fetch from the mirror + bb.utils.remove(self.dldir, recurse=True) + bb.utils.remove(self.gitdir, recurse=True) + self.fetch_and_unpack() + self.assertRevCount(1) + + def test_shallow_invalid_depth(self): + self.add_empty_file('a') + self.add_empty_file('b') + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '-12') + with self.assertRaises(bb.fetch2.FetchError): + self.fetch() + + def test_shallow_invalid_depth_default(self): + self.add_empty_file('a') + self.add_empty_file('b') + + self.d.setVar('BB_GIT_SHALLOW_DEPTH_default', '-12') + with self.assertRaises(bb.fetch2.FetchError): + self.fetch() + + def test_shallow_extra_refs(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('branch a_branch', cwd=self.srcdir) + self.assertRefs(['master', 'a_branch'], cwd=self.srcdir) + self.assertRevCount(2, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/heads/a_branch') + self.fetch_shallow() + + self.assertRefs(['master', 'origin/master', 'origin/a_branch']) + self.assertRevCount(1) + + def test_shallow_extra_refs_wildcard(self): + self.add_empty_file('a') + self.add_empty_file('b') + self.git('branch a_branch', cwd=self.srcdir) + self.git('tag v1.0', cwd=self.srcdir) + self.assertRefs(['master', 'a_branch', 'v1.0'], cwd=self.srcdir) + self.assertRevCount(2, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/tags/*') + self.fetch_shallow() + + self.assertRefs(['master', 'origin/master', 'v1.0']) + self.assertRevCount(1) + + def test_shallow_missing_extra_refs(self): + self.add_empty_file('a') + self.add_empty_file('b') + + self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/heads/foo') + with self.assertRaises(bb.fetch2.FetchError): + self.fetch() + + def test_shallow_missing_extra_refs_wildcard(self): + self.add_empty_file('a') + self.add_empty_file('b') + + self.d.setVar('BB_GIT_SHALLOW_EXTRA_REFS', 'refs/tags/*') + self.fetch() + + def test_shallow_remove_revs(self): + # Create initial git repo + self.add_empty_file('a') + self.add_empty_file('b') + self.git('checkout -b a_branch', cwd=self.srcdir) + self.add_empty_file('c') + self.add_empty_file('d') + self.git('checkout master', cwd=self.srcdir) + self.git('tag v0.0 a_branch', cwd=self.srcdir) + self.add_empty_file('e') + self.git('merge --no-ff --no-edit a_branch', cwd=self.srcdir) + self.git('branch -d a_branch', cwd=self.srcdir) + self.add_empty_file('f') + self.assertRevCount(7, cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0') + + self.fetch_shallow() + + self.assertRevCount(5) + + def test_shallow_invalid_revs(self): + self.add_empty_file('a') + self.add_empty_file('b') + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + self.d.setVar('BB_GIT_SHALLOW_REVS', 'v0.0') + + with self.assertRaises(bb.fetch2.FetchError): + self.fetch() + + @skipIfNoNetwork() + def test_bitbake(self): + self.git('remote add --mirror=fetch origin git://github.com/openembedded/bitbake', cwd=self.srcdir) + self.git('config core.bare true', cwd=self.srcdir) + self.git('fetch', cwd=self.srcdir) + + self.d.setVar('BB_GIT_SHALLOW_DEPTH', '0') + # Note that the 1.10.0 tag is annotated, so this also tests + # reference of an annotated vs unannotated tag + self.d.setVar('BB_GIT_SHALLOW_REVS', '1.10.0') + + self.fetch_shallow() + + # Confirm that the history of 1.10.0 was removed + orig_revs = len(self.git('rev-list master', cwd=self.srcdir).splitlines()) + revs = len(self.git('rev-list master').splitlines()) + self.assertNotEqual(orig_revs, revs) + self.assertRefs(['master', 'origin/master']) + self.assertRevCount(orig_revs - 1758) diff --git a/poky/bitbake/lib/bb/tests/parse.py b/poky/bitbake/lib/bb/tests/parse.py new file mode 100644 index 000000000..8f16ba4f4 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/parse.py @@ -0,0 +1,185 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Test for lib/bb/parse/ +# +# Copyright (C) 2015 Richard Purdie +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import tempfile +import logging +import bb +import os + +logger = logging.getLogger('BitBake.TestParse') + +import bb.parse +import bb.data +import bb.siggen + +class ParseTest(unittest.TestCase): + + testfile = """ +A = "1" +B = "2" +do_install() { + echo "hello" +} + +C = "3" +""" + + def setUp(self): + self.d = bb.data.init() + bb.parse.siggen = bb.siggen.init(self.d) + + def parsehelper(self, content, suffix = ".bb"): + + f = tempfile.NamedTemporaryFile(suffix = suffix) + f.write(bytes(content, "utf-8")) + f.flush() + os.chdir(os.path.dirname(f.name)) + return f + + def test_parse_simple(self): + f = self.parsehelper(self.testfile) + d = bb.parse.handle(f.name, self.d)[''] + self.assertEqual(d.getVar("A"), "1") + self.assertEqual(d.getVar("B"), "2") + self.assertEqual(d.getVar("C"), "3") + + def test_parse_incomplete_function(self): + testfileB = self.testfile.replace("}", "") + f = self.parsehelper(testfileB) + with self.assertRaises(bb.parse.ParseError): + d = bb.parse.handle(f.name, self.d)[''] + + unsettest = """ +A = "1" +B = "2" +B[flag] = "3" + +unset A +unset B[flag] +""" + + def test_parse_unset(self): + f = self.parsehelper(self.unsettest) + d = bb.parse.handle(f.name, self.d)[''] + self.assertEqual(d.getVar("A"), None) + self.assertEqual(d.getVarFlag("A","flag"), None) + self.assertEqual(d.getVar("B"), "2") + + exporttest = """ +A = "a" +export B = "b" +export C +exportD = "d" +""" + + def test_parse_exports(self): + f = self.parsehelper(self.exporttest) + d = bb.parse.handle(f.name, self.d)[''] + self.assertEqual(d.getVar("A"), "a") + self.assertIsNone(d.getVarFlag("A", "export")) + self.assertEqual(d.getVar("B"), "b") + self.assertEqual(d.getVarFlag("B", "export"), 1) + self.assertIsNone(d.getVar("C")) + self.assertEqual(d.getVarFlag("C", "export"), 1) + self.assertIsNone(d.getVar("D")) + self.assertIsNone(d.getVarFlag("D", "export")) + self.assertEqual(d.getVar("exportD"), "d") + self.assertIsNone(d.getVarFlag("exportD", "export")) + + + overridetest = """ +RRECOMMENDS_${PN} = "a" +RRECOMMENDS_${PN}_libc = "b" +OVERRIDES = "libc:${PN}" +PN = "gtk+" +""" + + def test_parse_overrides(self): + f = self.parsehelper(self.overridetest) + d = bb.parse.handle(f.name, self.d)[''] + self.assertEqual(d.getVar("RRECOMMENDS"), "b") + bb.data.expandKeys(d) + self.assertEqual(d.getVar("RRECOMMENDS"), "b") + d.setVar("RRECOMMENDS_gtk+", "c") + self.assertEqual(d.getVar("RRECOMMENDS"), "c") + + overridetest2 = """ +EXTRA_OECONF = "" +EXTRA_OECONF_class-target = "b" +EXTRA_OECONF_append = " c" +""" + + def test_parse_overrides(self): + f = self.parsehelper(self.overridetest2) + d = bb.parse.handle(f.name, self.d)[''] + d.appendVar("EXTRA_OECONF", " d") + d.setVar("OVERRIDES", "class-target") + self.assertEqual(d.getVar("EXTRA_OECONF"), "b c d") + + overridetest3 = """ +DESCRIPTION = "A" +DESCRIPTION_${PN}-dev = "${DESCRIPTION} B" +PN = "bc" +""" + + def test_parse_combinations(self): + f = self.parsehelper(self.overridetest3) + d = bb.parse.handle(f.name, self.d)[''] + bb.data.expandKeys(d) + self.assertEqual(d.getVar("DESCRIPTION_bc-dev"), "A B") + d.setVar("DESCRIPTION", "E") + d.setVar("DESCRIPTION_bc-dev", "C D") + d.setVar("OVERRIDES", "bc-dev") + self.assertEqual(d.getVar("DESCRIPTION"), "C D") + + + classextend = """ +VAR_var_override1 = "B" +EXTRA = ":override1" +OVERRIDES = "nothing${EXTRA}" + +BBCLASSEXTEND = "###CLASS###" +""" + classextend_bbclass = """ +EXTRA = "" +python () { + d.renameVar("VAR_var", "VAR_var2") +} +""" + + # + # Test based upon a real world data corruption issue. One + # data store changing a variable poked through into a different data + # store. This test case replicates that issue where the value 'B' would + # become unset/disappear. + # + def test_parse_classextend_contamination(self): + cls = self.parsehelper(self.classextend_bbclass, suffix=".bbclass") + #clsname = os.path.basename(cls.name).replace(".bbclass", "") + self.classextend = self.classextend.replace("###CLASS###", cls.name) + f = self.parsehelper(self.classextend) + alldata = bb.parse.handle(f.name, self.d) + d1 = alldata[''] + d2 = alldata[cls.name] + self.assertEqual(d1.getVar("VAR_var"), "B") + self.assertEqual(d2.getVar("VAR_var"), None) + diff --git a/poky/bitbake/lib/bb/tests/utils.py b/poky/bitbake/lib/bb/tests/utils.py new file mode 100644 index 000000000..2f4ccf3c6 --- /dev/null +++ b/poky/bitbake/lib/bb/tests/utils.py @@ -0,0 +1,603 @@ +# ex:ts=4:sw=4:sts=4:et +# -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- +# +# BitBake Tests for utils.py +# +# Copyright (C) 2012 Richard Purdie +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License version 2 as +# published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# + +import unittest +import bb +import os +import tempfile +import re + +class VerCmpString(unittest.TestCase): + + def test_vercmpstring(self): + result = bb.utils.vercmp_string('1', '2') + self.assertTrue(result < 0) + result = bb.utils.vercmp_string('2', '1') + self.assertTrue(result > 0) + result = bb.utils.vercmp_string('1', '1.0') + self.assertTrue(result < 0) + result = bb.utils.vercmp_string('1', '1.1') + self.assertTrue(result < 0) + result = bb.utils.vercmp_string('1.1', '1_p2') + self.assertTrue(result < 0) + result = bb.utils.vercmp_string('1.0', '1.0+1.1-beta1') + self.assertTrue(result < 0) + result = bb.utils.vercmp_string('1.1', '1.0+1.1-beta1') + self.assertTrue(result > 0) + + def test_explode_dep_versions(self): + correctresult = {"foo" : ["= 1.10"]} + result = bb.utils.explode_dep_versions2("foo (= 1.10)") + self.assertEqual(result, correctresult) + result = bb.utils.explode_dep_versions2("foo (=1.10)") + self.assertEqual(result, correctresult) + result = bb.utils.explode_dep_versions2("foo ( = 1.10)") + self.assertEqual(result, correctresult) + result = bb.utils.explode_dep_versions2("foo ( =1.10)") + self.assertEqual(result, correctresult) + result = bb.utils.explode_dep_versions2("foo ( = 1.10 )") + self.assertEqual(result, correctresult) + result = bb.utils.explode_dep_versions2("foo ( =1.10 )") + self.assertEqual(result, correctresult) + + def test_vercmp_string_op(self): + compareops = [('1', '1', '=', True), + ('1', '1', '==', True), + ('1', '1', '!=', False), + ('1', '1', '>', False), + ('1', '1', '<', False), + ('1', '1', '>=', True), + ('1', '1', '<=', True), + ('1', '0', '=', False), + ('1', '0', '==', False), + ('1', '0', '!=', True), + ('1', '0', '>', True), + ('1', '0', '<', False), + ('1', '0', '>>', True), + ('1', '0', '<<', False), + ('1', '0', '>=', True), + ('1', '0', '<=', False), + ('0', '1', '=', False), + ('0', '1', '==', False), + ('0', '1', '!=', True), + ('0', '1', '>', False), + ('0', '1', '<', True), + ('0', '1', '>>', False), + ('0', '1', '<<', True), + ('0', '1', '>=', False), + ('0', '1', '<=', True)] + + for arg1, arg2, op, correctresult in compareops: + result = bb.utils.vercmp_string_op(arg1, arg2, op) + self.assertEqual(result, correctresult, 'vercmp_string_op("%s", "%s", "%s") != %s' % (arg1, arg2, op, correctresult)) + + # Check that clearly invalid operator raises an exception + self.assertRaises(bb.utils.VersionStringException, bb.utils.vercmp_string_op, '0', '0', '$') + + +class Path(unittest.TestCase): + def test_unsafe_delete_path(self): + checkitems = [('/', True), + ('//', True), + ('///', True), + (os.getcwd().count(os.sep) * ('..' + os.sep), True), + (os.environ.get('HOME', '/home/test'), True), + ('/home/someone', True), + ('/home/other/', True), + ('/home/other/subdir', False), + ('', False)] + for arg1, correctresult in checkitems: + result = bb.utils._check_unsafe_delete_path(arg1) + self.assertEqual(result, correctresult, '_check_unsafe_delete_path("%s") != %s' % (arg1, correctresult)) + + +class EditMetadataFile(unittest.TestCase): + _origfile = """ +# A comment +HELLO = "oldvalue" + +THIS = "that" + +# Another comment +NOCHANGE = "samevalue" +OTHER = 'anothervalue' + +MULTILINE = "a1 \\ + a2 \\ + a3" + +MULTILINE2 := " \\ + b1 \\ + b2 \\ + b3 \\ + " + + +MULTILINE3 = " \\ + c1 \\ + c2 \\ + c3 \\ +" + +do_functionname() { + command1 ${VAL1} ${VAL2} + command2 ${VAL3} ${VAL4} +} +""" + def _testeditfile(self, varvalues, compareto, dummyvars=None): + if dummyvars is None: + dummyvars = [] + with tempfile.NamedTemporaryFile('w', delete=False) as tf: + tf.write(self._origfile) + tf.close() + try: + varcalls = [] + def handle_file(varname, origvalue, op, newlines): + self.assertIn(varname, varvalues, 'Callback called for variable %s not in the list!' % varname) + self.assertNotIn(varname, dummyvars, 'Callback called for variable %s in dummy list!' % varname) + varcalls.append(varname) + return varvalues[varname] + + bb.utils.edit_metadata_file(tf.name, varvalues.keys(), handle_file) + with open(tf.name) as f: + modfile = f.readlines() + # Ensure the output matches the expected output + self.assertEqual(compareto.splitlines(True), modfile) + # Ensure the callback function was called for every variable we asked for + # (plus allow testing behaviour when a requested variable is not present) + self.assertEqual(sorted(varvalues.keys()), sorted(varcalls + dummyvars)) + finally: + os.remove(tf.name) + + + def test_edit_metadata_file_nochange(self): + # Test file doesn't get modified with nothing to do + self._testeditfile({}, self._origfile) + # Test file doesn't get modified with only dummy variables + self._testeditfile({'DUMMY1': ('should_not_set', None, 0, True), + 'DUMMY2': ('should_not_set_again', None, 0, True)}, self._origfile, dummyvars=['DUMMY1', 'DUMMY2']) + # Test file doesn't get modified with some the same values + self._testeditfile({'THIS': ('that', None, 0, True), + 'OTHER': ('anothervalue', None, 0, True), + 'MULTILINE3': (' c1 c2 c3 ', None, 4, False)}, self._origfile) + + def test_edit_metadata_file_1(self): + + newfile1 = """ +# A comment +HELLO = "newvalue" + +THIS = "that" + +# Another comment +NOCHANGE = "samevalue" +OTHER = 'anothervalue' + +MULTILINE = "a1 \\ + a2 \\ + a3" + +MULTILINE2 := " \\ + b1 \\ + b2 \\ + b3 \\ + " + + +MULTILINE3 = " \\ + c1 \\ + c2 \\ + c3 \\ +" + +do_functionname() { + command1 ${VAL1} ${VAL2} + command2 ${VAL3} ${VAL4} +} +""" + self._testeditfile({'HELLO': ('newvalue', None, 4, True)}, newfile1) + + + def test_edit_metadata_file_2(self): + + newfile2 = """ +# A comment +HELLO = "oldvalue" + +THIS = "that" + +# Another comment +NOCHANGE = "samevalue" +OTHER = 'anothervalue' + +MULTILINE = " \\ + d1 \\ + d2 \\ + d3 \\ + " + +MULTILINE2 := " \\ + b1 \\ + b2 \\ + b3 \\ + " + + +MULTILINE3 = "nowsingle" + +do_functionname() { + command1 ${VAL1} ${VAL2} + command2 ${VAL3} ${VAL4} +} +""" + self._testeditfile({'MULTILINE': (['d1','d2','d3'], None, 4, False), + 'MULTILINE3': ('nowsingle', None, 4, True), + 'NOTPRESENT': (['a', 'b'], None, 4, False)}, newfile2, dummyvars=['NOTPRESENT']) + + + def test_edit_metadata_file_3(self): + + newfile3 = """ +# A comment +HELLO = "oldvalue" + +# Another comment +NOCHANGE = "samevalue" +OTHER = "yetanothervalue" + +MULTILINE = "e1 \\ + e2 \\ + e3 \\ + " + +MULTILINE2 := "f1 \\ +\tf2 \\ +\t" + + +MULTILINE3 = " \\ + c1 \\ + c2 \\ + c3 \\ +" + +do_functionname() { + othercommand_one a b c + othercommand_two d e f +} +""" + + self._testeditfile({'do_functionname()': (['othercommand_one a b c', 'othercommand_two d e f'], None, 4, False), + 'MULTILINE2': (['f1', 'f2'], None, '\t', True), + 'MULTILINE': (['e1', 'e2', 'e3'], None, -1, True), + 'THIS': (None, None, 0, False), + 'OTHER': ('yetanothervalue', None, 0, True)}, newfile3) + + + def test_edit_metadata_file_4(self): + + newfile4 = """ +# A comment +HELLO = "oldvalue" + +THIS = "that" + +# Another comment +OTHER = 'anothervalue' + +MULTILINE = "a1 \\ + a2 \\ + a3" + +MULTILINE2 := " \\ + b1 \\ + b2 \\ + b3 \\ + " + + +""" + + self._testeditfile({'NOCHANGE': (None, None, 0, False), + 'MULTILINE3': (None, None, 0, False), + 'THIS': ('that', None, 0, False), + 'do_functionname()': (None, None, 0, False)}, newfile4) + + + def test_edit_metadata(self): + newfile5 = """ +# A comment +HELLO = "hithere" + +# A new comment +THIS += "that" + +# Another comment +NOCHANGE = "samevalue" +OTHER = 'anothervalue' + +MULTILINE = "a1 \\ + a2 \\ + a3" + +MULTILINE2 := " \\ + b1 \\ + b2 \\ + b3 \\ + " + + +MULTILINE3 = " \\ + c1 \\ + c2 \\ + c3 \\ +" + +NEWVAR = "value" + +do_functionname() { + command1 ${VAL1} ${VAL2} + command2 ${VAL3} ${VAL4} +} +""" + + + def handle_var(varname, origvalue, op, newlines): + if varname == 'THIS': + newlines.append('# A new comment\n') + elif varname == 'do_functionname()': + newlines.append('NEWVAR = "value"\n') + newlines.append('\n') + valueitem = varvalues.get(varname, None) + if valueitem: + return valueitem + else: + return (origvalue, op, 0, True) + + varvalues = {'HELLO': ('hithere', None, 0, True), 'THIS': ('that', '+=', 0, True)} + varlist = ['HELLO', 'THIS', 'do_functionname()'] + (updated, newlines) = bb.utils.edit_metadata(self._origfile.splitlines(True), varlist, handle_var) + self.assertTrue(updated, 'List should be updated but isn\'t') + self.assertEqual(newlines, newfile5.splitlines(True)) + + # Make sure the orig value matches what we expect it to be + def test_edit_metadata_origvalue(self): + origfile = """ +MULTILINE = " stuff \\ + morestuff" +""" + expected_value = "stuff morestuff" + global value_in_callback + value_in_callback = "" + + def handle_var(varname, origvalue, op, newlines): + global value_in_callback + value_in_callback = origvalue + return (origvalue, op, -1, False) + + bb.utils.edit_metadata(origfile.splitlines(True), + ['MULTILINE'], + handle_var) + + testvalue = re.sub('\s+', ' ', value_in_callback.strip()) + self.assertEqual(expected_value, testvalue) + +class EditBbLayersConf(unittest.TestCase): + + def _test_bblayers_edit(self, before, after, add, remove, notadded, notremoved): + with tempfile.NamedTemporaryFile('w', delete=False) as tf: + tf.write(before) + tf.close() + try: + actual_notadded, actual_notremoved = bb.utils.edit_bblayers_conf(tf.name, add, remove) + with open(tf.name) as f: + actual_after = f.readlines() + self.assertEqual(after.splitlines(True), actual_after) + self.assertEqual(notadded, actual_notadded) + self.assertEqual(notremoved, actual_notremoved) + finally: + os.remove(tf.name) + + + def test_bblayers_remove(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/subpath/layer3 \ + /home/user/path/layer4 \ + " +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/subpath/layer3 \ + /home/user/path/layer4 \ + " +""" + self._test_bblayers_edit(before, after, + None, + '/home/user/path/layer2', + [], + []) + + + def test_bblayers_add(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/subpath/layer3 \ + /home/user/path/layer4 \ + " +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/subpath/layer3 \ + /home/user/path/layer4 \ + /other/path/to/layer5 \ + " +""" + self._test_bblayers_edit(before, after, + '/other/path/to/layer5/', + None, + [], + []) + + + def test_bblayers_add_remove(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/subpath/layer3 \ + /home/user/path/layer4 \ + " +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/layer4 \ + /other/path/to/layer5 \ + " +""" + self._test_bblayers_edit(before, after, + ['/other/path/to/layer5', '/home/user/path/layer2/'], '/home/user/path/subpath/layer3/', + ['/home/user/path/layer2'], + []) + + + def test_bblayers_add_remove_home(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + ~/path/layer1 \ + ~/path/layer2 \ + ~/otherpath/layer3 \ + ~/path/layer4 \ + " +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS = " \ + ~/path/layer2 \ + ~/path/layer4 \ + ~/path2/layer5 \ + " +""" + self._test_bblayers_edit(before, after, + [os.environ['HOME'] + '/path/layer4', '~/path2/layer5'], + [os.environ['HOME'] + '/otherpath/layer3', '~/path/layer1', '~/path/notinlist'], + [os.environ['HOME'] + '/path/layer4'], + ['~/path/notinlist']) + + + def test_bblayers_add_remove_plusequals(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS += " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + " +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS += " \ + /home/user/path/layer2 \ + /home/user/path/layer3 \ + " +""" + self._test_bblayers_edit(before, after, + '/home/user/path/layer3', + '/home/user/path/layer1', + [], + []) + + + def test_bblayers_add_remove_plusequals2(self): + before = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS += " \ + /home/user/path/layer1 \ + /home/user/path/layer2 \ + /home/user/path/layer3 \ + " +BBLAYERS += "/home/user/path/layer4" +BBLAYERS += "/home/user/path/layer5" +""" + after = r""" +# A comment + +BBPATH = "${TOPDIR}" +BBFILES ?= "" +BBLAYERS += " \ + /home/user/path/layer2 \ + /home/user/path/layer3 \ + " +BBLAYERS += "/home/user/path/layer5" +BBLAYERS += "/home/user/otherpath/layer6" +""" + self._test_bblayers_edit(before, after, + ['/home/user/otherpath/layer6', '/home/user/path/layer3'], ['/home/user/path/layer1', '/home/user/path/layer4', '/home/user/path/layer7'], + ['/home/user/path/layer3'], + ['/home/user/path/layer7']) |