Source code for planemo.galaxy.workflows

"""Utilities for Galaxy workflows."""

import json
import os
import tempfile
from collections import namedtuple
from functools import lru_cache
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Protocol,
    runtime_checkable,
    Tuple,
    TYPE_CHECKING,
)
from urllib.parse import (
    quote,
    unquote,
    urlparse,
)

if TYPE_CHECKING:
    from bioblend.galaxy import GalaxyInstance

import requests
import yaml
from ephemeris import (
    generate_tool_list_from_ga_workflow_files,
    shed_tools,
)

# Labels lives in a private gxformat2 module; reuse its canonical anonymous/
# unlabeled predicates rather than re-deriving the sentinel conventions here.
from gxformat2._labels import Labels
from gxformat2.converter import python_to_workflow
from gxformat2.normalize import (
    inputs_normalized,
    outputs_normalized,
)

from planemo.galaxy.api import (
    get_dict_from_workflow,
    gi,
)
from planemo.io import warn

FAILED_REPOSITORIES_MESSAGE = "Failed to install one or more repositories."
GALAXY_WORKFLOWS_PREFIX = "gxid://workflows/"
GALAXY_WORKFLOW_INSTANCE_PREFIX = "gxid://workflow-instance/"
TRS_WORKFLOWS_PREFIX = "trs://"
MAIN_TOOLSHED_URL = "https://toolshed.g2.bx.psu.edu"


def parse_trs_id(trs_id: str) -> Optional[Dict[str, str]]:
    """Parse a TRS ID into a full TRS URL.

    Args:
        trs_id: TRS ID in format: [#]workflow/github.com/org/repo/workflow_name[/version]
                Examples:
                - workflow/github.com/org/repo/main
                - #workflow/github.com/org/repo/main/v0.1.14
                - workflow/github.com/iwc-workflows/parallel-accession-download/main
                The long form ``.../workflow_name/versions/version`` is also accepted.

    Returns:
        Dict with key 'trs_url' containing the full TRS API URL,
        or None if the ID has fewer than four ``/``-separated parts.

    When no version is given, the Dockstore versions endpoint is queried
    (10 second timeout) and the first listed version is used. If that lookup
    fails or yields nothing usable, the workflow name (or "main") is used as
    the version instead, so this function never raises on network problems.
    """
    # Remove leading # if present
    if trs_id.startswith("#"):
        trs_id = trs_id[1:]

    # Accepted shapes:
    #   workflow/github.com/org/repo                                 (4 parts)
    #   workflow/github.com/org/repo/workflow_name                   (5 parts)
    #   workflow/github.com/org/repo/workflow_name/version           (6 parts)
    #   workflow/github.com/org/repo/workflow_name/versions/version  (7 parts)
    parts = trs_id.split("/")
    if len(parts) < 4:
        return None

    artifact_type, service, owner, repo = parts[:4]
    workflow_name = parts[4] if len(parts) >= 5 else None

    if len(parts) >= 7 and parts[5] == "versions":
        # Full URL format: .../workflow_name/versions/version
        version = parts[6]
    elif len(parts) >= 6:
        # Short format: .../workflow_name/version
        version = parts[5]
    else:
        version = None

    # Build the TRS tool ID: #workflow/github.com/org/repo[/workflow_name]
    trs_tool_id = f"#{artifact_type}/{service}/{owner}/{repo}"
    if workflow_name:
        trs_tool_id = f"{trs_tool_id}/{workflow_name}"

    # Dockstore is the primary TRS server for GitHub workflows
    trs_base_url = "https://dockstore.org/api/ga4gh/trs/v2/tools/"

    if not version:
        # Galaxy requires /versions/{version_id} in TRS URLs, so a version must
        # always be supplied; start from the fallback and upgrade it if the
        # lookup succeeds.
        version = workflow_name if workflow_name else "main"

        # The tool ID is URL-encoded only for this direct API call; the
        # returned URL keeps the unencoded ID, as in the original.
        versions_url = f"{trs_base_url}{quote(trs_tool_id, safe='')}/versions"
        try:
            response = requests.get(versions_url, timeout=10)
            response.raise_for_status()
            versions = response.json()
        except (requests.RequestException, ValueError):
            # Network failure, HTTP error or undecodable body: keep the fallback.
            versions = None

        # Only trust the payload when it has the expected list-of-dicts shape.
        if isinstance(versions, list) and versions and isinstance(versions[0], dict):
            # Use the first listed version (usually the latest/default)
            latest_version = versions[0].get("name") or versions[0].get("id")
            if latest_version:
                version = latest_version

    return {"trs_url": f"{trs_base_url}{trs_tool_id}/versions/{version}"}


def parse_trs_uri(trs_uri: str) -> Optional[Dict[str, str]]:
    """Parse a TRS URI into a full TRS URL.

    Args:
        trs_uri: TRS URI in format: trs://[#]workflow/github.com/org/repo/workflow_name[/version]
                 or trs://<full_dockstore_url>

    Returns:
        Dict with key 'trs_url' containing the full TRS API URL,
        or None if the URI lacks the ``trs://`` prefix or the remainder is
        not a parseable TRS ID.
    """
    if not trs_uri.startswith(TRS_WORKFLOWS_PREFIX):
        return None

    # Remove trs:// prefix
    trs_content = trs_uri[len(TRS_WORKFLOWS_PREFIX) :]

    # A full http(s) URL is already resolved. Splitting it on "/" as if it
    # were a short TRS ID would produce a nonsensical tool ID, so pass it on.
    if trs_content.startswith(("http://", "https://")):
        return {"trs_url": trs_content}

    # Parse as a TRS ID (workflow/... or #workflow/...) to resolve versions
    return parse_trs_id(trs_content)


@runtime_checkable
class TrsImporter(Protocol):
    """Structural interface for anything that can import a workflow from TRS.

    Being ``runtime_checkable``, ``isinstance(obj, TrsImporter)`` is true for
    any object that exposes an ``import_from_trs`` attribute.
    """

    def import_from_trs(self, trs_url: str) -> Dict[str, Any]:
        """Import the workflow found at a TRS URL.

        Args:
            trs_url: Full TRS URL to import from

        Returns:
            Workflow dict with 'id' and other metadata
        """
        ...


class GalaxyTrsImporter:
    """TRS importer that goes through a Galaxy server's workflows API."""

    def __init__(self, gi: "GalaxyInstance"):
        """Store the Galaxy connection used for imports.

        Args:
            gi: BioBlend GalaxyInstance for API access
        """
        self._gi = gi

    def import_from_trs(self, trs_url: str) -> Dict[str, Any]:
        """Ask Galaxy to import the workflow behind a TRS URL.

        Posts an ``archive_source: trs_tool`` payload to the URL produced by
        the workflows client's ``_make_url()``.

        Args:
            trs_url: Full TRS URL to import from

        Returns:
            Workflow dict with 'id' and other metadata
        """
        workflows_client = self._gi.workflows
        return workflows_client._post(
            url=workflows_client._make_url(),
            payload={"archive_source": "trs_tool", "trs_url": trs_url},
        )


def import_workflow_from_trs(trs_uri: str, importer: TrsImporter) -> Dict[str, Any]:
    """Import a workflow from a TRS endpoint.

    Args:
        trs_uri: TRS URI in format trs://[#]workflow/github.com/org/repo/workflow_name[/version]
        importer: TrsImporter implementation to use for importing

    Returns:
        Workflow dict with 'id' and other metadata

    Raises:
        ValueError: If ``trs_uri`` cannot be parsed by ``parse_trs_uri``.
    """
    trs_info = parse_trs_uri(trs_uri)
    if not trs_info:
        raise ValueError(f"Invalid TRS URI: {trs_uri}")
    return importer.import_from_trs(trs_info["trs_url"])


DOCKSTORE_TRS_BASE = "https://dockstore.org/api/ga4gh/trs/v2/tools/"


def _resolve_trs_url(trs_id: str) -> str:
    """Resolve a TRS identifier to a full TRS URL.

    Accepts a full Dockstore TRS URL (returned unchanged), a ``trs://`` URI,
    or a short TRS ID.

    Raises:
        ValueError: If the URI or short ID cannot be parsed.
    """
    if trs_id.startswith(DOCKSTORE_TRS_BASE):
        return trs_id

    if trs_id.startswith(TRS_WORKFLOWS_PREFIX):
        trs_info = parse_trs_uri(trs_id)
        if not trs_info:
            raise ValueError(f"Invalid TRS URI: {trs_id}")
        return trs_info["trs_url"]

    # It's a short TRS ID
    trs_info = parse_trs_id(trs_id)
    if not trs_info:
        raise ValueError(f"Invalid TRS ID: {trs_id}")
    return trs_info["trs_url"]


def _encode_trs_url(trs_url: str) -> str:
    """URL-encode the tool ID portion of a TRS URL for API calls.

    URLs that do not start with the Dockstore base are returned unchanged.
    The tool ID is decoded before being encoded, so a URL whose tool ID is
    already percent-encoded is not double-encoded (``%23`` stays ``%23``
    instead of becoming ``%2523``).
    """
    if not trs_url.startswith(DOCKSTORE_TRS_BASE):
        return trs_url

    tool_id_and_version = trs_url[len(DOCKSTORE_TRS_BASE) :]
    if "/versions/" in tool_id_and_version:
        tool_id, version = tool_id_and_version.split("/versions/", 1)
        return f"{DOCKSTORE_TRS_BASE}{quote(unquote(tool_id), safe='')}/versions/{version}"
    return f"{DOCKSTORE_TRS_BASE}{quote(unquote(tool_id_and_version), safe='')}"


def _find_primary_descriptor(files: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Find the primary descriptor file from a list of TRS files.

    Preference order: an entry whose ``file_type`` is ``PRIMARY_DESCRIPTOR``,
    then the first entry whose path looks like a Galaxy workflow file, then
    the first entry of any kind. Returns None for an empty list.
    """
    for f in files:
        if f.get("file_type") == "PRIMARY_DESCRIPTOR":
            return f
    for f in files:
        # ``path`` may be present but null; treat that like a missing path.
        if (f.get("path") or "").endswith((".ga", ".gxwf.yml", ".gxwf.yaml")):
            return f
    return files[0] if files else None


def fetch_workflow_from_trs(trs_id: str) -> Dict[str, Any]:
    """Fetch a workflow definition directly from a TRS endpoint.

    Args:
        trs_id: TRS ID in format: [#]workflow/github.com/org/repo/workflow_name[/version]
                or a full TRS URL

    Returns:
        Workflow definition dict (gxformat2 or GA format)

    Raises:
        ValueError: If the TRS ID is invalid, the workflow cannot be fetched,
            or the fetched content is empty or not a parseable mapping.
    """
    trs_url = _encode_trs_url(_resolve_trs_url(trs_id))
    files_url = f"{trs_url}/GALAXY/files"

    try:
        response = requests.get(files_url, timeout=30)
        response.raise_for_status()

        primary_file = _find_primary_descriptor(response.json())
        if not primary_file:
            raise ValueError(f"No workflow file found at TRS endpoint: {trs_url}")

        descriptor_url = f"{trs_url}/GALAXY/descriptor/{primary_file.get('path', '')}"
        file_response = requests.get(descriptor_url, timeout=30)
        file_response.raise_for_status()
        content = file_response.json().get("content", "")
    except requests.RequestException as e:
        # Keep the documented ValueError contract but preserve the cause.
        raise ValueError(f"Failed to fetch workflow from TRS endpoint {trs_url}: {e}") from e

    if not content:
        raise ValueError(f"Empty workflow content from TRS endpoint: {trs_url}")

    # Try JSON first, then fall back to YAML.
    try:
        workflow = json.loads(content)
    except json.JSONDecodeError:
        try:
            workflow = yaml.safe_load(content)
        except yaml.YAMLError as e:
            raise ValueError(f"Could not parse workflow content from TRS endpoint {trs_url}: {e}") from e

    if not isinstance(workflow, dict):
        # Callers index into the result as a workflow dict; fail early with a
        # clear message rather than later with an unrelated error.
        raise ValueError(f"Workflow content from TRS endpoint {trs_url} is not a mapping")
    return workflow


@lru_cache(maxsize=None)
def guess_tool_shed_url(tool_shed_fqdn: str) -> Optional[str]:
    """Guess the base URL (https or http) of a tool shed given its host name.

    Returns ``MAIN_TOOLSHED_URL`` for the main tool shed, otherwise the first
    of ``https://<fqdn>`` / ``http://<fqdn>`` that answers with HTTP 200, or
    None (after a warning) when neither does. Results, including None, are
    cached per host name for the lifetime of the process.
    """
    # NOTE(review): this is a substring test, so any fragment of the main tool
    # shed URL (even "") matches. Kept as-is because callers may pass either
    # the bare host or the full URL - confirm before tightening.
    if tool_shed_fqdn in MAIN_TOOLSHED_URL:
        return MAIN_TOOLSHED_URL

    # guess if tool shed is served over https or http
    for scheme in ("https", "http"):
        candidate_url = f"{scheme}://{tool_shed_fqdn}"
        try:
            response = requests.get(candidate_url, timeout=30)
        except requests.RequestException:
            # An unreachable scheme must not abort the probe; try the next one.
            continue
        if response.status_code == 200:
            return candidate_url

    warn(f"Could not connect to {tool_shed_fqdn}")
    return None


def get_toolshed_url_for_tool_id(tool_id: str) -> Optional[str]:
    """Return the tool shed URL for a tool shed tool ID, or None.

    Everything before the first ``/repos`` in ``tool_id`` is taken as the
    tool shed host; IDs without ``/repos`` yield None.
    """
    components = tool_id.split("/repos")
    if len(components) > 1:
        return guess_tool_shed_url(tool_shed_fqdn=components[0])
    return None


def _add_tool_shed_urls(tools_info: List[Dict[str, Any]]) -> None:
    """Set ``tool_shed_url`` in place on each repo whose ``tool_shed`` resolves."""
    for repo in tools_info:
        tool_shed = repo.get("tool_shed")
        if tool_shed:
            tool_shed_url = guess_tool_shed_url(tool_shed)
            if tool_shed_url:
                repo["tool_shed_url"] = tool_shed_url


def load_shed_repos(runnable):
    """Return the tool shed repository descriptions a workflow runnable needs.

    Returns an empty list for runnables that are not Galaxy workflows. For
    ``.ga`` files the list is generated by ephemeris; for other workflow
    files it is collected from each step's ``tool_shed_repository`` entry.
    """
    if runnable.type.name != "galaxy_workflow":
        return []

    path = runnable.path
    if path.endswith(".ga"):
        with tempfile.NamedTemporaryFile() as out:
            generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
                [path], "Tools from workflows", out.name
            )
            # Read back while the temporary file still exists.
            with open(out.name) as f:
                tools = yaml.safe_load(f)["tools"]
    else:
        # It'd be better to just infer this from the tool shed ID somehow than
        # require explicit annotation like this... I think?
        with open(path) as f:
            workflow = yaml.safe_load(f)
        steps = workflow["steps"]
        if isinstance(steps, dict):
            steps = steps.values()
        tools = []
        for step in steps:
            repository = step.get("tool_shed_repository")
            if repository:
                repository["tool_panel_section_label"] = "Tools from workflows"
                tools.append(repository)

    _add_tool_shed_urls(tools)
    return tools


def _install_shed_repos_from_tools_info(
    tools_info: List[Dict[str, Any]],
    admin_gi: "GalaxyInstance",
    ignore_dependency_problems: bool,
    install_tool_dependencies: bool = False,
    install_resolver_dependencies: bool = True,
    install_repository_dependencies: bool = True,
    install_most_recent_revision: bool = False,
) -> Tuple[Optional[List[Any]], Optional[List[Any]]]:
    """Common logic for installing tool shed repositories from a tools_info list.

    Returns:
        ``(installed_repositories, updated_repositories)``. Both are None when
        ``tools_info`` is empty; the second is None unless
        ``install_most_recent_revision`` is set.

    Raises:
        Exception: If any repository failed to install or update and
            ``ignore_dependency_problems`` is false (otherwise a warning is
            emitted instead).
    """
    if not tools_info:
        return None, None

    install_tool_manager = shed_tools.InstallRepositoryManager(admin_gi)
    install_results = install_tool_manager.install_repositories(
        tools_info,
        default_install_tool_dependencies=install_tool_dependencies,
        default_install_resolver_dependencies=install_resolver_dependencies,
        default_install_repository_dependencies=install_repository_dependencies,
    )

    if install_most_recent_revision:
        # for workflow autoupdates we also need the most recent tool versions
        update_results = install_tool_manager.update_repositories(
            tools_info,
            default_install_tool_dependencies=install_tool_dependencies,
            default_install_resolver_dependencies=install_resolver_dependencies,
            default_install_repository_dependencies=install_repository_dependencies,
        )
        install_results.errored_repositories.extend(update_results.errored_repositories)
        updated_repos = update_results.installed_repositories
    else:
        updated_repos = None

    if install_results.errored_repositories:
        if ignore_dependency_problems:
            warn(FAILED_REPOSITORIES_MESSAGE)
        else:
            raise Exception(FAILED_REPOSITORIES_MESSAGE)

    return install_results.installed_repositories, updated_repos


def install_shed_repos(
    runnable,
    admin_gi,
    ignore_dependency_problems,
    install_tool_dependencies=False,
    install_resolver_dependencies=True,
    install_repository_dependencies=True,
    install_most_recent_revision=False,
):
    """Install the tool shed repositories required by a workflow runnable."""
    tools_info = load_shed_repos(runnable)
    return _install_shed_repos_from_tools_info(
        tools_info,
        admin_gi,
        ignore_dependency_problems,
        install_tool_dependencies,
        install_resolver_dependencies,
        install_repository_dependencies,
        install_most_recent_revision,
    )


def install_shed_repos_for_workflow_id(
    workflow_id: str,
    user_gi: "GalaxyInstance",
    admin_gi: "GalaxyInstance",
    ignore_dependency_problems: bool,
    install_tool_dependencies: bool = False,
    install_resolver_dependencies: bool = True,
    install_repository_dependencies: bool = True,
    install_most_recent_revision: bool = False,
) -> Tuple[Optional[List[Any]], Optional[List[Any]]]:
    """Install tool shed repositories for a workflow that's already in Galaxy.

    This is used for TRS workflows that are imported via Galaxy's TRS API.
    We fetch the workflow definition from Galaxy and extract tool requirements.
    """
    # Fetch the workflow from Galaxy to get the GA format
    workflow_dict = user_gi.workflows.export_workflow_dict(workflow_id)

    # A temporary directory removes both scratch files on every exit path,
    # including a failure while the workflow JSON is being written.
    with tempfile.TemporaryDirectory() as scratch_dir:
        wf_path = os.path.join(scratch_dir, "workflow.ga")
        tool_path = os.path.join(scratch_dir, "tools.yml")

        with open(wf_path, "w") as wf_file:
            json.dump(workflow_dict, wf_file)

        # Use ephemeris to generate the tool list from the GA workflow
        generate_tool_list_from_ga_workflow_files.generate_tool_list_from_workflow(
            [wf_path], "Tools from TRS workflow", tool_path
        )

        # Load the generated tool list
        with open(tool_path) as f:
            tools_data = yaml.safe_load(f)

    tools_info = tools_data.get("tools", []) if tools_data else []
    _add_tool_shed_urls(tools_info)

    # Use common installation logic
    return _install_shed_repos_from_tools_info(
        tools_info,
        admin_gi,
        ignore_dependency_problems,
        install_tool_dependencies,
        install_resolver_dependencies,
        install_repository_dependencies,
        install_most_recent_revision,
    )


def import_workflow(path, user_gi, from_path=False):
    """Import a workflow path to specified Galaxy instance.

    With ``from_path`` false (the default) the workflow file is loaded
    locally and uploaded as a dictionary; otherwise Galaxy is asked to import
    it from the absolute form of ``path``.
    """
    if from_path:
        return user_gi.workflows.import_workflow_from_local_path(os.path.abspath(path))
    return user_gi.workflows.import_workflow_dict(_raw_dict(path))


def _raw_dict(path):
    """Load a workflow file as a native (GA-style) workflow dictionary.

    ``.ga`` files are read as JSON; anything else is read as YAML and
    converted with gxformat2's ``python_to_workflow``.
    """
    if path.endswith(".ga"):
        with open(path) as f:
            workflow = json.load(f)
    else:
        workflow_directory = os.path.abspath(os.path.dirname(path))
        with open(path) as f:
            workflow = yaml.safe_load(f)
        workflow = python_to_workflow(workflow, workflow_directory=workflow_directory)
    return workflow


def get_tool_ids_for_workflow(wf_dict: Dict[str, Any], tool_ids: Optional[List[str]] = None) -> List[str]:
    """Collect the tool IDs used by a workflow, descending into subworkflows.

    Args:
        wf_dict: Workflow dictionary whose ``steps`` is a dict or a list.
        tool_ids: Optional accumulator; found IDs are appended to it in place
            (used for the recursive calls).

    Returns:
        Tool IDs in first-seen order with duplicates removed.
    """
    tool_ids = [] if tool_ids is None else tool_ids
    steps = wf_dict["steps"].values() if isinstance(wf_dict["steps"], dict) else wf_dict["steps"]
    for step in steps:
        # ``run`` may be an embedded workflow (dict) or a reference such as a
        # file path (str); only an embedded workflow can be inspected here.
        run = step.get("run")
        runs_embedded_workflow = isinstance(run, dict) and run.get("class") == "GalaxyWorkflow"

        if step.get("type", "tool") == "tool" and not runs_embedded_workflow:
            tool_id = step.get("tool_id")
            # Steps without a tool ID contribute nothing, rather than raising
            # KeyError or putting None into a list of strings.
            if tool_id:
                tool_ids.append(tool_id)
        elif step.get("type") == "subworkflow":  # GA SWF
            get_tool_ids_for_workflow(step["subworkflow"], tool_ids=tool_ids)
        elif runs_embedded_workflow:  # gxformat2 SWF
            get_tool_ids_for_workflow(run, tool_ids=tool_ids)
    return list(dict.fromkeys(tool_ids))


def find_tool_ids(path):
    """Return the de-duplicated tool IDs used by the workflow file at ``path``."""
    workflow = _raw_dict(path)
    return get_tool_ids_for_workflow(workflow)


WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label", "optional"])


def remote_runnable_to_workflow_id(runnable):
    """Extract the workflow ID from a remote workflow runnable's URI.

    The ID is the URI path with its leading ``/`` removed.
    """
    assert runnable.is_remote_workflow_uri
    parse_result = urlparse(runnable.uri)
    return parse_result.path[1:]


def describe_outputs(runnable, gi=None):
    """Return a list of :class:`WorkflowOutput` objects for target workflow.

    For ``gxid://`` workflow URIs the workflow is fetched through ``gi``
    (which must then be provided); otherwise it is loaded from
    ``runnable.path``.
    """
    if runnable.uri.startswith((GALAXY_WORKFLOWS_PREFIX, GALAXY_WORKFLOW_INSTANCE_PREFIX)):
        workflow_id = remote_runnable_to_workflow_id(runnable)
        assert gi is not None
        instance = runnable.uri.startswith(GALAXY_WORKFLOW_INSTANCE_PREFIX)
        workflow = get_dict_from_workflow(gi, workflow_id, instance)
    else:
        workflow = _raw_dict(runnable.path)

    outputs = []
    for order_index, step in workflow["steps"].items():
        optional = False
        if not step.get("tool_id"):
            # One of the parameter types ... need eliminate this guesswork on the Galaxy side
            # A null tool_state is treated like a missing one.
            tool_state = json.loads(step.get("tool_state") or "{}")
            optional = tool_state.get("optional", False)
        # A null workflow_outputs is treated like an empty list.
        for step_output in step.get("workflow_outputs") or []:
            outputs.append(
                WorkflowOutput(
                    int(order_index),
                    step_output["output_name"],
                    step_output["label"],
                    optional,
                )
            )
    return outputs


# Input types whose job-template entry is a plain scalar value.
_SIMPLE_PARAMETER_TYPES = frozenset(
    ["string", "text", "int", "integer", "long", "float", "double", "boolean", "color"]
)


def input_labels(workflow_path):
    """Get normalized labels for workflow artifact regardless of format."""
    steps = inputs_normalized(workflow_path=workflow_path)
    candidate_labels = [input_label(step) for step in steps]
    # Drop inputs for which no id/label could be determined.
    return [label for label in candidate_labels if label]


def required_input_steps(workflow_path):
    """Return normalized input steps that are neither optional nor defaulted.

    Raises:
        Exception: If the workflow cannot be normalized (the underlying
            error is chained as the cause).
    """
    try:
        steps = inputs_normalized(workflow_path=workflow_path)
    except Exception as e:
        raise Exception(
            "Input workflow could not be successfully normalized - try linting with planemo workflow_lint."
        ) from e
    required_steps = []
    for input_step in steps:
        if input_step.get("optional", False) or input_step.get("default") is not None:
            continue
        required_steps.append(input_step)
    return required_steps


def required_input_labels(workflow_path):
    """Return a lazy ``map`` of labels for the workflow's required inputs."""
    return map(input_label, required_input_steps(workflow_path))


def input_label(input_step):
    """Get the normalized label of a step returned from inputs_normalized.

    Prefers the step's ``id`` and falls back to its ``label``. Per the
    original author's note this relies on gxformat2 assigning every
    normalized input a stable ``id`` (including synthetic
    ``_unlabeled_input_*`` ids), so no anonymity heuristic is applied here.
    """
    return input_step.get("id") or input_step.get("label")


def output_labels(workflow_path):
    """Return the ``id`` of every normalized workflow output."""
    outputs = outputs_normalized(workflow_path=workflow_path)
    return [o["id"] for o in outputs]


def output_stubs_for_workflow(workflow_path, **kwds):
    """
    Return output labels and class.
    """
    if kwds.get("from_invocation"):
        # In this mode ``workflow_path`` is passed on as the invocation ID.
        return _job_outputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"])
    outputs = {}
    for label in output_labels(workflow_path):
        if not Labels.is_anonymous_output_label(label):
            outputs[label] = {"class": ""}
    return outputs


def job_template(workflow_path, **kwds):
    """Return a job template for specified workflow.

    A dictionary describing non-optional inputs that must be specified to
    run the workflow.
    """
    if kwds.get("from_invocation"):
        # In this mode ``workflow_path`` is passed on as the invocation ID.
        return _job_inputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"])

    template = {}
    for required_input_step in required_input_steps(workflow_path):
        i_label = input_label(required_input_step)
        input_type = required_input_step["type"]
        if input_type == "data":
            template[i_label] = {
                "class": "File",
                "path": "todo_test_data_path.ext",
            }
        elif input_type == "collection":
            template[i_label] = {
                "class": "Collection",
                "collection_type": "list",
                "elements": [
                    {
                        "class": "File",
                        "identifier": "todo_element_name",
                        "path": "todo_test_data_path.ext",
                    }
                ],
            }
        elif input_type in _SIMPLE_PARAMETER_TYPES:
            template[i_label] = "todo_param_value"
        else:
            # NOTE(review): this is a set literal, not a dict; preserved as-is.
            template[i_label] = {
                "TODO",  # Does this work yet?
            }
    return template


def _collection_elements_for_type(collection_type):
    """Generate appropriate sample elements for a collection type.

    Handles ``paired`` and ``list:paired`` specially; every other type gets
    a single-file list element.
    """
    forward_and_reverse = [
        {
            "class": "File",
            "identifier": "forward",
            "path": "todo_test_data_path_forward.ext",
        },
        {
            "class": "File",
            "identifier": "reverse",
            "path": "todo_test_data_path_reverse.ext",
        },
    ]
    if collection_type == "paired":
        return forward_and_reverse
    if collection_type == "list:paired":
        return [
            {
                "class": "Collection",
                "type": "paired",
                "identifier": "todo_element_name",
                "elements": forward_and_reverse,
            }
        ]
    # Default to list
    return [
        {
            "class": "File",
            "identifier": "todo_element_name",
            "path": "todo_test_data_path.ext",
        }
    ]


def _build_template_and_metadata_from_inputs(
    all_inputs: List[Dict[str, Any]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Build job template and metadata from normalized workflow inputs.

    Args:
        all_inputs: List of normalized input step definitions from gxformat2

    Returns:
        Tuple of (template_dict, metadata_dict), both keyed by input label.
        An input is reported as optional in the metadata when it is marked
        optional or has a non-None default.
    """
    template: Dict[str, Any] = {}
    metadata: Dict[str, Any] = {}
    for input_step in all_inputs:
        i_label = input_label(input_step)
        input_type = input_step["type"]
        default_value = input_step.get("default")
        has_default = default_value is not None
        input_format = input_step.get("format", "")
        if isinstance(input_format, list):
            input_format = ", ".join(input_format)
        collection_type = input_step.get("collection_type", "")

        # Store metadata for this input
        metadata[i_label] = {
            "type": input_type,
            "doc": input_step.get("doc", ""),
            "optional": input_step.get("optional", False) or has_default,
            "default": default_value,
            "format": input_format,
            "collection_type": collection_type,
        }

        if input_type == "data":
            template[i_label] = {
                "class": "File",
                "path": "todo_test_data_path.ext",
            }
        elif input_type == "collection":
            coll_type = collection_type or "list"
            template[i_label] = {
                "class": "Collection",
                "collection_type": coll_type,
                "elements": _collection_elements_for_type(coll_type),
            }
        elif input_type in _SIMPLE_PARAMETER_TYPES:
            # Use default value if available, otherwise use placeholder or false for booleans
            if has_default:
                template[i_label] = default_value
            elif input_type == "boolean":
                template[i_label] = False
            else:
                template[i_label] = "todo_param_value"
        else:
            # NOTE(review): this is a set literal, not a dict; preserved as-is.
            template[i_label] = {
                "TODO",  # Does this work yet?
            }
    return template, metadata


def _job_template_with_metadata_from_dict(workflow_dict: Dict[str, Any]):
    """Generate job template and metadata from a workflow dictionary.

    This handles both GA format (.ga) and gxformat2 format workflows by
    writing the dictionary to a temporary file and normalizing that file.

    Args:
        workflow_dict: Workflow definition dict

    Returns:
        Tuple of (template_dict, metadata_dict)

    Raises:
        Exception: If the workflow cannot be normalized (the underlying
            error is chained as the cause).
    """
    # Treated as GA format when flagged as such or when ``steps`` is a dict.
    is_ga_format = workflow_dict.get("a_galaxy_workflow", False) or isinstance(workflow_dict.get("steps"), dict)
    suffix = ".ga" if is_ga_format else ".gxwf.yml"

    wf_file = tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False)
    wf_path = wf_file.name
    try:
        # Writing happens inside the try so the file is removed even when
        # serialization fails.
        with wf_file:
            if is_ga_format:
                json.dump(workflow_dict, wf_file)
            else:
                yaml.dump(workflow_dict, wf_file)
        try:
            all_inputs = inputs_normalized(workflow_path=wf_path)
        except Exception as e:
            raise Exception("Input workflow could not be successfully normalized from TRS endpoint.") from e
    finally:
        if os.path.exists(wf_path):
            os.unlink(wf_path)

    return _build_template_and_metadata_from_inputs(all_inputs)


def is_trs_identifier(identifier: str) -> bool:
    """Check if the identifier is a TRS ID or TRS URI.

    Args:
        identifier: The workflow identifier to check

    Returns:
        True if it's a full Dockstore TRS URL, a ``trs://`` URI, or a short
        TRS ID (``[#]workflow/...`` or ``[#]tool/...`` containing
        ``/github.com/``); False otherwise
    """
    # Full Dockstore TRS URL or TRS URI
    if identifier.startswith((DOCKSTORE_TRS_BASE, TRS_WORKFLOWS_PREFIX)):
        return True

    # Short TRS ID format
    short_id_prefixes = ("workflow/", "tool/", "#workflow/", "#tool/")
    return identifier.startswith(short_id_prefixes) and "/github.com/" in identifier


def job_template_with_metadata(workflow_path, **kwds):
    """Return a job template with metadata for each input.

    Returns a tuple of (template_dict, metadata_dict) where metadata_dict contains
    type, doc (description), optional status, default value, and format for each input label.

    The workflow_path can be:
    - A local file path to a workflow file
    - A TRS ID (e.g., workflow/github.com/org/repo/workflow_name)
    - A TRS URI (e.g., trs://workflow/github.com/org/repo/workflow_name)
    - A full Dockstore TRS URL
    """
    if kwds.get("from_invocation"):
        # For invocation-based templates, we don't have metadata
        return _job_inputs_template_from_invocation(workflow_path, kwds["galaxy_url"], kwds["galaxy_user_key"]), {}

    # Check if this is a TRS identifier
    if is_trs_identifier(workflow_path):
        # Fetch workflow from TRS and build the template from its definition
        workflow_dict = fetch_workflow_from_trs(workflow_path)
        return _job_template_with_metadata_from_dict(workflow_dict)

    try:
        all_inputs = inputs_normalized(workflow_path=workflow_path)
    except Exception as e:
        raise Exception(
            "Input workflow could not be successfully normalized - try linting with planemo workflow_lint."
        ) from e

    return _build_template_and_metadata_from_inputs(all_inputs)


def new_workflow_associated_path(workflow_path, suffix="tests"):
    """Generate path for test or job YAML file next to workflow."""
    base, input_ext = os.path.splitext(workflow_path)
    # prefer -tests.yml but if the author uses underscores or .yaml respect that.
    sep = "_" if "_" in base and "-" not in base else "-"
    ext = "yaml" if "yaml" in input_ext else "yml"
    return base + sep + suffix + "." + ext


def rewrite_job_file(input_file, output_file, job):
    """Rewrite a job file with galaxy_ids for upload_data subcommand"""
    with open(input_file) as f:
        job_contents = yaml.safe_load(f)
    for input_name, input_definition in job_contents.items():
        if isinstance(job[input_name], dict):  # dataset or collection
            job_contents[input_name] = {"class": input_definition["class"], "galaxy_id": job[input_name]["id"]}
        # else: presumably a parameter, no need to modify
    with open(output_file, "w") as f:
        yaml.dump(job_contents, f)


def get_workflow_from_invocation_id(invocation_id, galaxy_url, galaxy_api_key):
    """Download the workflow behind an invocation to ``<name>.ga``.

    The file is written to the current directory; the returned name is the
    workflow name with runs of whitespace replaced by ``-``.
    """
    user_gi = gi(url=galaxy_url, key=galaxy_api_key)
    workflow_id = user_gi.invocations.show_invocation(invocation_id)["workflow_id"]
    workflow = get_dict_from_workflow(user_gi, workflow_id, instance=True)
    workflow_name = "-".join(workflow["name"].split())
    # ensure_ascii=False can emit non-ASCII characters, so do not depend on
    # the platform's default encoding.
    with open(f"{workflow_name}.ga", "w", encoding="utf-8") as workflow_out:
        json.dump(workflow, workflow_out, ensure_ascii=False, indent=4)
    return workflow_name


def _elements_to_test_def(
    elements: List[Dict[str, Any]],
    test_data_base_path: str,
    download_function: Callable,
    definition_style: str = "input",
):
    """Convert collection elements into a test definition, downloading datasets.

    With ``definition_style="input"`` every element is described and a list
    is returned. Any other style is treated as an output definition: only the
    first element (at each nesting level) is described and a dict keyed by
    element identifier is returned. Each described dataset is downloaded to
    ``<test_data_base_path>_<identifier>.<ext>`` via ``download_function``.
    """
    as_input = definition_style == "input"
    if not as_input:
        # Output tests only assert on the first element. The original compared
        # against "output" while its caller passed "outputs", so this
        # truncation never ran.
        elements = elements[:1]

    element_test_def = []
    output_element_test_def = {}
    for element in elements:
        identifier = element["element_identifier"]
        if element["element_type"] == "dataset_collection":
            nested_elements = _elements_to_test_def(
                element["object"]["elements"],
                test_data_base_path,
                download_function,
                definition_style=definition_style,
            )
            if as_input:
                element_test_def.append(
                    {
                        "class": "Collection",
                        "type": element["object"]["collection_type"],
                        "identifier": identifier,
                        "elements": nested_elements,
                    }
                )
            else:
                output_element_test_def[identifier] = {"elements": nested_elements}
        elif element["element_type"] == "hda":
            ext = element["object"]["file_ext"]
            path = f"{test_data_base_path}_{identifier}.{ext}"
            download_function(
                element["object"]["id"],
                use_default_filename=False,
                file_path=path,
            )
            if as_input:
                element_test_def.append(
                    {
                        "class": "File",
                        "identifier": identifier,
                        "path": path,
                    }
                )
            else:
                output_element_test_def[identifier] = {"path": path}
    return element_test_def if as_input else output_element_test_def


def _job_inputs_template_from_invocation(invocation_id, galaxy_url, galaxy_api_key):
    """Build a job template from an invocation's inputs, downloading its data.

    Datasets and collection elements are downloaded under ``test-data/``;
    input step parameters are copied into the template by name.
    """
    user_gi = gi(url=galaxy_url, key=galaxy_api_key)
    invocation = user_gi.invocations.show_invocation(invocation_id)
    template = {}
    for input_step in invocation["inputs"].values():
        if input_step["src"] == "hda":
            ext = user_gi.datasets.show_dataset(input_step["id"])["extension"]
            user_gi.datasets.download_dataset(
                input_step["id"], use_default_filename=False, file_path=f"test-data/{input_step['label']}.{ext}"
            )
            template[input_step["label"]] = {
                "class": "File",
                "path": f"test-data/{input_step['label']}.{ext}",
                "filetype": ext,
            }
        elif input_step["src"] == "hdca":
            collection = user_gi.dataset_collections.show_dataset_collection(input_step["id"])
            test_def = {
                "class": "Collection",
                "collection_type": collection["collection_type"],
                "elements": _elements_to_test_def(
                    collection["elements"],
                    test_data_base_path=f"test-data/{input_step['label']}",
                    download_function=user_gi.datasets.download_dataset,
                ),
            }
            template[input_step["label"]] = test_def
    for param, param_step in invocation["input_step_parameters"].items():
        template[param] = param_step["parameter_value"]
    return template


def _job_outputs_template_from_invocation(invocation_id, galaxy_url, galaxy_api_key):
    """Build output test stubs from an invocation, downloading its outputs.

    Dataset outputs are downloaded to ``test-data/<label>.<ext>``; collection
    outputs get ``element_tests`` built from their elements.
    """
    user_gi = gi(url=galaxy_url, key=galaxy_api_key)
    invocation = user_gi.invocations.show_invocation(invocation_id)
    outputs = {}
    for label, output in invocation["outputs"].items():
        ext = user_gi.datasets.show_dataset(output["id"])["extension"]
        user_gi.datasets.download_dataset(
            output["id"], use_default_filename=False, file_path=f"test-data/{label}.{ext}"
        )
        outputs[label] = {"path": f"test-data/{label}.{ext}"}
    for label, output in invocation["output_collections"].items():
        collection = user_gi.dataset_collections.show_dataset_collection(output["id"])
        element_tests = _elements_to_test_def(
            collection["elements"],
            test_data_base_path=f"test-data/{label}",
            download_function=user_gi.datasets.download_dataset,
            definition_style="output",
        )
        outputs[label] = {"element_tests": element_tests}
    return outputs


__all__ = (
    "describe_outputs",
    "DOCKSTORE_TRS_BASE",
    "fetch_workflow_from_trs",
    "import_workflow",
    "import_workflow_from_trs",
    "is_trs_identifier",
    "TRS_WORKFLOWS_PREFIX",
)