"""This module contains :func:`build` to build tool descriptions.
This class is used by the `tool_init` command and can be used to build
Galaxy and CWL tool descriptions.
"""
import os
import re
import shlex
import shutil
import subprocess
from collections import namedtuple
from planemo import (
io,
templates,
)
from planemo.autopygen.argument_parser_conversion import (
command_from_decoy,
obtain_and_convert_parser,
xml_from_decoy,
xml_to_string,
)
REUSING_MACROS_MESSAGE = (
"Macros file macros.xml already exists, assuming " " it has relevant planemo-generated definitions."
)
DEFAULT_CWL_VERSION = "v1.0"
TOOL_TEMPLATE = """<tool id="{{id}}" name="{{name}}" version="{{version}}+galaxy0" python_template_version="3.5" profile="21.05">
{%- if description %}
<description>{{ description }}</description>
{%- endif %}
{%- if macros %}
<macros>
<import>macros.xml</import>
</macros>
<expand macro="requirements" />
{%- if version_command %}
<expand macro="version_command" />
{%- endif %}
{%- else %}
<requirements>
{%- for requirement in requirements %}
{{ requirement }}
{%- endfor %}
{%- for container in containers %}
{{ container }}
{%- endfor %}
</requirements>
{%- if version_command %}
<version_command>{{ version_command }}</version_command>
{%- endif %}
{%- endif %}
<command detect_errors="exit_code"><![CDATA[
{%- if command %}
{{ command }}
{%- endif %}
{%- if not command and auto_commands %}
TODO: executable name
{%- endif %}
{%- if auto_commands %}
{{ auto_commands }}
{%- endif %}
{%- if not (command or auto_commands) %}
TODO: Fill in command template.
{%- endif %}
]]></command>
<inputs>
{%- for input in inputs %}
{{ input }}
{%- endfor %}
{%- if auto_inputs %}
{{ auto_inputs }}
{%- endif %}
</inputs>
<outputs>
{%- for output in outputs %}
{{ output }}
{%- endfor %}
</outputs>
{%- if tests %}
<tests>
{%- for test in tests %}
<test>
{%- for param in test.params %}
<param name="{{ param[0]}}" value="{{ param[1] }}"/>
{%- endfor %}
{%- for output in test.outputs %}
<output name="{{ output[0] }}" file="{{ output[1] }}"/>
{%- endfor %}
</test>
{%- endfor %}
</tests>
{%- endif %}
<help><![CDATA[
{%- if help %}
{{ help }}
{%- endif %}
{%- if auto_help %}
{{ auto_help }}
{%- endif %}
{%- if not (help or auto_help) %}
TODO: Fill in help.
{%- endif %}
]]></help>
{%- if macros %}
<expand macro="citations" />
{%- else %}
{%- if doi or bibtex_citations %}
<citations>
{%- for single_doi in doi %}
<citation type="doi">{{ single_doi }}</citation>
{%- endfor %}
{%- for bibtex_citation in bibtex_citations %}
<citation type="bibtex">{{ bibtex_citation }}</citation>
{%- endfor %}
</citations>
{%- endif %}
{%- endif %}
</tool>
"""
MACROS_TEMPLATE = """<macros>
<xml name="requirements">
<requirements>
{%- for requirement in requirements %}
{{ requirement }}
{%- endfor %}
<yield/>
{%- for container in containers %}
{{ container }}
{%- endfor %}
</requirements>
</xml>
<xml name="citations">
<citations>
{%- for single_doi in doi %}
<citation type="doi">{{ single_doi }}</citation>
{%- endfor %}
{%- for bibtex_citation in bibtex_citations %}
<citation type="bibtex">{{ bibtex_citation }}</citation>
{%- endfor %}
<yield />
</citations>
</xml>
{%- if version_command %}
<xml name="version_command">
<version_command>{{ version_command }}</version_command>
</xml>
{%- endif %}
</macros>
"""
CWL_TEMPLATE = """#!/usr/bin/env cwl-runner
cwlVersion: '{{cwl_version}}'
class: CommandLineTool
id: "{{id}}"
label: "{{label}}"
{%- if containers or requirements %}
hints:
{%- for container in containers %}
DockerRequirement:
dockerPull: {{ container.image_id }}
{%- endfor %}
{%- if requirements %}
SoftwareRequirement:
packages:
{%- for requirement in requirements %}
- package: {{ requirement.name }}
{%- if requirement.version %}
version:
- "{{ requirement.version }}"
{%- else %}
version: []
{%- endif %}
{%- endfor %}
{%- endif %}
{%- endif %}
{%- if inputs or outputs %}
inputs:
{%- for input in inputs %}
{{ input.id }}:
type: {{ input.type }}
doc: |
TODO
inputBinding:
position: {{ input.position }}
{%- if input.prefix %}
prefix: "{{input.prefix.prefix}}"
{%- if not input.prefix.separated %}
separate: false
{%- endif %}
{%- endif %}
{%- endfor %}
{%- for output in outputs %}
{%- if output.require_filename %}
{{ output.id }}:
type: string
doc: |
Filename for output {{ output.id }}
inputBinding:
position: {{ output.position }}
{%- if output.prefix %}
prefix: "{{output.prefix.prefix}}"
{%- if not output.prefix.separated %}
separate: false
{%- endif %}
{%- endif %}
{%- endif %}
{%- endfor %}
{%- else %}
inputs: [] # TODO
{%- endif %}
{%- if outputs %}
outputs:
{%- for output in outputs %}
{{ output.id }}:
type: File
outputBinding:
glob: {{ output.glob }}
{%- endfor %}
{%- else %}
outputs: [] # TODO
{%- endif %}
{%- if base_command %}
baseCommand:
{%- for base_command_part in base_command %}
- "{{ base_command_part}}"
{%- endfor %}
{%- else %}
baseCommand: []
{%- endif %}
{%- if arguments %}
arguments:
{%- for argument in arguments %}
- valueFrom: "{{ argument.value }}"
position: {{ argument.position }}
{%- if argument.prefix %}
prefix: "{{argument.prefix.prefix}}"
{%- if not argument.prefix.separated %}
separate: false
{%- endif %}
{%- endif %}
{%- endfor %}
{%- else %}
arguments: []
{%- endif %}
{%- if stdout %}
stdout: {{ stdout }}
{%- endif %}
doc: |
{%- if help %}
{{ help|indent(2) }}
{%- else %}
TODO: Fill in description.
{%- endif %}
"""
CWL_TEST_TEMPLATE = """
- doc: test generated from example command
job: {{ job_filename }}
{%- if outputs %}
outputs:
{%- for output in outputs %}
{{ output.id }}:
path: test-data/{{ output.example_value }}
{%- endfor %}
{%- else %}
outputs: TODO
{%- endif %}
"""
CWL_JOB_TEMPLATE = """
{%- if inputs %}
{%- for input in inputs %}
{%- if input.type == "File" %}
{{ input.id }}:
class: File
path: test-data/{{ input.example_value }}
{%- else %}
{{ input.id }}: {{ input.example_value }}
{%- endif %}
{%- endfor %}
{%- else %}
# TODO: Specify job input.
{}
{%- endif %}
"""
[docs]def build(**kwds):
"""Build up a :func:`ToolDescription` from supplid arguments."""
if kwds.get("cwl"):
builder = _build_cwl
else:
builder = _build_galaxy
return builder(**kwds)
def _build_cwl(**kwds):
_handle_help(kwds)
_handle_requirements(kwds)
assert len(kwds["containers"]) <= 1, kwds
command_io = CommandIO(**kwds)
render_kwds = {
"cwl_version": DEFAULT_CWL_VERSION,
"help": kwds.get("help", ""),
"containers": kwds.get("containers", []),
"requirements": kwds.get("requirements", []),
"id": kwds.get("id"),
"label": kwds.get("name"),
}
render_kwds.update(command_io.cwl_properties())
contents = _render(render_kwds, template_str=CWL_TEMPLATE)
tool_files = []
test_files = []
if kwds["test_case"]:
sep = "-" if "-" in kwds.get("id") else "_"
tests_path = f"{kwds.get('id')}{sep}tests.yml"
job_path = f"{kwds.get('id')}{sep}job.yml"
render_kwds["job_filename"] = job_path
test_contents = _render(render_kwds, template_str=CWL_TEST_TEMPLATE)
job_contents = _render(render_kwds, template_str=CWL_JOB_TEMPLATE)
tool_files.append(ToolFile(tests_path, test_contents, "test"))
tool_files.append(ToolFile(job_path, job_contents, "job"))
for cwl_input in render_kwds["inputs"] or []:
if cwl_input.type == "File" and cwl_input.example_value:
test_files.append(cwl_input.example_value)
for cwl_output in render_kwds["outputs"] or []:
if cwl_output.example_value:
test_files.append(cwl_output.example_value)
return ToolDescription(contents, tool_files=tool_files, test_files=test_files)
def _build_galaxy(**kwds):
# Test case to build up from supplied inputs and outputs, ultimately
# ignored unless kwds["test_case"] is truthy.
_handle_help(kwds)
# process raw cite urls
cite_urls = kwds.get("cite_url", [])
del kwds["cite_url"]
citations = list(map(UrlCitation, cite_urls))
kwds["bibtex_citations"] = citations
# handle requirements and containers
_handle_requirements(kwds)
command_io = CommandIO(**kwds)
kwds["inputs"] = command_io.inputs
kwds["outputs"] = command_io.outputs
kwds["command"] = command_io.cheetah_template
kwds["auto_inputs"] = command_io.auto_inputs
kwds["auto_commands"] = command_io.auto_commands
kwds["version_command"] = command_io.version_command
test_case = command_io.test_case()
# finally wrap up tests
tests, test_files = _handle_tests(kwds, test_case)
kwds["tests"] = tests
# Render tool content from template.
contents = _render(kwds)
tool_files = []
append_macro_file(tool_files, kwds)
return ToolDescription(contents, tool_files=tool_files, test_files=test_files)
def append_macro_file(tool_files, kwds):
macro_contents = None
if kwds["macros"]:
macro_contents = _render(kwds, MACROS_TEMPLATE)
macros_file = "macros.xml"
if not os.path.exists(macros_file):
tool_files.append(ToolFile(macros_file, macro_contents, "macros"))
io.info(REUSING_MACROS_MESSAGE)
class CommandIO:
def __init__(self, **kwds):
command = _find_command(kwds)
cheetah_template = command
# process raw inputs
inputs = kwds.pop("input", [])
inputs = list(map(Input, inputs or []))
# alternatively process example inputs
example_inputs = kwds.pop("example_input", [])
for i, input_file in enumerate(example_inputs or []):
name = f"input{i + 1}"
inputs.append(Input(input_file, name=name, example=True))
cheetah_template = _replace_file_in_command(cheetah_template, input_file, name)
auto_inputs = None
auto_commands = None
auto_help = None
version_command = None
parser_path = kwds.get("autopygen", None)
if parser_path is not None:
parser = obtain_and_convert_parser(parser_path)
if parser is not None:
data_inputs = dict()
reserved_names = set()
name_map = dict()
section_map = dict()
generated_inputs, _, version_command_param = xml_from_decoy(
parser, data_inputs, reserved_names, name_map, section_map
)
auto_inputs = xml_to_string(generated_inputs, 8)
# TODO make them useful auto_outputs = xml_to_string(generated_outputs, 8)
auto_commands = command_from_decoy(
parser, data_inputs, reserved_names, name_map, section_map, skip_default_namespace=True
)
if version_command_param:
version_command = f"[TODO exec name] {version_command_param.argument}"
auto_help = parser.format_help()
# handle raw outputs (from_work_dir ones) as well as named_outputs
outputs = kwds.pop("output", [])
outputs = list(map(Output, outputs or []))
named_outputs = kwds.pop("named_output", [])
for named_output in named_outputs or []:
outputs.append(Output(name=named_output, example=False))
# handle example outputs
example_outputs = kwds.pop("example_output", [])
for i, output_file in enumerate(example_outputs or []):
name = f"output{i + 1}"
from_path = output_file
use_from_path = True
if output_file in cheetah_template:
# Actually found the file in the command, assume it can
# be specified directly and skip from_work_dir.
use_from_path = False
output = Output(name=name, from_path=from_path, use_from_path=use_from_path, example=True)
outputs.append(output)
cheetah_template = _replace_file_in_command(cheetah_template, output_file, output.name)
self.inputs = inputs
self.outputs = outputs
self.command = command
self.auto_inputs = auto_inputs
self.auto_commands = auto_commands
self.auto_help = auto_help
self.version_command = version_command
self.cheetah_template = cheetah_template
def example_input_names(self):
for input in self.inputs:
if input.example:
yield input.input_description
def example_output_names(self):
for output in self.outputs:
if output.example:
yield output.example_path
def cwl_lex_list(self):
if not self.command:
return []
command_parts = shlex.split(self.command)
parse_list = []
input_count = 0
output_count = 0
index = 0
prefixed_parts = []
while index < len(command_parts):
value = command_parts[index]
eq_split = value.split("=")
prefix = None
if not _looks_like_start_of_prefix(index, command_parts):
index += 1
elif len(eq_split) == 2:
prefix = Prefix(eq_split[0] + "=", False)
value = eq_split[1]
index += 1
else:
prefix = Prefix(value, True)
value = command_parts[index + 1]
index += 2
prefixed_parts.append((prefix, value))
for position, (prefix, value) in enumerate(prefixed_parts):
if value in self.example_input_names():
input_count += 1
input = _CwlInput(
f"input{input_count}",
position,
prefix,
value,
)
parse_list.append(input)
elif value in self.example_output_names():
output_count += 1
output = _CwlOutput(
f"output{output_count}",
position,
prefix,
value,
)
parse_list.append(output)
elif prefix:
param_id = prefix.prefix.lower().rstrip("=")
type_ = param_type(value)
input = _CwlInput(
param_id,
position,
prefix,
value,
type_=type_,
)
parse_list.append(input)
else:
part = _CwlCommandPart(value, position, prefix)
parse_list.append(part)
return parse_list
def cwl_properties(self):
base_command = []
arguments = []
inputs = []
outputs = []
lex_list = self.cwl_lex_list()
index = 0
while index < len(lex_list):
token = lex_list[index]
if isinstance(token, _CwlCommandPart):
base_command.append(token.value)
else:
break
index += 1
while index < len(lex_list):
token = lex_list[index]
if token.is_token(">"):
break
token.position = index - len(base_command) + 1
if isinstance(token, _CwlCommandPart):
arguments.append(token)
elif isinstance(token, _CwlInput):
inputs.append(token)
elif isinstance(token, _CwlOutput):
token.glob = f"$(inputs.{token.id})"
outputs.append(token)
index += 1
stdout = None
if index < len(lex_list):
token = lex_list[index]
if token.is_token(">") and (index + 1) < len(lex_list):
output_token = lex_list[index + 1]
if not isinstance(output_token, _CwlOutput):
output_token = _CwlOutput("std_out", None)
output_token.glob = "out"
output_token.require_filename = False
outputs.append(output_token)
stdout = "out"
index += 2
else:
io.warn("Example command too complex, you will need to build it up manually.")
return {
"inputs": inputs,
"outputs": outputs,
"arguments": arguments,
"base_command": base_command,
"stdout": stdout,
}
def test_case(self):
test_case = TestCase()
for input in self.inputs:
if input.example:
test_case.params.append((input.name, input.input_description))
for output in self.outputs:
if output.example:
test_case.outputs.append((output.name, output.example_path))
return test_case
def _looks_like_start_of_prefix(index, parts):
value = parts[index]
if len(value.split("=")) == 2:
return True
if index + 1 == len(parts):
return False
next_value = parts[index + 1]
next_value_is_not_start = (len(value.split("=")) != 2) and next_value[0] not in ["-", ">", "<", "|"]
return value.startswith("-") and next_value_is_not_start
Prefix = namedtuple("Prefix", ["prefix", "separated"])
class _CwlCommandPart:
def __init__(self, value, position, prefix):
self.value = value
self.position = position
self.prefix = prefix
def is_token(self, value):
return self.value == value
class _CwlInput:
def __init__(self, id, position, prefix, example_value, type_="File"):
self.id = id
self.position = position
self.prefix = prefix
self.example_value = example_value
self.type = type_
def is_token(self, value):
return False
class _CwlOutput:
def __init__(self, id, position, prefix, example_value):
self.id = id
self.position = position
self.prefix = prefix
self.glob = None
self.example_value = example_value
self.require_filename = True
def is_token(self, value):
return False
def _render(kwds, template_str=TOOL_TEMPLATE):
"""Render template variables to generate the final tool."""
return templates.render(template_str, **kwds)
def _replace_file_in_command(command, specified_file, name):
"""Replace example file with cheetah variable name.
Be sure to single quote the name.
"""
# TODO: check if the supplied variant was single quoted already.
if f'"{specified_file}"' in command:
# Sample command already wrapped filename in double quotes
command = command.replace(f'"{specified_file}"', f"'${name}'")
elif (f" {specified_file} ") in (" " + command + " "):
# In case of spaces, best to wrap filename in double quotes
command = command.replace(specified_file, f"'${name}'")
else:
command = command.replace(specified_file, f"${name}")
return command
def _handle_help(kwds):
"""Convert supplied help parameters into a help variable for template.
If help_text is supplied, use as is. If help is specified from a command,
run the command and use that help text.
"""
help_text = kwds.get("help_text")
if not help_text:
help_from_command = kwds.get("help_from_command")
if help_from_command:
p = subprocess.Popen(
help_from_command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
)
help_text = p.communicate()[0]
del kwds["help_text"]
del kwds["help_from_command"]
kwds["help"] = help_text
def _handle_tests(kwds, test_case):
"""Build tool test abstractions.
Given state built up from handling rest of arguments (test_case) and
supplied kwds - build tests for template and corresponding test files.
"""
test_files = []
if kwds["test_case"]:
tests = [test_case]
test_files.extend(map(lambda x: x[1], test_case.params))
test_files.extend(map(lambda x: x[1], test_case.outputs))
else:
tests = []
return tests, test_files
def _handle_requirements(kwds):
"""Build tool requirement abstractions.
Convert requirements and containers specified from the command-line
into abstract format for consumption by the template.
"""
requirements = kwds["requirement"]
del kwds["requirement"]
requirements = list(map(Requirement, requirements or []))
container = kwds["container"]
del kwds["container"]
containers = list(map(Container, container or []))
kwds["requirements"] = requirements
kwds["containers"] = containers
def _find_command(kwds):
"""Find base command from supplied arguments or just return `None`.
If no such command was supplied (template will just replace this
with a TODO item).
"""
command = kwds.get("command")
if not command:
command = kwds.get("example_command", None)
if command:
del kwds["example_command"]
return command
class UrlCitation:
"""Describe citation for tool."""
def __init__(self, url):
self.url = url
def __str__(self):
if "github.com" in self.url:
return self._github_str()
else:
return self._url_str()
def _github_str(self):
url = self.url
title = url.split("/")[-1]
return f"""
@misc{{github{title},
author = {{LastTODO, FirstTODO}},
year = {{TODO}},
title = {{{title}}},
publisher = {{GitHub}},
journal = {{GitHub repository}},
url = {{{url}}},
}}"""
def _url_str(self):
url = self.url
return f"""
@misc{{renameTODO,
author = {{LastTODO, FirstTODO}},
year = {{TODO}},
title = {{TODO}},
url = {{{url}}},
}}"""
class ToolFile:
def __init__(self, filename, contents, description):
self.filename = filename
self.contents = contents
self.description = description
class Input:
def __init__(self, input_description, name=None, example=False):
parts = input_description.split(".")
name = name or parts[0]
if len(parts) > 0:
datatype = ".".join(parts[1:])
else:
datatype = "data"
self.input_description = input_description
self.example = example
self.name = name
self.datatype = datatype
def __str__(self):
self.datatype = self.datatype.split(".")[-1]
return f'<param type="data" name="{self.name}" format="{self.datatype}" />'
class Output:
def __init__(self, from_path=None, name=None, use_from_path=False, example=False):
if from_path:
parts = from_path.split(".")
name = name or parts[0]
if len(parts) > 1:
datatype = ".".join(parts[1:])
else:
datatype = "data"
else:
name = name
datatype = "data"
self.name = name
self.datatype = datatype
if use_from_path:
self.from_path = from_path
else:
self.from_path = None
self.example = example
if example:
self.example_path = from_path
def __str__(self):
if self.from_path:
return self._from_path_str()
else:
return self._named_str()
def _from_path_str(self):
return f'<data name="{self.name}" format="{self.datatype}" from_work_dir="{self.from_path}" />'
def _named_str(self):
return f'<data name="{self.name}" format="{self.datatype}" />'
class Requirement:
def __init__(self, requirement):
parts = requirement.split("@", 1)
if len(parts) > 1:
name = parts[0]
version = "@".join(parts[1:])
else:
name = parts[0]
version = None
self.name = name
self.version = version
def __str__(self):
if self.version is not None:
attrs = f' version="{self.version}"'
else:
attrs = ""
return f'<requirement type="package"{attrs}>{self.name}</requirement>'
def param_type(value):
if re.match(r"^\d+$", value):
return "int"
elif re.match(r"^\d+?\.\d+?$", value):
return "float"
else:
return "string"
class Container:
def __init__(self, image_id):
self.type = "docker"
self.image_id = image_id
def __str__(self):
return f'<container type="{self.type}">{self.image_id}</container>'
class TestCase:
def __init__(self):
self.params = []
self.outputs = []
__all__ = (
"build",
"ToolDescription",
"write_tool_description",
)