Source code for planemo.galaxy.activity

"""Module provides generic interface to running Galaxy tools and workflows."""

import os
import sys
import tempfile
import time
import traceback
from datetime import datetime
from typing import (
    Any,
    Dict,
    Optional,
    Tuple,
    Type,
    TYPE_CHECKING,
)
from urllib.parse import urljoin

import bioblend
from bioblend.galaxy import GalaxyInstance
from bioblend.util import attach_file

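# Older releases of galaxy-tool-util shipped this staging helper under the
# misspelled name ``StagingInterace``; fall back to importing it under that
# name so both old and new Galaxy versions work.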
try:
    from galaxy.tool_util.client.staging import StagingInterface
except ImportError:
    from galaxy.tool_util.client.staging import StagingInterace as StagingInterface

from galaxy.tool_util.cwl.util import (
    invocation_to_output,
    output_to_cwl_json,
    tool_response_to_output,
)
from galaxy.tool_util.parser import get_tool_source
from galaxy.util import (
    safe_makedirs,
    unicodify,
)
from pathvalidate import sanitize_filename
from requests.exceptions import (
    HTTPError,
    RequestException,
)

from planemo.galaxy.api import summarize_history
from planemo.io import wait_on
from planemo.runnable import (
    ErrorRunResponse,
    get_outputs,
    Rerunnable,
    Runnable,
    RunnableType,
    RunResponse,
    SuccessfulRunResponse,
)

if TYPE_CHECKING:
    from planemo.cli import PlanemoCliContext
    from planemo.galaxy.config import BaseGalaxyConfig
    from planemo.runnable import RunnableOutput

DEFAULT_HISTORY_NAME = "CWL Target History"
ERR_NO_SUCH_TOOL = (
    "Failed to find tool with ID [%s] in Galaxy - cannot execute job. "
    "You may need to enable verbose logging and determine why the tool did not load. [%s]"
)


def execute(
    ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
) -> RunResponse:
    """Execute a Galaxy activity."""
    start_datetime = datetime.now()
    try:
        return _execute(ctx, config, runnable, job_path, **kwds)
    except Exception as e:
        end_datetime = datetime.now()
        ctx.log("Failed to execute Galaxy activity, throwing ErrorRunResponse")
        traceback.print_exc(file=sys.stdout)
        return ErrorRunResponse(unicodify(e), start_datetime=start_datetime, end_datetime=end_datetime)
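

# A minimal usage sketch (hedged - ``for_path`` lives in ``planemo.runnable``
# and is not imported in this module; the surrounding names are illustrative
# only). Given an active ``PlanemoCliContext`` and a running
# ``BaseGalaxyConfig``, a tool can be run against Galaxy with a job file
# mapping its inputs to values:
#
#     from planemo.runnable import for_path
#
#     runnable = for_path("my_tool.xml")
#     response = execute(ctx, config, runnable, "job.yml", download_outputs=True)
#     if response.was_successful:
#         print(response.outputs_dict)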


def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


class PlanemoStagingInterface(StagingInterface):
    def __init__(
        self,
        ctx: "PlanemoCliContext",
        runnable: Runnable,
        user_gi: GalaxyInstance,
        version_major: str,
        simultaneous_uploads: bool,
    ) -> None:
        self._ctx = ctx
        self._user_gi = user_gi
        self._runnable = runnable
        self._version_major = version_major
        self._simultaneous_uploads = simultaneous_uploads

    def _post(self, api_path: str, payload: Dict[str, Any], files_attached: bool = False) -> Dict[str, Any]:
        # Keep the files_attached argument because StagingInterface._post() had
        # it until Galaxy 22.05.
        url = urljoin(self._user_gi.url, "api/" + api_path)
        if payload.get("__files"):
            # Put attached files where BioBlend expects them.
            files_attached = True
            for k, v in payload["__files"].items():
                payload[k] = v
            del payload["__files"]
        return self._user_gi.make_post_request(url, payload=payload, files_attached=files_attached)

    def _attach_file(self, path):
        return attach_file(path)

    def _handle_job(self, job_response):
        if not self._simultaneous_uploads:
            job_id = job_response["id"]
            _wait_for_job(self._user_gi, job_id)

    @property
    def use_fetch_api(self):
        # hack around this not working for galaxy_tools - why is that :(
        return self._runnable.type != RunnableType.galaxy_tool and self._version_major >= "20.09"

    # extension point for planemo to override logging
    def _log(self, message):
        self._ctx.vlog(message)


def _execute(  # noqa C901
    ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds
) -> "GalaxyBaseRunResponse":
    user_gi = config.user_gi
    admin_gi = config.gi
    run_response = None
    start_datetime = datetime.now()
    try:
        job_dict, history_id = stage_in(ctx, runnable, config, job_path, **kwds)
    except Exception:
        ctx.vlog("Problem with staging in data for Galaxy activities...")
        raise
    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class: Type[GalaxyBaseRunResponse] = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)
        if not kwds.get("no_wait"):
            job = tool_run_response["jobs"][0]
            job_id = job["id"]
            try:
                final_state = _wait_for_job(user_gi, job_id, timeout=kwds.get("test_timeout"))
            except Exception:
                summarize_history(ctx, user_gi, history_id)
                raise
            if final_state != "ok":
                msg = "Failed to run CWL tool job, final job state is [%s]." % final_state
                summarize_history(ctx, user_gi, history_id)
                raise Exception(msg)

            ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
            job_info = admin_gi.jobs.show_job(job_id)
            response_kwds = {
                "job_info": job_info,
                "api_run_response": tool_run_response,
            }
            if ctx.verbose:
                summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id_for_runnable(runnable)
        ctx.vlog(f"Found Galaxy workflow ID [{workflow_id}] for URI [{runnable.uri}]")
        invocation = user_gi.workflows.invoke_workflow(
            workflow_id,
            inputs=job_dict,
            history_id=history_id,
            allow_tool_state_corrections=True,
            inputs_by="name",
        )
        run_response = invocation_to_run_response(
            ctx,
            config.user_gi,
            runnable,
            invocation,
            polling_backoff=kwds.get("polling_backoff", 0),
            no_wait=kwds.get("no_wait", False),
            start_datetime=start_datetime,
            log=log_contents_str(config),
        )
    else:
        raise NotImplementedError()

    if not run_response:
        run_response = response_class(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log_contents_str(config),
            start_datetime=start_datetime,
            end_datetime=datetime.now(),
            **response_kwds,
        )
    if kwds.get("download_outputs"):
        output_directory = kwds.get("output_directory", None)
        ctx.vlog("collecting outputs from run...")
        run_response.collect_outputs(output_directory)
        ctx.vlog("collecting outputs complete")
    return run_response


def invocation_to_run_response(
    ctx, user_gi, runnable, invocation, polling_backoff=0, no_wait=False, start_datetime=None, log=None
):
    start_datetime = start_datetime or datetime.now()
    invocation_id = invocation["id"]
    history_id = invocation["history_id"]
    workflow_id = invocation["workflow_id"]
    ctx.vlog("Waiting for invocation [%s]" % invocation_id)

    final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
        ctx,
        invocation_id=invocation_id,
        history_id=history_id,
        user_gi=user_gi,
        no_wait=no_wait,
        polling_backoff=polling_backoff,
    )
    if final_invocation_state not in ("ok", "skipped", "scheduled"):
        msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state."
        ctx.vlog(msg)
        summarize_history(ctx, user_gi, history_id)
    return GalaxyWorkflowRunResponse(
        ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        workflow_id=workflow_id,
        invocation_id=invocation_id,
        history_state=job_state if not no_wait else None,
        invocation_state=final_invocation_state,
        error_message=error_message,
        log=log,
        start_datetime=start_datetime,
        end_datetime=datetime.now(),
    )


def stage_in(
    ctx: "PlanemoCliContext", runnable: Runnable, config: "BaseGalaxyConfig", job_path: str, **kwds
) -> Tuple[Dict[str, Any], str]:
    # Only upload objects as files/collections for CWL workflows...
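    # Galaxy artifacts additionally get posix-newline conversion on upload
    # (``to_posix_lines`` below), while CWL inputs are staged byte-for-byte.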
tool_or_workflow = "tool" if runnable.type != RunnableType.cwl_workflow else "workflow" to_posix_lines = runnable.type.is_galaxy_artifact simultaneous_uploads = kwds.get("simultaneous_uploads", False) user_gi = config.user_gi history_id = _history_id(user_gi, **kwds) psi = PlanemoStagingInterface(ctx, runnable, user_gi, config.version_major, simultaneous_uploads) job_dict, datasets = psi.stage( tool_or_workflow, history_id=history_id, job_path=job_path, use_path_paste=config.use_path_paste, to_posix_lines=to_posix_lines, ) if datasets and kwds.get("check_uploads_ok", True): ctx.vlog(f"Uploaded datasets [{datasets}] for activity, checking history state") final_state = _wait_for_history(ctx, user_gi, history_id) else: # Mark uploads as ok because nothing to do. final_state = "ok" ctx.vlog(f"Final state is {final_state}") if final_state != "ok": msg = "Failed to upload data, upload state is [%s]." % final_state summarize_history(ctx, user_gi, history_id) raise Exception(msg) return job_dict, history_id def _file_path_to_name(file_path): if file_path is not None: name = os.path.basename(file_path) else: name = "defaultname" return name def execute_rerun( ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", rerunnable: Rerunnable, **kwds ) -> "GalaxyBaseRunResponse": rerun_successful = True user_gi = config.user_gi if rerunnable.rerunnable_type == "history": job_ids = [job["id"] for job in user_gi.jobs.get_jobs(history_id=rerunnable.rerunnable_id, state="error")] elif rerunnable.rerunnable_type == "invocation": job_ids = [job["id"] for job in user_gi.jobs.get_jobs(invocation_id=rerunnable.rerunnable_id, state="error")] elif rerunnable.rerunnable_type == "job": job_ids = [rerunnable.rerunnable_id] # elif rerunnable.rerunnable_type = 'collection': else: raise Exception(f"Unknown Rerunnable type {rerunnable.rerunnable_type}") for job_id in job_ids: try: user_gi.jobs.rerun_job(job_id, remap=True) except (ValueError, bioblend.ConnectionError): rerun_successful = False if rerunnable.rerunnable_type == "job": ctx.log(f"Job {job_id} could not be rerun with dataset remapping.") else: ctx.log( f"Job {job_id} associated with {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} " "could not be rerun with dataset remapping." ) else: if rerunnable.rerunnable_type == "job": ctx.log(f"Job {job_id} was successfully rerun.") else: ctx.log( f"Job {job_id} associated with {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} was successfully rerun." ) if not job_ids: ctx.log(f"No jobs matching the specified {rerunnable.rerunnable_type} {rerunnable.rerunnable_id} were found.") return GalaxyBaseRunResponse( ctx=ctx, runnable=rerunnable, user_gi=user_gi, history_id=rerunnable.rerunnable_id if rerunnable.rerunnable_type == "history_id" else None, log=log_contents_str(config), successful=rerun_successful, ) class GalaxyBaseRunResponse(SuccessfulRunResponse): def __init__( self, ctx: "PlanemoCliContext", runnable, user_gi: GalaxyInstance, history_id, log, start_datetime: Optional[datetime] = None, end_datetime: Optional[datetime] = None, successful: bool = True, ) -> None: super().__init__(runnable=runnable) self._ctx = ctx self._user_gi = user_gi self._history_id = history_id self._log = log self._job_info = None self._outputs_dict: Dict[str, Optional[str]] = {} self._start_datetime = start_datetime self._end_datetime = end_datetime self._successful = successful @property def was_successful(self): return self._successful def to_galaxy_output(self, output): """Convert runnable output to a GalaxyOutput object. 

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    @property
    def start_datetime(self):
        """Start datetime of run."""
        return self._start_datetime

    @property
    def end_datetime(self):
        """End datetime of run."""
        return self._end_datetime

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def output_src(self, output: "RunnableOutput", ignore_missing_outputs: Optional[bool] = False) -> Dict[str, str]:
        return {}

    def _get_extra_files(self, dataset_details):
        extra_files_url = (
            f"{self._user_gi.url}/histories/{self._history_id}/contents/{dataset_details['id']}/extra_files"
        )
        extra_files = self._user_gi.jobs._get(url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(
        self,
        output_directory: Optional[str] = None,
        ignore_missing_output: Optional[bool] = False,
        output_id: Optional[str] = None,
    ):
        outputs_dict: Dict[str, Optional[str]] = {}
        # TODO: rather than creating a directory just use
        # Galaxy paths if they are available in this
        # configuration.
        output_directory = output_directory or tempfile.mkdtemp()
        self._ctx.log("collecting outputs to directory %s" % output_directory)

        for runnable_output in get_outputs(self._runnable, gi=self._user_gi):
            runnable_output_id = runnable_output.get_id()
            if not runnable_output_id:
                self._ctx.log("Workflow output identified without an ID (label), skipping")
                continue

            if output_id and runnable_output_id != output_id:
                continue

            def get_dataset(dataset_details, filename=None):
                parent_basename = sanitize_filename(dataset_details.get("cwl_file_name") or runnable_output_id)
                file_ext = dataset_details["file_ext"]
                if file_ext == "directory":
                    # TODO: rename output_directory to outputs_directory because we can
                    # have output directories and this is confusing...
                    the_output_directory = os.path.join(output_directory, parent_basename)
                    safe_makedirs(the_output_directory)
                    destination = self.download_output_to(
                        self._ctx, dataset_details, the_output_directory, filename=filename
                    )
                else:
                    destination = self.download_output_to(
                        self._ctx, dataset_details, output_directory, filename=filename
                    )
                if filename is None:
                    basename = parent_basename
                else:
                    basename = os.path.basename(filename)
                return {"path": destination, "basename": basename}

            is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
            output_src = self.output_src(runnable_output, ignore_missing_output)
            if not output_src:
                # Optional workflow output or invocation failed.
                self._ctx.vlog(f"workflow output '{runnable_output_id}' not created, skipping")
                outputs_dict[runnable_output_id] = None
                continue
            output_dataset_id = output_src["id"]
            galaxy_output = self.to_galaxy_output(runnable_output)
            cwl_output = output_to_cwl_json(
                galaxy_output,
                self._get_metadata,
                get_dataset,
                self._get_extra_files,
                pseudo_location=True,
            )
            output_dict_value = None
            if is_cwl or output_src["src"] == "hda":
                output_dict_value = cwl_output
            else:

                def attach_file_properties(collection, cwl_output):
                    elements = collection["elements"]
                    assert len(elements) == len(cwl_output)
                    for element, cwl_output_element in zip(elements, cwl_output):
                        element["_output_object"] = cwl_output_element
                        if isinstance(cwl_output_element, list):
                            assert "elements" in element["object"]
                            attach_file_properties(element["object"], cwl_output_element)

                output_metadata = self._get_metadata("dataset_collection", output_dataset_id)
                attach_file_properties(output_metadata, cwl_output)
                output_dict_value = output_metadata

            if output_id:
                return output_dict_value
            outputs_dict[runnable_output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        self._ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info.get("stdout"),
                stderr=self._job_info.get("stderr"),
                command_line=self._job_info.get("command_line"),
            )
        return None

    @property
    def invocation_details(self):
        return None

    def get_output(self, output_id):
        if output_id not in self._outputs_dict:
            self._outputs_dict[output_id] = self.collect_outputs(ignore_missing_output=True, output_id=output_id)
        return self._outputs_dict[output_id]

    def download_output_to(self, ctx, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = (
                f'{sanitize_filename(dataset_details.get("cwl_file_name") or dataset_details.get("name"))}'
                f'__{dataset_details["uuid"]}'
            )
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            ctx,
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, ctx, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = f"{user_gi.url}/histories/{history_id}/contents/{dataset_id}/display"
        data = {}
        if filename:
            data["filename"] = filename
        r = user_gi.make_get_request(url, params=data, stream=True, timeout=user_gi.timeout)
        # Open the output file before anything can fail, so that downstream
        # data collection doesn't break. The comment below still applies.
with open(to_path, "wb") as fp: try: r.raise_for_status() except HTTPError as e: # When a job fails abruptly the object store may not contain a dataset, # and that results in an internal server error on the Galaxy side. # We don't want this to break the rest of the test report. # Should probably find a way to propagate this back into the report. ctx.log(f"Failed to download history content at URL {url}, exception: {e}") return for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE): if chunk: fp.write(chunk) class GalaxyToolRunResponse(GalaxyBaseRunResponse): def __init__( self, ctx, runnable, user_gi, history_id, log, job_info, api_run_response, start_datetime=None, end_datetime=None, ): super().__init__( ctx=ctx, runnable=runnable, user_gi=user_gi, history_id=history_id, log=log, start_datetime=start_datetime, end_datetime=end_datetime, ) self._job_info = job_info self.api_run_response = api_run_response def is_collection(self, output): # TODO: Make this more rigorous - search both output and output # collections - throw an exception if not found in either place instead # of just assuming all non-datasets are collections. return self.output_src(output)["src"] == "hdca" def to_galaxy_output(self, runnable_output): output_id = runnable_output.get_id() return tool_response_to_output(self.api_run_response, self._history_id, output_id) def output_src(self, output, ignore_missing_outputs: Optional[bool] = False): outputs = self.api_run_response["outputs"] output_collections = self.api_run_response["output_collections"] output_id = output.get_id() output_src = None self._ctx.vlog(f"Looking for id [{output_id}] in outputs [{outputs}]") for output in outputs: if output["output_name"] == output_id: output_src = {"src": "hda", "id": output["id"]} for output_collection in output_collections: if output_collection["output_name"] == output_id: output_src = {"src": "hdca", "id": output_collection["id"]} return output_src class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse): def __init__( self, ctx, runnable, user_gi, history_id, log, workflow_id, invocation_id, history_state="ok", invocation_state="ok", error_message=None, start_datetime=None, end_datetime=None, ): super().__init__( ctx=ctx, runnable=runnable, user_gi=user_gi, history_id=history_id, log=log, start_datetime=start_datetime, end_datetime=end_datetime, ) self._workflow_id = workflow_id self._invocation_id = invocation_id self._invocation_details = {} self._cached_invocation = None self.history_state = history_state self.invocation_state = invocation_state self.error_message = error_message self._invocation_details = self.collect_invocation_details(invocation_id) def to_galaxy_output(self, runnable_output): output_id = runnable_output.get_id() self._ctx.vlog("checking for output in invocation [%s]" % self._invocation) return invocation_to_output(self._invocation, self._history_id, output_id) def output_src(self, output, ignore_missing_outputs: Optional[bool] = False): invocation = self._invocation # Use newer workflow outputs API. 
        output_name = output.get_id()
        if output_name in invocation["outputs"]:
            return invocation["outputs"][output_name]
        elif output_name in invocation["output_collections"]:
            return invocation["output_collections"][output_name]
        elif output.is_optional():
            return None
        elif ignore_missing_outputs:
            # No need to check this in testing mode; a failed invocation and a
            # failed history will surface the error anyway.
            return None
        else:
            raise Exception(f"Failed to find output [{output_name}] in invocation outputs [{invocation['outputs']}]")

    def collect_invocation_details(self, invocation_id=None):
        gi = self._user_gi
        invocation_steps = {}
        invocation = self.get_invocation(invocation_id)
        for step in invocation["steps"]:
            step_label_or_index = f"{step['order_index']}. {step['workflow_step_label'] or 'Unnamed step'}"
            workflow_step = gi.invocations.show_invocation_step(self._invocation_id, step["id"])
            workflow_step["subworkflow"] = None
            subworkflow_invocation_id = workflow_step.get("subworkflow_invocation_id")
            if subworkflow_invocation_id:
                workflow_step["subworkflow"] = self.collect_invocation_details(subworkflow_invocation_id)
            workflow_step_job_details = [
                self._user_gi.jobs.show_job(j["id"], full_details=True) for j in workflow_step["jobs"]
            ]
            workflow_step["jobs"] = workflow_step_job_details
            invocation_steps[step_label_or_index] = workflow_step
        invocation_details = {
            "steps": invocation_steps,
            "details": {
                "invocation_id": self._invocation_id,
                "history_id": self._history_id,
                "workflow_id": self._workflow_id,
                "invocation_state": self.invocation_state,
                "history_state": self.history_state,
                "error_message": self.error_message,
                # Messages are only present from Galaxy 23.0 onward.
                "messages": invocation.get("messages", []),
            },
        }
        return invocation_details

    @property
    def invocation_details(self):
        return self._invocation_details

    def get_invocation(self, invocation_id):
        return self._user_gi.invocations.show_invocation(invocation_id)

    @property
    def _invocation(self):
        if self._cached_invocation is None:
            self._cached_invocation = self.get_invocation(self._invocation_id)
        return self._cached_invocation

    @property
    def was_successful(self):
        return self.history_state in ["ok", "skipped", None] and self.invocation_state == "scheduled"


def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds) -> str:
    history_id = kwds.get("history_id")
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME) or DEFAULT_HISTORY_NAME
        history_id = gi.histories.create_history(history_name)["id"]
    tags_str = kwds.get("tags")
    if tags_str:
        tags = tags_str.split(",")
        gi.histories.update_history(history_id, tags=tags)
    return history_id


def wait_for_invocation_and_jobs(
    ctx, invocation_id: str, history_id: str, user_gi: GalaxyInstance, no_wait: bool, polling_backoff: int
):
    ctx.vlog("Waiting for invocation [%s]" % invocation_id)
    final_invocation_state = "new"
    # TODO: hook in invocation["messages"]
    error_message = ""
    job_state = "ok"
    try:
        final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff)
        assert final_invocation_state == "scheduled"
    except Exception as e:
        ctx.vlog(f"Problem waiting on invocation: {str(e)}")
        summarize_history(ctx, user_gi, history_id)
        error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]"
    ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]")
    if not no_wait:
        job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
        if job_state not in ("ok", "skipped"):
            msg = f"Failed to run workflow, at least one job is in [{job_state}] state."
            error_message = msg if not error_message else f"{error_message}. {msg}"
        else:
            # Wait for possible subworkflow invocations.
            invocation = user_gi.invocations.show_invocation(invocation_id)
            for step in invocation["steps"]:
                if step.get("subworkflow_invocation_id") is not None:
                    final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs(
                        ctx,
                        invocation_id=step["subworkflow_invocation_id"],
                        history_id=history_id,
                        user_gi=user_gi,
                        no_wait=no_wait,
                        polling_backoff=polling_backoff,
                    )
                    if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"):
                        return final_invocation_state, job_state, error_message
    ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'")
    return final_invocation_state, job_state, error_message


def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0):
    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except RequestException:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None


def has_jobs_in_states(ctx, gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi.url + "/jobs"
    jobs = gi.jobs._get(url=jobs_url, params=params)
    target_jobs = [j for j in jobs if j["state"] in states]
    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):
    # This used to wait for active jobs and then for the history, but it is now
    # called after upload is complete and after the invocation is done
    # scheduling - so there should be no need to wait for active jobs anymore.
    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
    # Wait for invocation jobs to finish. Less brittle than waiting for a history
    # to finish, as a history can contain more than one invocation, and an
    # invocation may have no steps that produce history items.
ctx.log(f"waiting for invocation {invocation_id}") def state_func(): return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id)) return _wait_on_state(state_func, polling_backoff) def _wait_for_job(gi, job_id, timeout=None): def state_func(): return gi.jobs.show_job(job_id, full_details=True) return _wait_on_state(state_func, timeout=timeout) def _wait_on_state(state_func, polling_backoff=0, timeout=None): def get_state(): response = state_func() if not isinstance(response, list): response = [response] if not response: # invocation may not have any attached jobs, that's fine return "ok" non_terminal_states = {"running", "queued", "new", "ready", "resubmitted", "upload", "waiting"} current_states = set(item["state"] for item in response) current_non_terminal_states = non_terminal_states.intersection(current_states) # Mix of "error"-ish terminal job, dataset, invocation terminal states, so we can use this for whatever we throw at it hierarchical_fail_states = [ "error", "paused", "deleted", "stopped", "discarded", "failed_metadata", "cancelled", "failed", ] for terminal_state in hierarchical_fail_states: if terminal_state in current_states: # If we got here something has failed and we can return (early) return terminal_state if current_non_terminal_states: return None if len(current_states) > 1: current_states = current_states - {"skipped"} assert len(current_states) == 1, f"unexpected state(s) found: {current_states}" return current_states.pop() timeout = timeout or 60 * 60 * 24 final_state = wait_on(get_state, "state", timeout, polling_backoff) return final_state __all__ = ("execute",)