Source code for planemo.galaxy.invocations.api

"""API interaction for Galaxy's workflow invocation API.

Gives a mockable surface for testing, a type contract consumed by Planemo,
and built-in utilities around bioblend for working around transient request
issues that have been observed in practice.
"""

from typing import (
    Dict,
    List,
    Optional,
    Protocol,
)

from typing_extensions import TypedDict

from planemo.galaxy.api import retry_on_timeouts


class InvocationStep(TypedDict, total=False):
    """Typed view of a single step entry of a workflow invocation.

    Declared with ``total=False``, so any of the keys may be absent.
    """

    # State string reported for the step; may be ``None``.
    state: Optional[str]
    # Presumably the id of the nested invocation when the step is a
    # subworkflow -- TODO confirm against the Galaxy API.
    subworkflow_invocation_id: Optional[str]
class Invocation(TypedDict, total=False):
    """Typed view of a workflow invocation record.

    Declared with ``total=False``, so any of the keys may be absent.
    """

    # Identifier of the invocation.
    id: str
    # State string of the invocation as a whole.
    state: str
    # Per-step entries, each shaped like ``InvocationStep``.
    steps: List[InvocationStep]
    # Presumably the history the invocation writes into -- TODO confirm.
    history_id: Optional[str]
class InvocationJobsSummary(TypedDict, total=False):
    """Typed view of the jobs summary returned for an invocation.

    Declared with ``total=False``, so the ``states`` key may be absent.
    """

    # Mapping from a state name to an integer; presumably the number of
    # jobs currently in that state -- TODO confirm.
    states: Dict[str, int]
class Job(TypedDict, total=False):
    """Typed view of a Galaxy job record.

    Declared with ``total=False``, so any of the keys may be absent.
    """

    # Identifier and state string of the job.
    id: str
    state: str

    # Execution details; each of these may be ``None``.
    exit_code: Optional[int]
    stderr: Optional[str]
    stdout: Optional[str]
    command_line: Optional[str]

    # Identifiers relating the job to a tool and a history; may be ``None``.
    tool_id: Optional[str]
    history_id: Optional[str]
class InvocationApi(Protocol):
    """Structural interface for querying workflow invocations and jobs.

    Any object providing these four methods satisfies the protocol.
    """

    def get_invocation(self, invocation_id: str) -> Invocation:
        """Return the invocation record for ``invocation_id``."""
        ...

    def get_invocation_summary(self, invocation_id: str, state: Optional[str] = None) -> InvocationJobsSummary:
        """Return the jobs summary for ``invocation_id``.

        ``state`` is optional and defaults to ``None``.
        """
        ...

    def get_invocation_jobs(self, invocation_id: str, state: Optional[str] = None) -> List[Job]:
        """Return the jobs belonging to ``invocation_id``.

        ``state`` is optional and defaults to ``None``.
        """
        ...

    def get_job(self, job_id: str, full_details: bool = False) -> Job:
        """Return the job record for ``job_id``.

        ``full_details`` defaults to ``False``.
        """
        ...
class BioblendInvocationApi(InvocationApi):
    """``InvocationApi`` implementation that delegates to ``user_gi``.

    ``user_gi`` is presumably a bioblend ``GalaxyInstance`` -- the methods
    below use its ``invocations`` and ``jobs`` attributes.
    """

    def __init__(self, ctx, user_gi):
        """Store the context and the client used by every call."""
        self._ctx = ctx
        self._user_gi = user_gi

    def get_invocation(self, invocation_id: str) -> Invocation:
        """Return ``show_invocation(invocation_id)``, routed through ``retry_on_timeouts``."""

        def _show(gi):
            return gi.invocations.show_invocation(invocation_id)

        return retry_on_timeouts(self._ctx, self._user_gi, _show)

    def get_invocation_summary(self, invocation_id: str, state: Optional[str] = None) -> InvocationJobsSummary:
        """Return the jobs summary for ``invocation_id``.

        NOTE: ``state`` is accepted to match ``InvocationApi`` but is not
        forwarded to the underlying client call.
        """
        invocations_client = self._user_gi.invocations
        return invocations_client.get_invocation_summary(invocation_id)

    def get_invocation_jobs(self, invocation_id: str, state: Optional[str] = None) -> List[Job]:
        """Return the jobs of ``invocation_id``, passing ``state`` through to ``get_jobs``."""
        jobs_client = self._user_gi.jobs
        return jobs_client.get_jobs(invocation_id=invocation_id, state=state)

    def get_job(self, job_id: str, full_details: bool = False) -> Job:
        """Return ``show_job(job_id)``, passing ``full_details`` through."""
        jobs_client = self._user_gi.jobs
        return jobs_client.show_job(job_id, full_details=full_details)
# Invocation states grouped into success and error outcomes.
INVOCATION_SUCCESS_STATES = (
    "scheduled",
    "completed",
)
INVOCATION_ERROR_STATES = (
    "cancelled",
    "failed",
)
# Union of both groups, success states first.
INVOCATION_TERMINAL_STATES = (*INVOCATION_SUCCESS_STATES, *INVOCATION_ERROR_STATES)
def invocation_state_terminal(state: str):
    """Return ``True`` when ``state`` is one of ``INVOCATION_TERMINAL_STATES``."""
    is_terminal = state in INVOCATION_TERMINAL_STATES
    return is_terminal
# Job states treated as errors.
JOB_ERROR_STATES = [
    "error",
    "deleted",
    "failed",
    "stopped",
    "stop",
    "deleting",
]

# Job states treated as not yet terminal.
NON_TERMINAL_JOB_STATES = {
    "running",
    "queued",
    "new",
    "ready",
    "resubmitted",
    "upload",
    "waiting",
}