Source code for planemo.galaxy.upload_progress

"""Progress display for Galaxy data uploads."""

from io import StringIO
from typing import (
    Any,
    Dict,
    List,
    Optional,
    Set,
    Union,
)

from rich.console import (
    Console,
    Group,
)
from rich.live import Live
from rich.panel import Panel
from rich.progress import (
    BarColumn,
    Progress,
    TaskID,
    TaskProgressColumn,
    TextColumn,
)

from .invocations.api import JOB_ERROR_STATES
from .invocations.progress_display import DisplayConfiguration

# Job states treated as terminal for uploads (same as workflow jobs): every
# state in JOB_ERROR_STATES plus "ok", "skipped" and "paused".
UPLOAD_JOB_TERMINAL_STATES = JOB_ERROR_STATES + ["ok", "skipped", "paused"]


class UploadJobsSummary:
    """Hold the number of upload jobs observed in each job state."""

    def __init__(self, states: Optional[Dict[str, int]] = None):
        """Store the state-to-count mapping.

        Args:
            states: Mapping of job state name to job count. When omitted or
                empty, a fresh empty dict is stored instead.
        """
        if not states:
            # A new dict per instance, so summaries never share a mapping.
            states = {}
        self.states: Dict[str, int] = states
def count_states(job_summary: Optional[UploadJobsSummary], query_states: List[str]) -> int:
    """Return how many jobs are in any of the requested states.

    Args:
        job_summary: Summary to inspect; ``None`` or a summary without any
            recorded states yields 0.
        query_states: State names whose counts are added together.

    Returns:
        The sum of the counts recorded for ``query_states``. States missing
        from the summary contribute 0.
    """
    if not job_summary or not job_summary.states:
        return 0
    return sum(job_summary.states.get(state, 0) for state in query_states)
def job_count(job_summary: Optional[UploadJobsSummary]) -> int:
    """Return the total number of jobs over every recorded state.

    Args:
        job_summary: Summary to inspect; ``None`` or a summary without any
            recorded states yields 0.

    Returns:
        The sum of all per-state counts. Falsy counts (0 or ``None``) are
        skipped.
    """
    if not job_summary or not job_summary.states:
        return 0
    return sum(per_state for per_state in job_summary.states.values() if per_state)
def ok_count(job_summary: UploadJobsSummary) -> int:
    """Return the number of jobs in the ``ok`` or ``skipped`` state."""
    successful_states = ["ok", "skipped"]
    return count_states(job_summary, successful_states)
def error_count(job_summary: UploadJobsSummary) -> int:
    """Return the number of jobs whose state is listed in ``JOB_ERROR_STATES``."""
    failed_jobs = count_states(job_summary, JOB_ERROR_STATES)
    return failed_jobs
class UploadProgress(Progress):
    """Progress tracker for upload jobs.

    Renders one Rich progress bar covering all upload jobs and keeps per-state
    counters, refreshed by :meth:`handle_upload_jobs`, that are used for the
    job state breakdown line and for error reporting.
    """

    _uploads_task: TaskID

    def __init__(self, display: DisplayConfiguration):
        """Create the progress bar and reset every job counter to zero.

        Args:
            display: Styles, labels, icons and divider used for rendering.
        """
        self.job_count: int = 0
        self.jobs_completed: int = 0
        self.num_ok: int = 0
        self.num_new: int = 0
        self.num_queued: int = 0
        self.num_running: int = 0
        self.num_errors: int = 0
        self.num_paused: int = 0
        self.printed_job_errors: Set[str] = set()  # Track printed job errors by job ID
        self.display = display
        self.uploads_color: str = self.display.style_initializing

        # Create progress bar with same column layout as WorkflowProgress
        bar_column = BarColumn(
            style=self.display.style_bar_back,
            finished_style=self.display.style_bar_finished,
            complete_style=self.display.style_bar_complete,
        )
        super().__init__(
            TextColumn("[progress.description]{task.description}"),
            TextColumn(display.divider),
            bar_column,
            TextColumn(display.divider),
            TaskProgressColumn(f"[{self.display.style_percent}]{{task.percentage:>3.0f}}%"),
            TextColumn(display.divider),
            TextColumn(text_format="{task.fields[status]}"),
        )

        # Add the uploads progress bar
        self._uploads_task = self.add_task(f"[{self.uploads_color}]{self.display.label_progress_jobs}", status="")

    @property
    def jobs_terminal_count(self) -> int:
        """Count of jobs in terminal states (completed, errors, paused)."""
        return self.jobs_completed + self.num_paused

    @property
    def terminal(self) -> bool:
        """Check if all upload jobs are in terminal state.

        Always ``False`` while no jobs are known (``job_count == 0``).
        """
        return self.job_count > 0 and self.job_count == self.jobs_terminal_count

    def handle_upload_jobs(self, job_summary: UploadJobsSummary):
        """Update counters, bar colour and progress bar from the job states.

        Args:
            job_summary: Summary of current job states
        """
        # Update state counters
        self.job_count = job_count(job_summary)
        self.num_new = count_states(job_summary, ["new", "upload"])
        self.num_queued = count_states(job_summary, ["queued", "waiting"])
        self.num_running = count_states(job_summary, ["running"])
        self.num_errors = error_count(job_summary)
        self.num_ok = ok_count(job_summary)
        self.jobs_completed = self.num_ok + self.num_errors
        self.num_paused = count_states(job_summary, ["paused"])

        # Update color based on state: errors win, then "jobs exist", else initializing
        if self.num_errors > 0:
            self.uploads_color = self.display.style_error
        elif self.job_count > 0:
            self.uploads_color = self.display.style_ok
        else:
            self.uploads_color = self.display.style_initializing

        # Until at least one job is known the bar gets no total/completed
        # values and an empty status text.
        jobs_status = ""
        jobs_total: Optional[int] = None
        jobs_completed: Optional[int] = None
        if self.job_count > 0:
            jobs_total = self.job_count
            jobs_completed = self.jobs_completed
            # NOTE(review): the label says "terminal" but the number shown is
            # jobs_completed, which excludes paused jobs - confirm intended.
            jobs_status = f"{self.jobs_completed}/{self.job_count} terminal"

        # Update the progress bar
        self.update(
            self._uploads_task,
            total=jobs_total,
            completed=jobs_completed,
            description=f"[{self.uploads_color}]{self.display.label_progress_jobs}",
            status=jobs_status,
        )

    def _job_states_console_line(self) -> str:
        """Format a console line showing breakdown of job states with icons.

        Only states with a count above zero are listed, in the fixed order
        ok, errors, new, queued, running, paused.
        """
        separator = f" {self.display.divider} "
        breakdown = (
            (self.display.icon_state_ok, self.num_ok),
            (self.display.icon_state_errors, self.num_errors),
            (self.display.icon_state_new, self.num_new),
            (self.display.icon_state_queued, self.num_queued),
            (self.display.icon_state_running, self.num_running),
            (self.display.icon_state_paused, self.num_paused),
        )
        # Join instead of appending a separator and trimming with rstrip():
        # rstrip() removes a *set of characters*, so it could also eat trailing
        # characters of the last entry that happen to occur in the divider.
        result = separator.join(f"{icon} {count}" for icon, count in breakdown if count > 0)
        return f"[{self.uploads_color}]{self.display.label_job_states_prefix} [reset]{self.display.divider} {result}"

    def print_job_errors_once(self, gi, upload_progress_display: "UploadProgressDisplay"):
        """Print job errors only if they haven't been printed before, tracking by job ID.

        Args:
            gi: GalaxyInstance for querying job details
            upload_progress_display: Display instance for console output
        """
        # Early exit if no errors detected
        if self.num_errors == 0:
            return

        try:
            new_error_lines: List[str] = []

            # Get all tracked upload jobs and check for errors
            for job_id in upload_progress_display._upload_job_ids:
                if job_id in self.printed_job_errors:
                    continue

                try:
                    job_details = gi.jobs.show_job(job_id, full_details=True)
                    if job_details.get("state", "") in JOB_ERROR_STATES:
                        error_lines = self._format_job_error_details(job_id, job_details)
                        new_error_lines.extend(error_lines)
                        # Mark as printed only once the details were formatted,
                        # so a failure above never hides the error for good.
                        self.printed_job_errors.add(job_id)
                except Exception:
                    # Best effort: skip this job for now. It has not been
                    # marked as printed, so the next call tries it again.
                    continue

            # Print any new errors found. The lines embed raw tool output, so
            # markup parsing is disabled to print bracketed text verbatim.
            if new_error_lines:
                upload_progress_display.console.print("\n".join(new_error_lines), markup=False)

        except Exception as e:
            error_msg = f"Failed to collect upload job error details: {e}"
            upload_progress_display.console.print(error_msg, markup=False)

    def _format_job_error_details(self, job_id: str, job_details: Dict[str, Any]) -> List[str]:
        """Format error details for a single failed upload job.

        Args:
            job_id: Job ID
            job_details: Job details from Galaxy API

        Returns:
            List of formatted error message lines
        """
        exit_code = job_details.get("exit_code")
        # "or ''" also covers keys that are present with a None value, which
        # .get(key, "") would pass straight to .strip().
        stderr = (job_details.get("stderr") or "").strip()
        stdout = (job_details.get("stdout") or "").strip()
        tool_id = job_details.get("tool_id")

        error_lines = [f"Failed upload job {job_id}:"]
        if tool_id:
            error_lines.append(f" Tool: {tool_id}")
        if exit_code is not None:
            error_lines.append(f" Exit code: {exit_code}")
        if stderr:
            error_lines.append(f" Stderr: {stderr[:500]}")  # Limit stderr output
        if stdout:
            error_lines.append(f" Stdout: {stdout[:500]}")  # Limit stdout output
        return error_lines
class UploadProgressDisplay(Live):
    """Live display for upload progress with Rich panel."""

    def __init__(
        self,
        history_id: str,
        display_configuration: Optional[DisplayConfiguration] = None,
        galaxy_url: Optional[str] = None,
    ):
        """Initialize upload progress display.

        Args:
            history_id: Galaxy history ID where files are being uploaded
            display_configuration: Optional display configuration for styling
            galaxy_url: Optional Galaxy server URL for display
        """
        self.history_id = history_id
        self.galaxy_url = galaxy_url
        display = display_configuration or DisplayConfiguration()
        self.display = display
        self.upload_progress = UploadProgress(display)
        self.console = Console()
        self._upload_job_ids: Set[str] = set()

        # Initialize with auto_refresh=False for manual control
        super().__init__(self._panel(), console=self.console, auto_refresh=False)

    def get_history_ui_link(self) -> Optional[str]:
        """Return the URL of the target history in the Galaxy web UI.

        Returns:
            The history URL, or ``None`` when no ``galaxy_url`` was given.
        """
        if not self.galaxy_url:
            return None
        # NOTE(review): assumes the Galaxy client route
        # "/histories/view?id=<history id>" - confirm against the server.
        return f"{self.galaxy_url.rstrip('/')}/histories/view?id={self.history_id}"

    def _panel(self) -> Panel:
        """Create Rich panel with progress display.

        Returns:
            Rich Panel containing the upload progress
        """
        # Format title like "Uploading to History <hist123abc>". The ID is
        # wrapped in link markup only when a UI link is available.
        history_link = self.get_history_ui_link()
        if history_link:
            history_label = f"[link={history_link}]{self.history_id}[/link]"
        else:
            history_label = self.history_id
        title = f"[{self.display.style_header}]Uploading to History <{history_label}>"

        # Create group with progress bar and optional job state breakdown
        renderables: List[Union[UploadProgress, str]] = [self.upload_progress]

        # Add job state breakdown if configured
        if self.display.include_job_state_breakdown and self.upload_progress.job_count > 0:
            renderables.append(self.upload_progress._job_states_console_line())

        content = Group(*renderables) if len(renderables) > 1 else self.upload_progress
        return Panel(content, title=title, expand=True)

    def update_jobs(self, upload_jobs: List[Dict[str, Any]], job_summary: UploadJobsSummary):
        """Update progress display with current upload job states.

        Args:
            upload_jobs: List of upload job dictionaries
            job_summary: Summary of job states
        """
        # Track job IDs for error reporting; entries without an "id" key are skipped
        for job in upload_jobs:
            if "id" in job:
                self._upload_job_ids.add(job["id"])

        # Update progress
        self.upload_progress.handle_upload_jobs(job_summary)

        # auto_refresh is off, so rebuild the panel and redraw explicitly
        self.update(self._panel())
        self.refresh()
def _aggregate_job_states(upload_jobs: List[Dict[str, Any]], gi) -> UploadJobsSummary:
    """Query each upload job once and tally the jobs per state.

    Args:
        upload_jobs: Upload job dictionaries; entries without a truthy ``id``
            are skipped.
        gi: GalaxyInstance whose ``jobs.show_job`` is used to look up each job.

    Returns:
        UploadJobsSummary holding the number of jobs seen in each state. Jobs
        that report no state, or whose lookup raises, are counted under
        ``"unknown"``.
    """
    tally: Dict[str, int] = {}
    for entry in upload_jobs:
        identifier = entry.get("id")
        if identifier:
            try:
                current = gi.jobs.show_job(identifier).get("state", "unknown")
                tally[current] = tally.get(current, 0) + 1
            except Exception:
                # If we can't get job state, count it as unknown
                tally["unknown"] = tally.get("unknown", 0) + 1
    return UploadJobsSummary(tally)