diff options
author | Dave Cobbley <david.j.cobbley@linux.intel.com> | 2018-08-14 20:05:37 +0300 |
---|---|---|
committer | Brad Bishop <bradleyb@fuzziesquirrel.com> | 2018-08-23 04:26:31 +0300 |
commit | eb8dc40360f0cfef56fb6947cc817a547d6d9bc6 (patch) | |
tree | de291a73dc37168da6370e2cf16c347d1eba9df8 /poky/meta/lib/oeqa/utils/package_manager.py | |
parent | 9c3cf826d853102535ead04cebc2d6023eff3032 (diff) | |
download | openbmc-eb8dc40360f0cfef56fb6947cc817a547d6d9bc6.tar.xz |
[Subtree] Removing import-layers directory
As part of the move to subtrees, need to bring all the import layers
content to the top level.
Change-Id: I4a163d10898cbc6e11c27f776f60e1a470049d8f
Signed-off-by: Dave Cobbley <david.j.cobbley@linux.intel.com>
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
Diffstat (limited to 'poky/meta/lib/oeqa/utils/package_manager.py')
-rw-r--r-- | poky/meta/lib/oeqa/utils/package_manager.py | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/poky/meta/lib/oeqa/utils/package_manager.py b/poky/meta/lib/oeqa/utils/package_manager.py new file mode 100644 index 000000000..afd5b8e75 --- /dev/null +++ b/poky/meta/lib/oeqa/utils/package_manager.py @@ -0,0 +1,211 @@ +import os +import json +import shutil + +from oeqa.core.utils.test import getCaseFile, getCaseMethod + +def get_package_manager(d, root_path): + """ + Returns an OE package manager that can install packages in root_path. + """ + from oe.package_manager import RpmPM, OpkgPM, DpkgPM + + pkg_class = d.getVar("IMAGE_PKGTYPE") + if pkg_class == "rpm": + pm = RpmPM(d, + root_path, + d.getVar('TARGET_VENDOR'), + filterbydependencies=False) + pm.create_configs() + + elif pkg_class == "ipk": + pm = OpkgPM(d, + root_path, + d.getVar("IPKGCONF_TARGET"), + d.getVar("ALL_MULTILIB_PACKAGE_ARCHS")) + + elif pkg_class == "deb": + pm = DpkgPM(d, + root_path, + d.getVar('PACKAGE_ARCHS'), + d.getVar('DPKG_ARCH')) + + pm.write_index() + pm.update() + + return pm + +def find_packages_to_extract(test_suite): + """ + Returns packages to extract required by runtime tests. + """ + from oeqa.core.utils.test import getSuiteCasesFiles + + needed_packages = {} + files = getSuiteCasesFiles(test_suite) + + for f in set(files): + json_file = _get_json_file(f) + if json_file: + needed_packages.update(_get_needed_packages(json_file)) + + return needed_packages + +def _get_json_file(module_path): + """ + Returns the path of the JSON file for a module, empty if doesn't exitst. + """ + + json_file = '%s.json' % module_path.rsplit('.', 1)[0] + if os.path.isfile(module_path) and os.path.isfile(json_file): + return json_file + else: + return '' + +def _get_needed_packages(json_file, test=None): + """ + Returns a dict with needed packages based on a JSON file. + + If a test is specified it will return the dict just for that test. + """ + needed_packages = {} + + with open(json_file) as f: + test_packages = json.load(f) + for key,value in test_packages.items(): + needed_packages[key] = value + + if test: + if test in needed_packages: + needed_packages = needed_packages[test] + else: + needed_packages = {} + + return needed_packages + +def extract_packages(d, needed_packages): + """ + Extract packages that will be needed during runtime. + """ + + import bb + import oe.path + + extracted_path = d.getVar('TEST_EXTRACTED_DIR') + + for key,value in needed_packages.items(): + packages = () + if isinstance(value, dict): + packages = (value, ) + elif isinstance(value, list): + packages = value + else: + bb.fatal('Failed to process needed packages for %s; ' + 'Value must be a dict or list' % key) + + for package in packages: + pkg = package['pkg'] + rm = package.get('rm', False) + extract = package.get('extract', True) + + if extract: + #logger.debug(1, 'Extracting %s' % pkg) + dst_dir = os.path.join(extracted_path, pkg) + # Same package used for more than one test, + # don't need to extract again. + if os.path.exists(dst_dir): + continue + + # Extract package and copy it to TEST_EXTRACTED_DIR + pkg_dir = _extract_in_tmpdir(d, pkg) + oe.path.copytree(pkg_dir, dst_dir) + shutil.rmtree(pkg_dir) + + else: + #logger.debug(1, 'Copying %s' % pkg) + _copy_package(d, pkg) + +def _extract_in_tmpdir(d, pkg): + """" + Returns path to a temp directory where the package was + extracted without dependencies. + """ + + from oeqa.utils.package_manager import get_package_manager + + pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg) + pm = get_package_manager(d, pkg_path) + extract_dir = pm.extract(pkg) + shutil.rmtree(pkg_path) + + return extract_dir + +def _copy_package(d, pkg): + """ + Copy the RPM, DEB or IPK package to dst_dir + """ + + from oeqa.utils.package_manager import get_package_manager + + pkg_path = os.path.join(d.getVar('TEST_INSTALL_TMP_DIR'), pkg) + dst_dir = d.getVar('TEST_PACKAGED_DIR') + pm = get_package_manager(d, pkg_path) + pkg_info = pm.package_info(pkg) + file_path = pkg_info[pkg]['filepath'] + shutil.copy2(file_path, dst_dir) + shutil.rmtree(pkg_path) + +def install_package(test_case): + """ + Installs package in DUT if required. + """ + needed_packages = test_needs_package(test_case) + if needed_packages: + _install_uninstall_packages(needed_packages, test_case, True) + +def uninstall_package(test_case): + """ + Uninstalls package in DUT if required. + """ + needed_packages = test_needs_package(test_case) + if needed_packages: + _install_uninstall_packages(needed_packages, test_case, False) + +def test_needs_package(test_case): + """ + Checks if a test case requires to install/uninstall packages. + """ + test_file = getCaseFile(test_case) + json_file = _get_json_file(test_file) + + if json_file: + test_method = getCaseMethod(test_case) + needed_packages = _get_needed_packages(json_file, test_method) + if needed_packages: + return needed_packages + + return None + +def _install_uninstall_packages(needed_packages, test_case, install=True): + """ + Install/Uninstall packages in the DUT without using a package manager + """ + + if isinstance(needed_packages, dict): + packages = [needed_packages] + elif isinstance(needed_packages, list): + packages = needed_packages + + for package in packages: + pkg = package['pkg'] + rm = package.get('rm', False) + extract = package.get('extract', True) + src_dir = os.path.join(test_case.tc.extract_dir, pkg) + + # Install package + if install and extract: + test_case.tc.target.copyDirTo(src_dir, '/') + + # Uninstall package + elif not install and rm: + test_case.tc.target.deleteDirStructure(src_dir, '/') |