Source code for planemo.engine.interface

"""Module contianing the :class:`Engine` abstraction."""

import abc
import json
import os
import tempfile
from typing import (
    Callable,
    List,
    Optional,
)

from planemo.exit_codes import EXIT_CODE_UNSUPPORTED_FILE_TYPE
from planemo.io import error
from planemo.runnable import (
    cases,
    RunnableType,
)
from planemo.test.results import StructuredData


class Engine(metaclass=abc.ABCMeta):
    """Abstract description of an external process for running tools or workflows."""

    @abc.abstractmethod
    def run(self, path, job_path):
        """Run a job using a compatible artifact (workflow or tool)."""

    @abc.abstractmethod
    def cleanup(self):
        """Release any resources used to run/test with this engine."""

    @abc.abstractmethod
    def test(self, runnables):
        """Test runnable artifacts (workflow or tool)."""
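

# Illustrative sketch, not part of planemo: because ``Engine`` uses
# ``abc.ABCMeta``, a subclass must override all three abstract methods before
# it can be instantiated; otherwise construction raises a TypeError. The class
# ``_NoOpEngine`` and its trivial bodies are hypothetical and for illustration
# only; real engines derive from ``BaseEngine`` below instead.
class _NoOpEngine(Engine):
    def run(self, path, job_path):
        # A real engine would execute the artifact at ``path`` with ``job_path``.
        return None

    def cleanup(self):
        # Nothing to release in this sketch.
        pass

    def test(self, runnables):
        # A real engine would return structured test results here.
        return None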


class BaseEngine(Engine):
    """Base class providing context and keywords for Engine implementations."""

    handled_runnable_types: List[RunnableType] = []

    def __init__(self, ctx, **kwds):
        """Store context and kwds."""
        self._ctx = ctx
        self._kwds = kwds

    def can_run(self, runnable):
        """Use subclass's ``handled_runnable_types`` variable to infer ``can_run``."""
        return runnable.type in self.handled_runnable_types

    def cleanup(self):
        """Default no-op cleanup method."""

    def run(self, runnables, job_paths, output_collectors: Optional[List[Callable]] = None):
        """Run a job using a compatible artifact (workflow or tool)."""
        self._check_can_run_all(runnables)
        run_responses = self._run(runnables, job_paths, output_collectors)
        return run_responses

    @abc.abstractmethod
    def _run(self, runnables, job_path, output_collectors: Optional[List[Callable]] = None):
        """Run a job using a compatible artifact (workflow or tool) wrapped as a runnable."""

    def _check_can_run(self, runnable):
        if not self.can_run(runnable):
            template = "Engine type [%s] cannot execute [%s]s"
            message = template % (self.__class__, runnable.type)
            error(message)
            self._ctx.exit(EXIT_CODE_UNSUPPORTED_FILE_TYPE)

    def _check_can_run_all(self, runnables):
        for runnable in runnables:
            self._check_can_run(runnable)

    def test(self, runnables, test_timeout):
        """Test runnable artifacts (workflow or tool)."""
        self._check_can_run_all(runnables)
        test_cases = [t for tl in map(cases, runnables) for t in tl]
        test_results = self._collect_test_results(test_cases, test_timeout)
        tests = []
        for test_case, run_response in test_results:
            test_case_data = test_case.structured_test_data(run_response)
            tests.append(test_case_data)
        test_data = {
            "version": "0.1",
            "tests": tests,
        }
        structured_results = StructuredData(data=test_data)
        structured_results.calculate_summary_data()
        return structured_results

    def _collect_test_results(self, test_cases, test_timeout):
        run_responses = self._run_test_cases(test_cases, test_timeout)
        return [(test_case, run_response) for test_case, run_response in zip(test_cases, run_responses)]

    def _run_test_cases(self, test_cases, test_timeout):
        runnables = [test_case.runnable for test_case in test_cases]
        job_paths = []
        tmp_paths = []
        output_collectors = []
        for test_case in test_cases:
            if test_case.job_path is None:
                job = test_case.job
                with tempfile.NamedTemporaryFile(
                    dir=test_case.tests_directory,
                    suffix=".json",
                    prefix="plnmotmptestjob",
                    delete=False,
                    mode="w+",
                ) as f:
                    tmp_path = f.name
                    job_path = tmp_path
                    tmp_paths.append(tmp_path)
                    json.dump(job, f)
                job_paths.append(job_path)
            else:
                job_paths.append(test_case.job_path)
            output_collectors.append(
                lambda run_response, test_case=test_case: test_case.structured_test_data(run_response)
            )
        try:
            run_responses = self._run(runnables, job_paths, output_collectors)
        finally:
            for tmp_path in tmp_paths:
                os.remove(tmp_path)
        return run_responses
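

# Illustrative sketch, not part of planemo: a hypothetical concrete engine
# built on ``BaseEngine``. Only ``handled_runnable_types`` and ``_run`` need
# to be supplied; ``can_run``, ``run`` and ``test`` are inherited from the
# base class. The class name, the placeholder responses, and the decision to
# ignore ``output_collectors`` are assumptions for illustration, not planemo
# behavior.
class _SketchEngine(BaseEngine):
    # Pretend to handle every runnable type so ``can_run`` always succeeds.
    handled_runnable_types = list(RunnableType)

    def _run(self, runnables, job_path, output_collectors: Optional[List[Callable]] = None):
        # A real engine would dispatch each job and return engine-specific
        # run response objects; here we return one placeholder per runnable.
        return [None for _ in runnables]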


__all__ = (
    "Engine",
    "BaseEngine",
)
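

# Illustrative usage sketch, not part of planemo: how calling code might drive
# an engine's ``test()`` method. ``ctx`` and ``runnables`` are assumed to come
# from the surrounding planemo invocation; ``_SketchEngine`` is the
# hypothetical class sketched above and the 600 second timeout is arbitrary.
def _sketch_test_session(ctx, runnables, test_timeout=600):
    engine = _SketchEngine(ctx)
    try:
        # ``test()`` re-checks this internally, but ``can_run`` is the public
        # way to ask whether an engine supports a runnable's type.
        if all(engine.can_run(r) for r in runnables):
            # Returns a StructuredData report with per-test results and summary.
            return engine.test(runnables, test_timeout)
        return None
    finally:
        engine.cleanup()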