# Source code for planemo.shed_lint

"""Logic related to linting shed repositories."""

import os
import xml.etree.ElementTree as ET

import yaml
from galaxy.tool_util.lint import lint_tool_source_with
from import rst_invalid
from galaxy.util import unicodify

from import info
from planemo.lint import (
from planemo.shed import (
from planemo.shed2tap import base
from planemo.tool_lint import (
from import yield_tool_sources
from planemo.xml import XSDS_PATH

TOOL_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "tool_dependencies.xsd")
REPO_DEPENDENCIES_XSD = os.path.join(XSDS_PATH, "repository_dependencies.xsd")



def lint_repository(ctx, realized_repository, **kwds):
    """Lint a realized shed repository.

    See :mod:`planemo.shed` for details on constructing a realized
    repository data structure.

    The repository-level lint functions are always run, in a fixed order.
    Three groups are optional and controlled by keyword flags:

    * ``urls`` - also run ``lint_tool_dependencies_urls``.
    * ``tools`` - also lint every tool found via ``lint_repository_tools``.
    * ``ensure_metadata`` - also run ``lint_shed_metadata``.

    A flag that is absent from ``kwds`` is treated as disabled. All of
    ``kwds`` is forwarded to ``build_tool_lint_args`` and ``setup_lint``.

    Returns the result of ``handle_lint_complete``.
    """
    path = realized_repository.real_path
    info("Linting repository %s" % path)
    lint_args = build_tool_lint_args(ctx, **kwds)
    lint_args, lint_ctx = setup_lint(ctx, lint_args=lint_args, **kwds)

    # (name, function) pairs; order matters because it is the reporting order.
    repository_lints = [
        ("lint_expansion", lint_expansion),
        ("lint_expected_files", lint_expected_files),
        ("lint_tool_dependencies_xsd", lint_tool_dependencies_xsd),
        ("lint_tool_dependencies_sha256sum", lint_tool_dependencies_sha256sum),
        ("lint_tool_dependencies_actions", lint_tool_dependencies_actions),
        ("lint_repository_dependencies", lint_repository_dependencies),
        ("lint_shed_yaml", lint_shed_yaml),
        ("lint_readme", lint_readme),
    ]
    if kwds.get("urls"):
        repository_lints.append(("lint_urls", lint_tool_dependencies_urls))

    for lint_name, lint_func in repository_lints:
        lint_ctx.lint(lint_name, lint_func, realized_repository)

    failed = False
    if kwds.get("tools"):
        # lint_repository_tools may fall through without an explicit return
        # value, so normalise whatever it yields to a real boolean.
        failed = bool(lint_repository_tools(ctx, realized_repository, lint_ctx, lint_args))

    if kwds.get("ensure_metadata"):
        lint_ctx.lint(
            "lint_shed_metadata",
            lint_shed_metadata,
            realized_repository,
        )

    return handle_lint_complete(lint_ctx, lint_args, failed=failed)


def lint_repository_tools(ctx, realized_repository, lint_ctx, lint_args):
    """Lint each tool source found (recursively) under the repository.

    Returns ``True`` as soon as ``handle_tool_load_error`` reports a problem
    for a tool, which stops the iteration; returns ``False`` when every tool
    was handed to ``lint_tool_source_with``.
    """
    path = realized_repository.path
    for tool_path, tool_source in yield_tool_sources(ctx, path, recursive=True):
        # Display the tool path with ``path`` swapped for ``real_path``.
        original_path = tool_path.replace(path, realized_repository.real_path)
        info("+Linting tool %s" % original_path)
        if handle_tool_load_error(tool_path, tool_source):
            return True
        lint_tool_source_with(lint_ctx, tool_source, extra_modules=lint_args["extra_modules"])
    return False


def lint_expansion(realized_repository, lint_ctx):
    """Warn if the realized repository recorded any missing inclusions."""
    missing = realized_repository.missing
    if missing:
        msg = "Failed to expand inclusions %s" % missing
        lint_ctx.warn(msg)
    else:
        lint_ctx.info("Included files all found.")


def lint_shed_metadata(realized_repository, lint_ctx):
    """Warn about each ``SHED_METADATA`` key absent from the repository config."""
    found_all = True
    for key in SHED_METADATA:
        if key not in realized_repository.config:
            found_all = False
            lint_ctx.warn("Missing shed metadata field [%s] for repository" % key)
    if found_all:
        lint_ctx.info("Found all shed metadata fields required for automated repository creation and/or updates.")


def lint_readme(realized_repository, lint_ctx):
    """Check for a README file and, for ``.rst`` files, validate its markup.

    ``README.rst``, ``README`` and ``README.txt`` are probed in that order.
    A lone ``README.md`` is reported as invalid because the Tool Shed does
    not render markdown.
    """
    path = realized_repository.real_path
    candidates = [os.path.join(path, name) for name in ("README.rst", "README", "README.txt")]

    readme_found = False
    for candidate in candidates:
        # No ``break``: when several candidates exist the last one wins,
        # which is the long-standing behaviour of this check.
        if os.path.exists(candidate):
            readme_found = candidate

    readme_md = os.path.join(path, "README.md")
    if not readme_found and os.path.exists(readme_md):
        lint_ctx.warn("Tool Shed doesn't render markdown, README.md is invalid readme.")
        return

    if not readme_found:
        # TODO: filter on TYPE and make this a warning if
        # unrestricted repository - need to update iuc standards
        # first though.
        lint_ctx.info("No README found skipping.")
        return

    if readme_found.endswith(".rst"):
        with open(readme_found) as fh:
            readme_text = fh.read()
        invalid_rst = rst_invalid(readme_text)
        if invalid_rst:
            template = "Invalid restructured text found in README [%s]."
            msg = template % invalid_rst
            lint_ctx.warn(msg)
            return
        lint_ctx.info("README found containing valid reStructuredText.")
    else:
        lint_ctx.info("README found containing plain text.")


def _find_tool_dependencies(realized_repository, lint_ctx):
    """Return the path of ``tool_dependencies.xml`` or ``None`` if it is absent.

    When the file does not exist an informational "skipping" message is
    recorded on ``lint_ctx`` so each caller can simply return.
    """
    tool_dependencies = os.path.join(realized_repository.real_path, "tool_dependencies.xml")
    if not os.path.exists(tool_dependencies):
        lint_ctx.info("No tool_dependencies.xml, skipping.")
        return None
    return tool_dependencies


def lint_tool_dependencies_urls(realized_repository, lint_ctx):
    """Run ``lint_urls`` over the parsed ``tool_dependencies.xml``, if present."""
    tool_dependencies = _find_tool_dependencies(realized_repository, lint_ctx)
    if tool_dependencies is None:
        return
    root = ET.parse(tool_dependencies).getroot()
    lint_urls(root, lint_ctx)


def lint_tool_dependencies_sha256sum(realized_repository, lint_ctx):
    """Check ``sha256sum`` attributes on download actions.

    Only ``<action>`` elements of type ``download_by_url`` or
    ``download_file`` are inspected. A missing checksum is a warning; a
    checksum that is not 64 hexadecimal characters is an error.
    """
    tool_dependencies = _find_tool_dependencies(realized_repository, lint_ctx)
    if tool_dependencies is None:
        return
    root = ET.parse(tool_dependencies).getroot()

    hex_digits = set("0123456789abcdef")
    count = 0
    for action in root.findall(".//action"):
        if action.attrib.get("type", "") not in ("download_by_url", "download_file"):
            continue
        # ``Element.text`` is None for an empty element such as <action .../>.
        url = (action.text or "").strip()
        checksum = action.attrib.get("sha256sum", "")
        if not checksum:
            lint_ctx.warn("Missing checksum for %s" % url)
        elif len(checksum) != 64 or not hex_digits.issuperset(checksum.lower()):
            lint_ctx.error(f"Invalid checksum {checksum!r} for {url}")
        else:
            # TODO - See planned --verify option to check it matches
            count += 1
    if count:
        lint_ctx.info("Found %i download action(s) with SHA256 checksums" % count)


def lint_tool_dependencies_xsd(realized_repository, lint_ctx):
    """Validate ``tool_dependencies.xml`` against ``TOOL_DEPENDENCIES_XSD``."""
    tool_dependencies = _find_tool_dependencies(realized_repository, lint_ctx)
    if tool_dependencies is None:
        return
    lint_xsd(lint_ctx, TOOL_DEPENDENCIES_XSD, tool_dependencies)


def lint_tool_dependencies_actions(realized_repository, lint_ctx):
    """Check that ``tool_dependencies.xml`` can be loaded by ``base.Dependencies``.

    Any exception raised while loading is printed (traceback) and turned
    into a lint warning instead of propagating.
    """
    tool_dependencies = _find_tool_dependencies(realized_repository, lint_ctx)
    if tool_dependencies is None:
        return
    try:
        base.Dependencies(tool_dependencies)
        lint_ctx.info("Parsed tool dependencies.")
    except Exception as e:
        # Deliberately broad: a lint run should report the problem rather
        # than crash on a malformed dependencies file.
        import sys
        import traceback

        traceback.print_tb(e.__traceback__, limit=1, file=sys.stdout)
        traceback.print_exc()
        template = "Problem parsing tool_dependenies.xml [%s]"
        msg = template % unicodify(e)
        lint_ctx.warn(msg)
        return


def lint_expected_files(realized_repository, lint_ctx):
    """Warn when a package or suite repository lacks its dependencies file."""
    if realized_repository.is_package:
        if not os.path.exists(realized_repository.tool_dependencies_path):
            lint_ctx.warn("Package repository does not contain a tool_dependencies.xml file.")

    if realized_repository.is_suite:
        if not os.path.exists(realized_repository.repo_dependencies_path):
            lint_ctx.warn("Suite repository does not contain a repository_dependencies.xml file.")


def lint_repository_dependencies(realized_repository, lint_ctx):
    """Validate ``repository_dependencies.xml`` against ``REPO_DEPENDENCIES_XSD``."""
    path = realized_repository.real_path
    repo_dependencies = os.path.join(path, "repository_dependencies.xml")
    if not os.path.exists(repo_dependencies):
        lint_ctx.info("No repository_dependencies.xml, skipping.")
        return
    lint_xsd(lint_ctx, REPO_DEPENDENCIES_XSD, repo_dependencies)


def lint_shed_yaml(realized_repository, lint_ctx):
    """Check that ``.shed.yml`` parses as YAML, then lint its contents."""
    path = realized_repository.real_path
    shed_yaml = os.path.join(path, ".shed.yml")
    if not os.path.exists(shed_yaml):
        lint_ctx.info("No .shed.yml file found, skipping.")
        return
    try:
        with open(shed_yaml) as fh:
            yaml.safe_load(fh)
    except Exception as e:
        lint_ctx.warn("Failed to parse .shed.yml file [%s]" % unicodify(e))
        return
    lint_ctx.info(".shed.yml found and appears to be valid YAML.")
    _lint_shed_contents(lint_ctx, realized_repository)


def _lint_shed_contents(lint_ctx, realized_repository):
    """Run the per-key validators over the repository config."""
    config = realized_repository.config

    def _lint_if_present(key, func, *args):
        # Validators return a message string on failure and a falsy value
        # on success; only keys that are actually set are validated.
        value = config.get(key, None)
        if value is not None:
            msg = func(value, *args)
            if msg:
                lint_ctx.warn(msg)

    _lint_if_present("owner", validate_repo_owner)
    _lint_if_present("name", validate_repo_name)
    # NOTE: config["name"] is evaluated eagerly, even when "type" is unset.
    _lint_if_present("type", _validate_repo_type, config["name"])
    _lint_if_present("categories", _validate_categories, realized_repository)


def _validate_repo_type(repo_type, name):
    """Return a problem message for ``repo_type``/``name``, or ``None`` if fine."""
    if repo_type not in VALID_REPOSITORY_TYPES:
        return "Invalid repository type specified [%s]" % repo_type

    is_dep = repo_type == "tool_dependency_definition"
    is_suite = repo_type == "repository_suite_definition"
    if is_dep and not name.startswith("package_"):
        return "Tool dependency definition repositories should have names starting with package_"
    if is_suite and not name.startswith("suite_"):
        return "Repository suite definition repositories should have names starting with suite_"
    if name.startswith(("package_", "suite_")) and repo_type == "unrestricted":
        return "Repository name indicated specialized repository type but repository is listed as unrestricted."
    return None


def _validate_categories(categories, realized_repository):
    """Return a problem message for ``categories``, or ``None`` if fine.

    All categories missing from ``CURRENT_CATEGORIES`` are reported together.
    For package repositories the 'Tool Dependency Packages' requirement takes
    precedence over the unknown-category message.
    """
    msg = None
    if len(categories) == 0:
        msg = "Repository should specify one or more categories."
    else:
        # Collect every unknown category before building the message, so
        # that none is lost when several are unknown.
        unknown_categories = [category for category in categories if category not in CURRENT_CATEGORIES]
        if unknown_categories:
            msg = "Categories [%s] unknown." % unknown_categories
        if realized_repository.is_package:
            if "Tool Dependency Packages" not in categories:
                msg = (
                    "Packages should be placed and should only be placed "
                    "in the category 'Tool Dependency Packages'."
                )
    return msg


__all__ = ("lint_repository",)