"""planemo.galaxy.invocations.progress: progress tracking and live terminal display for Galaxy workflow invocations."""

import random
from io import StringIO
from typing import (
    Dict,
    List,
    Optional,
    Set,
)

from rich.console import Group
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    TaskID,
    TaskProgressColumn,
    TextColumn,
)
from typing_extensions import TypedDict

from .api import (
    invocation_state_terminal,
    InvocationApi,
    JOB_ERROR_STATES,
)
from .progress_display import DisplayConfiguration


# Types for various invocation responses
class InvocationStep(TypedDict, total=False):
    """One entry of an invocation's ``steps`` list, as read by this module.

    Every key is optional (``total=False``).
    """

    id: str
    # Missing/None states are bucketed as "unknown" by ``step_states``.
    state: Optional[str]
    # Non-empty for steps backed by a subworkflow invocation.
    subworkflow_invocation_id: Optional[str]
class Invocation(TypedDict, total=False):
    """Subset of a Galaxy invocation response used for progress tracking.

    Every key is optional (``total=False``).
    """

    id: str
    # Raw Galaxy invocation state; ``WorkflowProgress`` defaults a missing value to "new".
    state: str
    steps: List[InvocationStep]
class InvocationJobsSummary(TypedDict, total=False):
    """Job-state summary of an invocation.

    ``states`` maps a job state name to the number of jobs in that state.
    """

    states: Dict[str, int]
class WorkflowProgress(Progress):
    """Rich progress bars for the steps, jobs and subworkflows of one workflow invocation.

    State is pushed in through ``handle_invocation`` and ``handle_subworkflow_counts``.
    The ``invocation_scheduling_terminal`` / ``jobs_terminal`` / ``terminal`` properties
    report whether anything further is expected to change.
    """

    _jobs_task: TaskID
    _steps_task: TaskID
    # Only created once at least two subworkflow invocations are known.
    _subworkflows_task: Optional[TaskID] = None

    def __init__(self, display: DisplayConfiguration):
        self.invocation_state: str = "new"
        self.step_count: Optional[int] = None
        self.job_count: Optional[int] = 0
        self.jobs_completed: Optional[int] = None
        # ok/skipped + errored + paused jobs; compared against job_count by ``jobs_terminal``.
        # Initialised here so the terminal properties work before the first handle_invocation().
        self.jobs_terminal_count: int = 0
        self.step_states: Dict[str, int] = {}
        self.num_ok: int = 0
        self.num_new: int = 0
        self.num_queued: int = 0
        self.num_running: int = 0
        self.num_errors: int = 0
        self.num_paused: int = 0
        self.printed_job_errors: Set[str] = set()  # Track printed job errors by job ID
        self.num_subworkflows: int = 0
        self.num_subworkflows_complete: int = 0
        self.display = display
        bar_column = BarColumn(
            style=self.display.style_bar_back,
            finished_style=self.display.style_bar_finished,
            complete_style=self.display.style_bar_complete,
        )
        self.jobs_color: str = self.display.style_initializing
        self.steps_color: str = self.display.style_initializing
        self.subworkflows_color: str = self.display.style_initializing
        super().__init__(
            TextColumn("[progress.description]{task.description}"),
            TextColumn(display.divider),
            bar_column,
            TextColumn(display.divider),
            TaskProgressColumn(f"[{self.display.style_percent}]{{task.percentage:>3.0f}}%"),
            TextColumn(display.divider),
            TextColumn(text_format="{task.fields[status]}"),
        )
        self.add_bars()

    @property
    def invocation_scheduling_terminal(self) -> bool:
        """Whether the invocation is not expected to schedule any further steps."""
        # This is a tricky issue, because we might be waiting to schedule a new step whose inputs are paused.
        # That is unlikely ever to be unpaused, so a "ready" invocation that has both paused and errored jobs
        # is also treated as terminal.
        # We might want to tweak the state in Galaxy if we pause jobs because of input errors, so that this
        # hack won't be required.
        return bool(
            invocation_state_terminal(self.invocation_state)
            or (self.invocation_state == "ready" and self.num_paused and self.num_errors)
        )

    @property
    def jobs_terminal(self):
        """Whether every known job is finished (ok/skipped/errored) or paused."""
        return self.job_count is not None and self.job_count == self.jobs_terminal_count

    @property
    def terminal(self):
        """Whether both step scheduling and all jobs have reached a terminal condition."""
        return self.invocation_scheduling_terminal and self.jobs_terminal

    def handle_subworkflow_counts(self, num: int, num_complete: int):
        """Record subworkflow invocation totals and refresh the subworkflows bar.

        The bar is added the first time at least two subworkflow invocations are known.
        With fewer, only the counters are stored.
        """
        previous_count = self.num_subworkflows
        self.num_subworkflows = num
        self.num_subworkflows_complete = num_complete
        if num < 2:
            return
        if previous_count < 2:
            self._subworkflows_task = self.add_task(
                f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}", status=""
            )
        self.subworkflows_color = self.display.style_ok
        subworkflows_status = f"{self.num_subworkflows_complete}/{self.num_subworkflows} terminal"
        self.update(
            self._subworkflows_task,
            total=self.num_subworkflows,
            completed=self.num_subworkflows_complete,
            description=f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}",
            status=subworkflows_status,
        )

    def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary):
        """Refresh the step and job counters and bars from an invocation and its job summary."""
        self.invocation_state = invocation.get("state") or "new"
        self.step_count = len(invocation.get("steps") or []) or None
        self.step_states = step_states(invocation)

        steps_completed: Optional[int] = None
        steps_status = ""
        if self.step_count is None:
            steps_status = "Loading steps."
            self.steps_color = self.display.style_initializing
        elif self.invocation_state == "cancelled":
            steps_status = "Invocation cancelled"
            self.steps_color = self.display.style_error
        elif self.invocation_state == "failed":
            steps_status = "Invocation failed"
            self.steps_color = self.display.style_error
        else:
            num_scheduled = (self.step_states.get("scheduled") or 0) + (self.step_states.get("completed") or 0)
            self.steps_color = self.display.style_ok if num_scheduled > 0 else self.display.style_initializing
            steps_completed = num_scheduled
            steps_status = f"{num_scheduled}/{self.step_count} scheduled"

        self.job_count = job_count(job_state_summary)
        self.num_new = count_states(job_state_summary, ["new"])
        self.num_queued = count_states(job_state_summary, ["queued", "waiting"])
        self.num_running = running_count(job_state_summary)
        self.num_errors = error_count(job_state_summary)
        self.num_ok = ok_count(job_state_summary)
        self.jobs_completed = self.num_ok + self.num_errors
        self.num_paused = count_states(job_state_summary, ["paused"])
        self.jobs_terminal_count = self.jobs_completed + self.num_paused

        jobs_total: Optional[int] = self.job_count
        jobs_status = ""
        if self.num_errors > 0:
            self.jobs_color = self.display.style_error
        elif self.job_count > 0:
            self.jobs_color = self.display.style_ok
        else:
            self.jobs_color = self.display.style_initializing
            # No jobs yet: None is passed to update() for total/completed on the jobs bar.
            self.jobs_completed = None
            jobs_total = None
        if self.job_count > 0:
            jobs_status = f"{self.jobs_completed}/{self.job_count} terminal"

        self.update(
            self._steps_task,
            total=self.step_count,
            completed=steps_completed,
            description=f"[{self.steps_color}]{self.display.label_progress_steps}",
            status=steps_status,
        )
        self.update(
            self._jobs_task,
            total=jobs_total,
            completed=self.jobs_completed,
            description=f"[{self.jobs_color}]{self.display.label_progress_jobs}",
            status=jobs_status,
        )

    def _job_states_console_line(self):
        """Return a markup line listing ``icon count`` for every non-zero job state.

        The entries are separated by the configured divider, with no trailing divider.
        """
        state_counts = (
            (self.display.icon_state_ok, self.num_ok),
            (self.display.icon_state_errors, self.num_errors),
            (self.display.icon_state_new, self.num_new),
            (self.display.icon_state_queued, self.num_queued),
            (self.display.icon_state_running, self.num_running),
            (self.display.icon_state_paused, self.num_paused),
        )
        separator = f" {self.display.divider} "
        result = separator.join(f"{icon} {count}" for icon, count in state_counts if count > 0)
        # Is there an actual way to reset this? The undefined style seems to work but is a hack.
        return f"[{self.jobs_color}]{self.display.label_job_states_prefix} [reset]{self.display.divider} {result}"

    def add_bars(self):
        """Create the steps and jobs progress tasks with empty status text."""
        self._steps_task = self.add_task(f"[{self.steps_color}]{self.display.label_progress_steps}", status="")
        self._jobs_task = self.add_task(f"[{self.jobs_color}]{self.display.label_progress_jobs}", status="")

    def print_job_errors_once(
        self,
        ctx,
        invocation_api: InvocationApi,
        invocation_id: str,
        workflow_progress_display: "WorkflowProgressDisplay",
    ):
        """Print job errors only if they haven't been printed before, tracking by job ID.

        This is best-effort. A failure to list the invocation's failed jobs is logged via
        ``ctx.vlog`` and nothing is printed. A failure while printing is reported on the
        display's console.
        """
        # Early exit if no errors detected in current state
        if self.num_errors == 0:
            return
        try:
            new_error_lines = self._collect_new_job_error_lines(ctx, invocation_api, invocation_id)
        except Exception as e:
            ctx.vlog(f"Failed to get failed jobs: {e}")
            return
        if not new_error_lines:
            return
        try:
            workflow_progress_display.console.print("\n".join(new_error_lines))
        except Exception as e:
            error_msg = f"❌ Failed to collect failed job details: {escape(str(e))}"
            workflow_progress_display.console.print(error_msg)

    def _collect_new_job_error_lines(self, ctx, invocation_api: InvocationApi, invocation_id: str) -> List[str]:
        """Fetch the invocation's errored jobs and format those not reported before.

        Exceptions from listing the jobs propagate to the caller. Failures fetching a single
        job's details are logged and that job is skipped.
        """
        # Get all jobs in error state for this invocation in one call
        failed_jobs = invocation_api.get_invocation_jobs(invocation_id, state="error")
        ctx.vlog(f"Found {len(failed_jobs)} failed jobs for invocation {invocation_id}")
        new_error_lines: List[str] = []
        for job in failed_jobs:
            job_id = job.get("id")
            if not job_id or job_id in self.printed_job_errors:
                continue
            # Marked before the details request, so a job whose details cannot be fetched
            # is not retried on later polls.
            self.printed_job_errors.add(job_id)
            ctx.vlog(f"Processing failed job {job_id}")
            try:
                job_details = invocation_api.get_job(job_id, full_details=True)
                new_error_lines.extend(self._format_job_error_details(job_id, job_details))
            except Exception as e:
                ctx.vlog(f"Failed to get details for job {job_id}: {e}")
        return new_error_lines

    def _format_job_error_details(self, job_id, job_details) -> List[str]:
        """Format error details for a single failed job as rich-markup lines.

        Free text (tool id, command line, stdout/stderr lines) goes through ``escape``, so
        square brackets in it are not parsed as markup. Missing or ``None`` stdout/stderr
        are treated as empty. The returned list ends with a blank spacer line.
        """
        error_lines = [f"❌ Failed job {escape(str(job_id))}:"]
        tool_id = job_details.get("tool_id")
        if tool_id:
            error_lines.append(f" Tool ID: {escape(str(tool_id))}")
        exit_code = job_details.get("exit_code")
        if exit_code is not None:
            error_lines.append(f" Exit code: {exit_code}")
        command_line = job_details.get("command_line")
        if command_line:
            error_lines.append(f" Command line: {escape(command_line)}")
        for label, stream in (("Stdout", job_details.get("stdout")), ("Stderr", job_details.get("stderr"))):
            text = (stream or "").strip()
            if not text:
                continue
            error_lines.append(f" {label}:")
            # splitlines() also drops stray "\r" from CRLF output.
            error_lines.extend(f" {escape(line)}" for line in text.splitlines() if line.strip())
        error_lines.append("")  # Empty line for spacing
        return error_lines
# converted from Galaxy TypeScript (see util.ts next to WorkflowInvocationState.vue)
def count_states(job_summary: Optional["InvocationJobsSummary"], query_states: List[str]) -> int:
    """Return the number of jobs in ``job_summary`` whose state is one of ``query_states``.

    A ``None`` summary, a missing or empty ``states`` mapping, states absent from the
    mapping, and ``None`` counts all contribute zero. ``job_count`` treats falsy counts
    the same way.
    """
    states = (job_summary or {}).get("states") or {}
    return sum(states.get(state) or 0 for state in query_states)
def job_count(job_summary: Optional[InvocationJobsSummary]) -> int:
    """Return the total number of jobs across all states in ``job_summary``.

    A falsy summary, a missing/empty ``states`` mapping, and falsy per-state counts
    contribute nothing.
    """
    if not job_summary:
        return 0
    states = job_summary.get("states")
    if not states:
        return 0
    return sum(state_total for state_total in states.values() if state_total)
def step_states(invocation: Invocation):
    """Tally the invocation's steps by state.

    Falsy step entries are skipped. Steps with a missing/falsy state are counted under
    "unknown". The dict's keys appear in first-seen order.
    """
    tally: Dict[str, int] = {}
    for entry in invocation.get("steps") or []:
        if entry:
            bucket = entry.get("state") or "unknown"
            tally[bucket] = tally.get(bucket, 0) + 1
    return tally
def ok_count(job_summary: InvocationJobsSummary) -> int:
    """Return how many jobs ended successfully; "skipped" jobs count as successful here."""
    successful_states = ["ok", "skipped"]
    return count_states(job_summary, successful_states)
def error_count(job_summary: InvocationJobsSummary) -> int:
    """Return how many jobs are in any of the states listed in ``JOB_ERROR_STATES``."""
    failure_states = JOB_ERROR_STATES
    return count_states(job_summary, failure_states)
def running_count(job_summary: InvocationJobsSummary) -> int:
    """Return how many jobs are currently in the "running" state."""
    return count_states(job_summary, query_states=["running"])
class WorkflowProgressDisplay(Live):
    """Live rich panel with the progress of a workflow invocation and one nested subworkflow.

    Refreshing is manual (``auto_refresh=False``): every ``handle_*`` call rebuilds the
    panel and refreshes it.
    """

    def __init__(
        self,
        invocation_id: str,
        display_configuration: Optional[DisplayConfiguration] = None,
        galaxy_url: Optional[str] = None,
    ):
        self.subworkflow_invocation_ids_seen: Set[str] = set()
        self.subworkflow_invocation_ids_completed: Set[str] = set()
        # Subworkflow invocation currently shown in the nested section, if any.
        self.subworkflow_invocation_id: Optional[str] = None
        # IDs of "new" steps in the most recently registered invocation (parent or subworkflow).
        self.new_steps: List[str] = []
        self.invocation_id = invocation_id
        display = display_configuration or DisplayConfiguration()
        self.galaxy_url = galaxy_url
        self.display = display
        self.workflow_progress = WorkflowProgress(display)
        self.subworkflow_progress = WorkflowProgress(display)
        super().__init__(self._panel(), auto_refresh=False)

    def get_invocation_ui_link(self) -> Optional[str]:
        """Return the Galaxy web UI URL of this invocation, or None when no Galaxy URL is known."""
        if not self.galaxy_url:
            return None
        # NOTE(review): assumes Galaxy's client route /workflows/invocations/<id>; confirm for target versions.
        return f"{self.galaxy_url.rstrip('/')}/workflows/invocations/{self.invocation_id}"

    def _register_subworkflow_invocation_ids_from(self, invocation: Invocation):
        """Record the invocation's "new" steps and any subworkflow invocation ids its steps carry."""
        subworkflow_invocation_ids: List[str] = []
        new_steps: List[str] = []
        for step in invocation.get("steps") or []:
            # .get(): InvocationStep is total=False, so "state" may be absent.
            if step.get("state") == "new":
                new_steps.append(step["id"])
            subworkflow_invocation_id = step.get("subworkflow_invocation_id")
            if subworkflow_invocation_id:
                subworkflow_invocation_ids.append(subworkflow_invocation_id)
        # NOTE(review): this replaces new_steps from whichever invocation was registered last
        # (parent or subworkflow); confirm that is the intended input for all_subworkflows_complete.
        self.new_steps = new_steps
        self._register_subworkflow_invocation_ids(subworkflow_invocation_ids)

    def _register_subworkflow_invocation_ids(self, ids: List[str]):
        """Add the given subworkflow invocation ids to the set of seen ids."""
        self.subworkflow_invocation_ids_seen.update(ids)

    def _complete_subworkflow(self, id: str):
        """Mark a subworkflow invocation as complete."""
        self.subworkflow_invocation_ids_completed.add(id)

    def an_incomplete_subworkflow_id(self):
        """Return a randomly chosen seen-but-not-completed subworkflow invocation id.

        Raises IndexError (from ``random.choice``) when there is none.
        """
        return random.choice(tuple(self.subworkflow_invocation_ids_seen - self.subworkflow_invocation_ids_completed))

    def all_subworkflows_complete(self):
        """Whether every seen subworkflow invocation is complete and no "new" steps remain."""
        if self.new_steps:
            # These don't have subworkflow invocation ids yet, we can't know if they're all complete
            return False
        # Set inclusion rather than comparing sizes, so a stray completed id cannot mask a pending one.
        return self.subworkflow_invocation_ids_seen <= self.subworkflow_invocation_ids_completed

    def _panel(self):
        """Build the outer panel according to the display configuration."""

        def job_states(workflow_progress):
            if self.display.include_job_state_breakdown:
                return workflow_progress._job_states_console_line()
            return None

        invocation_label = self.invocation_id
        ui_link = self.get_invocation_ui_link()
        if ui_link:
            invocation_label = f"[link={ui_link}]{self.invocation_id}[/link]"
        title = f"[{self.display.style_header}]{self.display.label_header_prefix}<{invocation_label}>"
        subworkflow_title = None
        if self.subworkflow_invocation_id:
            subworkflow_title = f"[{self.display.style_subworkflow_header}]{self.display.label_subworkflow_header_prefix}<{self.subworkflow_invocation_id}>"
        if not self.subworkflow_invocation_id or not self.display.include_nested_subworkflows:
            renderable = as_group(
                self.workflow_progress,
                job_states(self.workflow_progress),
            )
        elif not self.display.subworkflows_as_panel:
            renderable = as_group(
                self.workflow_progress,
                job_states(self.workflow_progress),
                subworkflow_title,
                self.subworkflow_progress,
                job_states(self.subworkflow_progress),
            )
        else:
            renderable = as_group(
                self.workflow_progress,
                job_states(self.workflow_progress),
                Panel(
                    as_group(self.subworkflow_progress, job_states(self.subworkflow_progress)),
                    title=subworkflow_title,
                ),
            )
        return Panel(renderable, title=title, expand=True)

    def _update_panel(self):
        """Rebuild the panel and refresh the live display."""
        self.update(self._panel())
        self.refresh()

    def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary):
        """Update the top-level invocation progress and redraw."""
        self.workflow_progress.handle_invocation(invocation, job_state_summary)
        self._register_subworkflow_invocation_ids_from(invocation)
        self._update_panel()

    def handle_subworkflow_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary):
        """Update the nested subworkflow progress and the parent's subworkflow counts, then redraw."""
        self.subworkflow_invocation_id = invocation["id"]
        self.subworkflow_progress.handle_invocation(invocation, job_state_summary)
        self._register_subworkflow_invocation_ids_from(invocation)
        if self.subworkflow_progress.terminal:
            self._complete_subworkflow(invocation["id"])
        self.workflow_progress.handle_subworkflow_counts(
            len(self.subworkflow_invocation_ids_seen),
            len(self.subworkflow_invocation_ids_completed),
        )
        self._update_panel()
def as_group(*renderables):
    """Wrap the truthy renderables in a rich ``Group``.

    ``None`` and other falsy entries (e.g. a disabled job-state line) are dropped.
    """
    present = [renderable for renderable in renderables if renderable]
    return Group(*present)