Source code for planemo.galaxy.invocations.simulations

"""Simulate Galaxy workflows running on a server for testing purposes."""

from collections import deque
from typing import (
    Dict,
    List,
    Optional,
)
from uuid import uuid4

import yaml

from .api import Invocation as InvocationResponse
from .api import InvocationJobsSummary
from .api import InvocationStep as InvocationStepResponse


[docs] class Ticks: after: int @property def active(self): return self.after <= 0
[docs] def tick(self) -> None: if self.active: self.tick_when_active() else: self.after -= 1
[docs] def tick_when_active(self) -> None: ...
[docs] class StateWithDuration(Ticks): def __init__(self, state: str, duration: int): self.after = 0 self.state = state self.duration = duration
[docs] def tick_when_active(self) -> None: self.duration -= 1
[docs] class HasState(Ticks): final: Optional[str] def __init__(self, after: int, states: List[StateWithDuration]): self.after = after or 0 self.states = deque(states) self.final_state: Optional[str] = None
[docs] def tick_when_active(self) -> None: if self.final_state is not None: return next_state = self.states.popleft() next_state.tick() if next_state.duration == 0 and not self.states: self.final_state = next_state.state elif next_state.duration != 0: self.states.appendleft(next_state)
# else: next state will be state during next tick @property def state(self): if self.final_state is not None: return self.final_state else: return self.states[0].state
Job = HasState
[docs] class InvocationStep(HasState): invocation: Optional["Invocation"] jobs: Optional[List[Job]] def __init__( self, jobs: List[Job], invocation: Optional["Invocation"], after: int, states: List[StateWithDuration] ): super().__init__(after, states) self.jobs = jobs self.invocation = invocation
[docs] def tick_when_active(self) -> None: super().tick_when_active() if self.jobs: for job in self.jobs: job.tick() if self.invocation: self.invocation.tick()
@property def active_jobs(self) -> List[Job]: return [j for j in (self.jobs or []) if j.active]
[docs] class Invocation(HasState): def __init__(self, steps: List[InvocationStep], after: int, states: List[StateWithDuration]): self.id = str(uuid4())[:8] self.steps = steps super().__init__(after, states)
[docs] def tick_when_active(self) -> None: super().tick_when_active() for step in self.steps: step.tick()
@property def active_steps(self) -> List[InvocationStep]: return [s for s in self.steps if s.active]
[docs] def get_invocation_by_id(self, invocation_id: str) -> Optional["Invocation"]: if self.id == invocation_id: return self for step in self.steps: step_invocation = step.invocation if step_invocation: step_subworkflow_invocation_with_id = step_invocation.get_invocation_by_id(invocation_id) if step_subworkflow_invocation_with_id is not None: return step_subworkflow_invocation_with_id return None
[docs] def get_subworkflow_invocation(self, subworkflow_invocation_id: str) -> "Invocation": for step in self.steps: if step.invocation and step.invocation.id == subworkflow_invocation_id: return step.invocation raise Exception(f"Unknown subworkflow invocation id ({subworkflow_invocation_id})")
[docs] def get_subworkflow_invocation_by_step_index(self, index: int) -> Optional["Invocation"]: return self.steps[index].invocation
[docs] def get_api_invocation(self) -> InvocationResponse: steps: List[InvocationStepResponse] = [] for step in self.active_steps: api_step: InvocationStepResponse = { "state": step.state, } if step.invocation: api_step["subworkflow_invocation_id"] = step.invocation.id steps.append(api_step) return { "id": self.id, "state": self.state, "steps": steps, }
[docs] def get_api_jobs_summary(self) -> InvocationJobsSummary: job_states = [] for step in self.active_steps: for job in step.active_jobs: api_job = { "state": job.state, } job_states.append(api_job) by_state: Dict[str, int] = {} for job_state in job_states: state = job_state["state"] if state not in by_state: by_state[state] = 0 by_state[state] += 1 return {"states": by_state}
[docs] def parse_workflow_simulation_from_string(workflow_simulation: str) -> Invocation: return parse_workflow_simulation(yaml.safe_load(workflow_simulation))
[docs] def parse_workflow_simulation(workflow_simulation: dict) -> Invocation: return parse_workflow_simulation_invocation(workflow_simulation)
[docs] def parse_workflow_simulation_job(workflow_simulation_job: dict) -> Job: states = parse_states_from(workflow_simulation_job) after = parse_after_from(workflow_simulation_job) return Job(after, states)
[docs] def parse_workflow_simulation_invocation_step(workflow_simulation_invocation_step: dict) -> InvocationStep: states = parse_states_from(workflow_simulation_invocation_step) after = parse_after_from(workflow_simulation_invocation_step) if "invocation" in workflow_simulation_invocation_step: invocation = parse_workflow_simulation_invocation(workflow_simulation_invocation_step["invocation"]) else: invocation = None jobs = None if "jobs" in workflow_simulation_invocation_step: jobs = [] for job in workflow_simulation_invocation_step.get("jobs") or []: jobs.append(parse_workflow_simulation_job(job)) return InvocationStep(jobs, invocation, after, states)
[docs] def parse_workflow_simulation_invocation(workflow_simulation_invocation: dict) -> Invocation: states = parse_states_from(workflow_simulation_invocation) after = parse_after_from(workflow_simulation_invocation) steps = [] for step in workflow_simulation_invocation.get("steps") or []: steps.append(parse_workflow_simulation_invocation_step(step)) return Invocation(steps, after, states)
[docs] def parse_after_from(simulation_object: dict) -> int: return simulation_object.get("after", 0)
[docs] def parse_states_from(simulation_object: dict) -> List[StateWithDuration]: if "states" in simulation_object: states = simulation_object["states"] states_with_duration = [] for state in states: if ":" in state: state, duration_str = state.split(":", 1) duration = int(duration_str) state_with_duration = StateWithDuration(state, duration) else: state_with_duration = StateWithDuration(state, 1) states_with_duration.append(state_with_duration) return states_with_duration else: state = simulation_object["state"] return [StateWithDuration(state, 1)]