"""Module provides generic interface to running Galaxy tools and workflows."""

import os
import sys
import tempfile
import time
import traceback
from datetime import datetime
from typing import (
    Any,
    Dict,
    Optional,
    Tuple,
    Type,
    TYPE_CHECKING,
)
from urllib.parse import urljoin

import bioblend
from bioblend.galaxy import GalaxyInstance
from bioblend.util import attach_file

try:
    from galaxy.tool_util.client.staging import StagingInterface
except ImportError:
    from galaxy.tool_util.client.staging import StagingInterace as StagingInterface

from galaxy.tool_util.cwl.util import (
    invocation_to_output,
    output_to_cwl_json,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.util import (
    safe_makedirs,
    unicodify,
)
from pathvalidate import sanitize_filename
from requests.exceptions import (
    HTTPError,
    RequestException,
)

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    Rerunnable,
    Runnable,
    RunnableType,
    RunResponse,
    SuccessfulRunResponse,
)

if TYPE_CHECKING:
    from planemo.cli import PlanemoCliContext
    from planemo.galaxy.config import BaseGalaxyConfig
    from planemo.runnable import RunnableOutput

    "Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
    "You may need to enable verbose logging and determine why the tool did not load. [%s]"

def execute(
    ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
) -> RunResponse:
    """Execute a Galaxy activity.

    Runs ``runnable`` against the Galaxy described by ``config`` using the job file at
    ``job_path``. Any exception raised while executing is logged, its traceback is
    printed to stdout, and it is converted into an ``ErrorRunResponse`` instead of
    propagating to the caller.
    """
    start_datetime = datetime.now()
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        end_datetime = datetime.now()
        ctx.log("Failed to execute Galaxy activity, throwing ErrorRunResponse")
        traceback.print_exc(file=sys.stdout)
        return ErrorRunResponse(unicodify(e), start_datetime=start_datetime, end_datetime=end_datetime)
def _verified_tool_id(runnable, user_gi):
    """Return the tool ID of ``runnable`` after checking that Galaxy has loaded that tool.

    :raises Exception: formatted with ``ERR_NO_SUCH_TOOL`` when Galaxy cannot show the tool.
    """
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e)) from e
    return tool_id


def _inputs_representation(runnable):
    """Return the tool API ``inputs_representation``: ``"cwl"`` for CWL tools, ``"galaxy"`` otherwise."""
    return "cwl" if runnable.type == RunnableType.cwl_tool else "galaxy"


def log_contents_str(config):
    """Return the log captured by the engine ``config``, or a placeholder if it keeps no log."""
    if hasattr(config, "log_contents"):
        return config.log_contents
    return "No log for this engine type."


class PlanemoStagingInterface(StagingInterface):
    """Galaxy ``StagingInterface`` that stages job inputs through a BioBlend ``GalaxyInstance``."""

    def __init__(
        self,
        ctx: "PlanemoCliContext",
        runnable: Runnable,
        user_gi: GalaxyInstance,
        version_major: str,
        simultaneous_uploads: bool,
    ) -> None:
        self._ctx = ctx
        self._user_gi = user_gi
        self._runnable = runnable
        self._version_major = version_major
        self._simultaneous_uploads = simultaneous_uploads

    def _post(self, api_path: str, payload: Dict[str, Any], files_attached: bool = False) -> Dict[str, Any]:
        """POST ``payload`` to ``api/<api_path>`` on the user's Galaxy and return the response."""
        # Keep the files_attached argument because StagingInterface._post() had
        # it until Galaxy 22.05.
        url = urljoin(self._user_gi.url, "api/" + api_path)
        if payload.get("__files"):
            # put attached files where BioBlend expects them (as top-level payload keys)
            files_attached = True
            payload.update(payload.pop("__files"))
        return self._user_gi.make_post_request(url, payload=payload, files_attached=files_attached)

    def _attach_file(self, path):
        return attach_file(path)

    def _handle_job(self, job_response):
        # Unless uploads may run simultaneously, block until each upload job finishes.
        if not self._simultaneous_uploads:
            job_id = job_response["id"]
            _wait_for_job(self._user_gi, job_id)

    @property
    def use_fetch_api(self):
        # hack around this not working for galaxy_tools - why is that :(
        # NOTE(review): version_major is compared as a string (lexicographically).
        return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"

    # extension point for planemo to override logging
    def _log(self, message):
        self._ctx.vlog(message)


def _execute(  # noqa C901
    ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
) -> "GalaxyBaseRunResponse":
    """Stage inputs, run ``runnable`` in Galaxy and return the corresponding run response.

    Tools are submitted through the tool API (waiting for the job unless ``no_wait`` is
    set); workflows are invoked and tracked via :func:`invocation_to_run_response`.
    When ``download_outputs`` is set, outputs are collected into ``output_directory``.

    :raises NotImplementedError: for runnable types that are neither tools nor workflows.
    """
    user_gi = config.user_gi
    admin_gi = config.gi

    run_response = None
    response_kwds: Dict[str, Any] = {}
    start_datetime = datetime.now()
    try:
        job_dict, history_id = stage_in(ctx, runnable, config, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class: Type[GalaxyBaseRunResponse] = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)
        # Defined up front so that no_wait runs still build a response (previously a NameError).
        response_kwds = {
            "job_info": None,
            "api_run_response": tool_run_response,
        }

        if not kwds.get("no_wait"):
            job = tool_run_response["jobs"][0]
            job_id = job["id"]
            try:
                final_state = _wait_for_job(user_gi, job_id, timeout=kwds.get("test_timeout"))
            except Exception:
                summarize_history(ctx, user_gi, history_id)
                raise
            if final_state != "ok":
                msg = "Failed to run CWL tool job final job state is [%s]." % final_state
                summarize_history(ctx, user_gi, history_id)
                raise Exception(msg)

            ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
            response_kwds["job_info"] = admin_gi.jobs.show_job(job_id)
            if ctx.verbose:
                summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog(f"Found Galaxy workflow ID [{workflow_id}] for URI [{runnable.uri}]")
        invocation = user_gi.workflows.invoke_workflow(
            workflow_id,
            inputs=job_dict,
            history_id=history_id,
            allow_tool_state_corrections=True,
            inputs_by="name",
        )
        run_response = invocation_to_run_response(
            ctx,
            config.user_gi,
            runnable,
            invocation,
            polling_backoff=kwds.get("polling_backoff", 0),
            no_wait=kwds.get("no_wait", False),
            start_datetime=start_datetime,
            log=log_contents_str(config),
        )
    else:
        raise NotImplementedError()

    if run_response is None:
        run_response = response_class(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log_contents_str(config),
            start_datetime=start_datetime,
            end_datetime=datetime.now(),
            **response_kwds,
        )
    if kwds.get("download_outputs"):
        output_directory = kwds.get("output_directory", None)
        ctx.vlog("collecting outputs from run...")
        run_response.collect_outputs(output_directory)
        ctx.vlog("collecting outputs complete")
    return run_response


def invocation_to_run_response(
    ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
):
    """Wait for ``invocation`` (and, unless ``no_wait``, its jobs) and wrap the result.

    Returns a ``GalaxyWorkflowRunResponse``; failures are reported through its states
    and ``error_message`` rather than raised.
    """
    start_datetime = start_datetime or datetime.now()
    invocation_id = invocation["id"]
    history_id = invocation["history_id"]
    workflow_id = invocation["workflow_id"]

    ctx.vlog("Waiting for invocation [%s]" % invocation_id)

    final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
        ctx,
        invocation_id=invocation_id,
        history_id=history_id,
        user_gi=user_gi,
        no_wait=no_wait,
        polling_backoff=polling_backoff,
    )
    if final_invocation_state not in ("ok", "skipped", "scheduled"):
        msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
        ctx.vlog(msg)
        summarize_history(ctx, user_gi, history_id)

    return GalaxyWorkflowRunResponse(
        ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        workflow_id=workflow_id,
        invocation_id=invocation_id,
        history_state=job_state if not no_wait else None,
        invocation_state=final_invocation_state,
        error_message=error_message,
        log=log,
        start_datetime=start_datetime,
        end_datetime=datetime.now(),
    )


def stage_in(
    ctx: "PlanemoCliContext", runnable: Runnable, config: "BaseGalaxyConfig", job_path: str, **kwds
) -> Tuple[Dict[str, Any], str]:
    """Upload the inputs described by ``job_path`` into a history.

    :returns: the job dict referencing the staged datasets and the history id used.
    :raises Exception: when the uploaded data does not reach the ``ok`` state.
    """
    # only upload objects as files/collections for CWL workflows...
    tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow"
    to_posix_lines = runnable.type.is_galaxy_artifact
    simultaneous_uploads = kwds.get("simultaneous_uploads", False)
    user_gi = config.user_gi
    history_id = _history_id(user_gi, **kwds)
    psi = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major, simultaneous_uploads)
    job_dict, datasets = psi.stage(
        tool_or_workflow,
        history_id=history_id,
        job_path=job_path,
        use_path_paste=config.use_path_paste,
        to_posix_lines=to_posix_lines,
    )

    if datasets and kwds.get("check_uploads_ok", True):
        ctx.vlog(f"Uploaded datasets [{datasets}] for activity, checking history state")
        final_state = _wait_for_history(ctx, user_gi, history_id)
    else:
        # Mark uploads as ok because nothing to do.
        final_state = "ok"

    ctx.vlog(f"Final state is {final_state}")
    if final_state != "ok":
        msg = "Failed to upload data, upload state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        raise Exception(msg)
    return job_dict, history_id


def _file_path_to_name(file_path):
    """Return the basename of ``file_path``, or ``"defaultname"`` when it is ``None``."""
    if file_path is not None:
        return os.path.basename(file_path)
    return "defaultname"


def execute_rerun(
    ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", rerunnable: Rerunnable, **kwds
) -> "GalaxyBaseRunResponse":
    """Rerun (with dataset remapping) the errored jobs of a history, invocation or single job.

    :raises Exception: for an unknown ``rerunnable_type``.
    """
    rerun_successful = True
    user_gi = config.user_gi
    if rerunnable.rerunnable_type == "history":
        job_ids = [job["id"] for job in user_gi.jobs.get_jobs(history_id=rerunnable.rerunnable_id, state="error")]
    elif rerunnable.rerunnable_type == "invocation":
        job_ids = [
            job["id"] for job in user_gi.jobs.get_jobs(invocation_id=rerunnable.rerunnable_id, state="error")
        ]
    elif rerunnable.rerunnable_type == "job":
        job_ids = [rerunnable.rerunnable_id]
    # elif rerunnable.rerunnable_type = 'collection':
    else:
        raise Exception(f"Unknown Rerunnable type {rerunnable.rerunnable_type}")

    for job_id in job_ids:
        try:
            user_gi.jobs.rerun_job(job_id, remap=True)
        except (ValueError, bioblend.ConnectionError):
            rerun_successful = False
            if rerunnable.rerunnable_type == "job":
                ctx.log(f"Job {job_id} could not be rerun with dataset remapping.")
            else:
                ctx.log(
                    f"Job {job_id} associated with {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} "
                    "could not be rerun with dataset remapping."
                )
        else:
            if rerunnable.rerunnable_type == "job":
                ctx.log(f"Job {job_id} was successfully rerun.")
            else:
                ctx.log(
                    f"Job {job_id} associated with {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} was successfully rerun."
                )
    if not job_ids:
        ctx.log(f"No jobs matching the specified {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} were found.")

    return GalaxyBaseRunResponse(
        ctx=ctx,
        runnable=rerunnable,
        user_gi=user_gi,
        # The rerunnable type for histories is "history" (see the dispatch above).
        history_id=rerunnable.rerunnable_id if rerunnable.rerunnable_type == "history" else None,
        log=log_contents_str(config),
        successful=rerun_successful,
    )


class GalaxyBaseRunResponse(SuccessfulRunResponse):
    """Base run response for activities executed in a Galaxy history."""

    def __init__(
        self,
        ctx: "PlanemoCliContext",
        runnable,
        user_gi: GalaxyInstance,
        history_id,
        log,
        start_datetime: Optional[datetime] = None,
        end_datetime: Optional[datetime] = None,
        successful: bool = True,
    ) -> None:
        super().__init__(runnable=runnable)
        self._ctx = ctx
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict: Dict[str, Optional[str]] = {}
        self._start_datetime = start_datetime
        self._end_datetime = end_datetime
        self._successful = successful

    @property
    def was_successful(self):
        return self._successful

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    @property
    def start_datetime(self):
        """Start datetime of run."""
        return self._start_datetime

    @property
    def end_datetime(self):
        """End datetime of run."""
        return self._end_datetime

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def output_src(self, output: "RunnableOutput", ignore_missing_outputs: Optional[bool] = False) -> Dict[str, str]:
        return {}

    def _get_extra_files(self, dataset_details):
        extra_files_url = (
            f"{self._user_gi.url}/histories/{self._history_id}/contents/{dataset_details['id']}/extra_files"
        )
        return self._user_gi.jobs._get(url=extra_files_url)

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(
        self,
        output_directory: Optional[str] = None,
        ignore_missing_output: Optional[bool] = False,
        output_id: Optional[str] = None,
    ):
        """Download the run's outputs into ``output_directory`` (a new temp dir by default).

        With ``output_id`` only that output is collected and its value is returned
        (``None`` when it was not produced); the cached ``outputs_dict`` is left untouched.
        Without it, every output is collected and stored in ``outputs_dict``.
        """
        outputs_dict: Dict[str, Optional[str]] = {}
        # TODO: rather than creating a directory just use
        # Galaxy paths if they are available in this
        # configuration.
        output_directory = output_directory or tempfile.mkdtemp()

        self._ctx.log("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            runnable_output_id = runnable_output.get_id()
            if not runnable_output_id:
                self._ctx.log("Workflow output identified without an ID (label), skipping")
                continue

            if output_id and runnable_output_id != output_id:
                continue

            def get_dataset(dataset_details, filename=None):
                parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or runnable_output_id)
                file_ext = dataset_details["file_ext"]
                if file_ext == "directory":
                    # TODO: rename output_directory to outputs_directory because we can have output directories
                    # and this is confusing...
                    the_output_directory = os.path.join(output_directory, parent_basename)
                    safe_makedirs(the_output_directory)
                    destination = self.download_output_to(
                        self._ctx, dataset_details, the_output_directory, filename=filename
                    )
                else:
                    destination = self.download_output_to(
                        self._ctx, dataset_details, output_directory, filename=filename
                    )
                basename = parent_basename if filename is None else os.path.basename(filename)
                return {"path": destination, "basename": basename}

            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output, ignore_missing_output)
            if not output_src:
                # Optional workflow output or invocation failed
                self._ctx.vlog(f"workflow output '{runnable_output_id}' not created, skipping")
                if output_id:
                    # Single requested output: report absence without touching the cache.
                    return None
                outputs_dict[runnable_output_id] = None
                continue
            output_dataset_id = output_src["id"]
            galaxy_output = self.to_galaxy_output(runnable_output)
            cwl_output = output_to_cwl_json(
                galaxy_output,
                self._get_metadata,
                get_dataset,
                self._get_extra_files,
                pseudo_location=True,
            )
            if is_cwl or output_src["src"] == "hda":
                output_dict_value = cwl_output
            else:

                def attach_file_properties(collection, cwl_output):
                    # Attach the CWL file description of each element to the collection metadata.
                    elements = collection["elements"]
                    assert len(elements) == len(cwl_output)
                    for element, cwl_output_element in zip(elements, cwl_output):
                        element["_output_object"] = cwl_output_element
                        if isinstance(cwl_output_element, list):
                            assert "elements" in element["object"]
                            attach_file_properties(element["object"], cwl_output_element)

                output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                attach_file_properties(output_metadata, cwl_output)
                output_dict_value = output_metadata

            if output_id:
                return output_dict_value
            outputs_dict[runnable_output_id] = output_dict_value

        if output_id:
            # The requested output is not declared by the runnable; keep previously collected outputs.
            return None
        self._outputs_dict = outputs_dict
        self._ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        """Return stdout/stderr/command_line of the job, or ``None`` when no job info is known."""
        if self._job_info is not None:
            return dict(
                stdout=self._job_info.get("stdout"),
                stderr=self._job_info.get("stderr"),
                command_line=self._job_info.get("command_line"),
            )
        return None

    @property
    def invocation_details(self):
        return None

    def get_output(self, output_id):
        """Return the (lazily collected and cached) value of output ``output_id``."""
        if output_id not in self._outputs_dict:
            self._outputs_dict[output_id] = self.collect_outputs(ignore_missing_output=True, output_id=output_id)
        return self._outputs_dict[output_id]

    def download_output_to(self, ctx, dataset_details, output_directory, filename=None):
        """Download a dataset into ``output_directory`` and return the destination path."""
        if filename is None:
            local_filename = f'{sanitize_filename(dataset_details.get("cwl_file_name") or dataset_details.get("name"))}__{dataset_details["uuid"]}'
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            ctx,
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, ctx, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = f"{user_gi.url}/histories/{history_id}/contents/{dataset_id}/display"

        params = {}
        if filename:
            params["filename"] = filename

        r = user_gi.make_get_request(url, params=params, stream=True, timeout=user_gi.timeout)
        # Do write an output file before anything could fail, so that the downstream
        # data collection doesn't fail. Comment below still applies.
        with open(to_path, "wb") as fp:
            try:
                r.raise_for_status()
            except HTTPError as e:
                # When a job fails abruptly the object store may not contain a dataset,
                # and that results in an internal server error on the Galaxy side.
                # We don't want this to break the rest of the test report.
                # Should probably find a way to propagate this back into the report.
                ctx.log(f"Failed to download history content at URL {url}, exception: {e}")
                return
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)


class GalaxyToolRunResponse(GalaxyBaseRunResponse):
    """Run response for a single tool execution."""

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
        start_datetime=None,
        end_datetime=None,
    ):
        super().__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_src(output)["src"] == "hdca"

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
        """Return ``{"src": "hda"|"hdca", "id": ...}`` for ``output``, or ``None`` if not produced.

        A collection with the same name takes precedence over a dataset.
        """
        outputs = self.api_run_response["outputs"]
        output_collections = self.api_run_response["output_collections"]
        output_id = output.get_id()
        output_src = None
        self._ctx.vlog(f"Looking for id [{output_id}] in outputs [{outputs}]")
        for dataset_output in outputs:
            if dataset_output["output_name"] == output_id:
                output_src = {"src": "hda", "id": dataset_output["id"]}
        for output_collection in output_collections:
            if output_collection["output_name"] == output_id:
                output_src = {"src": "hdca", "id": output_collection["id"]}
        return output_src


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):
    """Run response for a workflow invocation."""

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
        history_state="ok",
        invocation_state="ok",
        error_message=None,
        start_datetime=None,
        end_datetime=None,
    ):
        super().__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id
        self._cached_invocation = None
        self.history_state = history_state
        self.invocation_state = invocation_state
        self.error_message = error_message
        # Queries Galaxy immediately; must run after the state attributes above are set.
        self._invocation_details = self.collect_invocation_details(invocation_id)

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_src(self, output, ignore_missing_outputs: Optional[bool] = False):
        """Return the invocation's output/collection reference for ``output``.

        :returns: ``None`` for optional or (with ``ignore_missing_outputs``) missing outputs.
        :raises Exception: when a required output is missing.
        """
        invocation = self._invocation
        # Use newer workflow outputs API.

        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output_name]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output_name]
        elif output.is_optional():
            return None
        elif ignore_missing_outputs:
            # We don't need to check this in testing mode, we'll get an error through failed invocation and failed history anyway
            return None
        else:
            raise Exception(f"Failed to find output [{output_name}] in invocation outputs [{invocation['outputs']}]")

    def collect_invocation_details(self, invocation_id=None):
        """Collect per-step details (jobs, nested subworkflows) for an invocation.

        :param invocation_id: invocation to inspect; defaults to this response's invocation.
        """
        gi = self._user_gi
        invocation_id = invocation_id or self._invocation_id
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation["steps"]:
            step_label_or_index = f"{step['order_index']}. {step['workflow_step_label'] or 'Unnamed step'}"
            # Use the invocation being walked so subworkflow steps are looked up under their own invocation.
            workflow_step = gi.invocations.show_invocation_step(invocation_id, step["id"])
            workflow_step["subworkflow"] = None
            subworkflow_invocation_id = workflow_step.get("subworkflow_invocation_id")
            if subworkflow_invocation_id:
                workflow_step["subworkflow"] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step["jobs"] = [gi.jobs.show_job(j["id"], full_details=True) for j in workflow_step["jobs"]]
            invocation_steps[step_label_or_index] = workflow_step
        return {
            "steps": invocation_steps,
            "details": {
                "invocation_id": self._invocation_id,
                "history_id": self._history_id,
                "workflow_id": self._workflow_id,
                "invocation_state": self.invocation_state,
                "history_state": self.history_state,
                "error_message": self.error_message,
                # Messages are only present from 23.0 onward
                "messages": invocation.get("messages", []),
            },
        }

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        # Fetched once and cached for the lifetime of this response.
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state in ["ok", "skipped", None] and self.invocation_state == "scheduled"


def _tool_id(tool_path):
    """Parse and return the tool ID declared by the tool file at ``tool_path``."""
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds) -> str:
    """Return ``kwds["history_id"]`` or create a new history (named by ``history_name``) and tag it."""
    history_id = kwds.get("history_id")
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    tags_str = kwds.get("tags")
    if tags_str:
        tags = tags_str.split(",")
        gi.histories.update_history(history_id, tags=tags)
    return history_id


def wait_for_invocation_and_jobs(
    ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
):
    """Wait for an invocation to be scheduled and (unless ``no_wait``) its jobs and subworkflows.

    :returns: ``(final_invocation_state, job_state, error_message)``.
    """
    ctx.vlog("Waiting for invocation [%s]" % invocation_id)
    final_invocation_state = "new"

    # TODO: hook in invocation["messages"]
    error_message = ""
    job_state = "ok"
    try:
        final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if final_invocation_state != "scheduled":
            raise RuntimeError(f"invocation {invocation_id} ended in state [{final_invocation_state}]")
    except Exception as e:
        ctx.vlog(f"Problem waiting on invocation: {str(e)}")
        summarize_history(ctx, user_gi, history_id)
        error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"

    ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")

    if not no_wait:
        job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
        if job_state not in ("ok", "skipped"):
            msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
            error_message = msg if not error_message else f"{error_message}. {msg}"
        else:
            # wait for possible subworkflow invocations
            invocation = user_gi.invocations.show_invocation(invocation_id)
            for step in invocation["steps"]:
                if step.get("subworkflow_invocation_id") is not None:
                    final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
                        ctx,
                        invocation_id=step["subworkflow_invocation_id"],
                        history_id=history_id,
                        user_gi=user_gi,
                        no_wait=no_wait,
                        polling_backoff=polling_backoff,
                    )
                    if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
                        return final_invocation_state, job_state, error_message

    ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
    return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    """Call ``f(gi)`` with a 60s request timeout, retrying (up to 5 tries) requests that timed out.

    The caller's ``gi.timeout`` is restored afterwards instead of being reset to ``None``.
    """
    original_timeout = gi.timeout
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                # Only long-running failures look like timeouts; anything else is re-raised.
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                raise
    finally:
        gi.timeout = original_timeout


def has_jobs_in_states(ctx, gi, history_id, states):
    """Return whether any job of ``history_id`` is in one of ``states``."""
    params = {"history_id": history_id}
    jobs_url = gi.url + "/jobs"
    jobs = gi.jobs._get(url=jobs_url, params=params)
    return any(j["state"] in states for j in jobs)


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # Used to wait for active jobs and then wait for history, but now this is called
    # after upload is complete and after the invocation has been done scheduling - so
    # no need to wait for active jobs anymore I think.

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
    # Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
    # as you could have more than one invocation in a history, or an invocation without
    # steps that produce history items.

    ctx.log(f"waiting for invocation {invocation_id}")

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id, timeout=None):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func, timeout=timeout)


def _wait_on_state(state_func, polling_backoff=0, timeout=None):
    """Poll ``state_func`` until its item(s) reach a terminal state and return that state.

    ``state_func`` returns one dict or a list of dicts carrying a ``"state"`` key. Any fail
    state wins (by priority); otherwise the single remaining terminal state is returned.
    ``timeout`` defaults to 24 hours.
    """

    def get_state():
        response = state_func()
        if not isinstance(response, list):
            response = [response]
        if not response:
            # invocation may not have any attached jobs, that's fine
            return "ok"
        non_terminal_states = {"running", "queued", "new", "ready", "resubmitted", "upload", "waiting"}
        current_states = {item["state"] for item in response}
        current_non_terminal_states = non_terminal_states.intersection(current_states)
        # Mix of "error"-ish terminal job, dataset, invocation terminal states, so we can use this for whatever we throw at it
        hierarchical_fail_states = [
            "error",
            "paused",
            "deleted",
            "stopped",
            "discarded",
            "failed_metadata",
            "cancelled",
            "failed",
        ]
        for terminal_state in hierarchical_fail_states:
            if terminal_state in current_states:
                # If we got here something has failed and we can return (early)
                return terminal_state
        if current_non_terminal_states:
            return None
        if len(current_states) > 1:
            current_states = current_states - {"skipped"}
        assert len(current_states) == 1, f"unexpected state(s) found: {current_states}"
        return current_states.pop()

    timeout = timeout or 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state


__all__ = ("execute",)