import os
import subprocess
import sys
import tarfile
import zipfile
from ftplib import all_errors as FTPErrors # tuple of exceptions
from typing import List
from urllib.error import URLError
from urllib.request import urlretrieve
from xml.etree import ElementTree
from galaxy.util import unicodify
TOOLSHED_MAP = {
"toolshed": "https://toolshed.g2.bx.psu.edu",
"testtoolshed": "https://testtoolshed.g2.bx.psu.edu",
}
[docs]
class Dependencies:
"""Base class for parsing Tool Shed dependency files."""
def __init__(
self,
dependencies_file,
repo=None,
package_factory=None,
):
if package_factory is None:
package_factory = BasePackage
self.repo = repo
self.root = ElementTree.parse(dependencies_file).getroot()
packages = []
dependencies = []
package_els = self.root.findall("package")
assert package_els is not None
for package_el in package_els:
install_els = package_el.findall("install")
readme_els = package_el.findall("readme")
if len(readme_els) > 0:
readme = readme_els[0].text
else:
readme = None
assert len(install_els) in (0, 1)
if len(install_els) == 1:
install_el = install_els[0]
package = package_factory(self, package_el, install_el, readme=readme)
packages.append(package)
else:
repository_el = package_el.find("repository")
if repository_el is None:
message = f"no repository in package el for {repo}"
raise AssertionError(message)
dependency = Dependency(self, package_el, repository_el)
dependencies.append(dependency)
self.packages = packages
self.dependencies = dependencies
[docs]
def single_package(self):
return len(self.packages) == 1
def __repr__(self):
return f"Dependencies[for_repo={self.repo}]"
[docs]
class Repo:
def __init__(self, **kwds):
for key, value in kwds.items():
setattr(self, key, value)
[docs]
def recipe_base_name(self):
owner = self.owner.replace("-", "")
name = self.name
name = name.replace("_", "").replace("-", "")
base = f"{owner}_{name}"
return base
[docs]
@staticmethod
def from_xml(elem):
tool_shed_url = elem.attrib.get("toolshed", None)
if tool_shed_url and ("testtoolshed" in tool_shed_url):
prefix = "testtoolshed"
else:
prefix = "toolshed"
prior = elem.attrib.get("prior_installation_required", False)
return Repo(
prefix=prefix,
name=elem.attrib["name"],
owner=elem.attrib["owner"],
tool_shed_url=tool_shed_url,
changeset_revision=elem.attrib.get("changeset_revision", None),
prior_installation_required=prior,
)
[docs]
@staticmethod
def from_api(prefix, repo_json):
return Repo(
prefix=prefix,
name=repo_json["name"],
owner=repo_json["owner"],
tool_shed_url=TOOLSHED_MAP[prefix],
)
[docs]
def get_file(self, path):
try:
url = f"{self.tool_shed_url}/repos/{self.owner}/{self.name}/raw-file/tip/{path}"
path, headers = urlretrieve(url)
return path
except Exception as e:
print(e)
return None
def __repr__(self):
return f"Repository[name={self.name},owner={self.owner}]"
[docs]
class Dependency:
def __init__(self, dependencies, package_el, repository_el):
self.dependencies = dependencies
self.package_el = package_el
self.repository_el = repository_el
self.repo = Repo.from_xml(repository_el)
def __repr__(self):
return (
f"Dependency[package_name={self.package_el.attrib['name']},version={self.package_el.attrib['version']},"
f"dependent_package={self.repository_el.attrib['name']}]"
)
[docs]
class BasePackage:
def __init__(self, dependencies, package_el, install_el, readme):
self.dependencies = dependencies
self.package_el = package_el
self.install_el = install_el
self.readme = readme
self.all_actions = self.get_all_actions()
self.no_arch_option = self.has_no_achitecture_install()
[docs]
def get_all_actions(self):
action_or_group = self.install_el[0]
parsed_actions = []
if action_or_group.tag == "actions":
parsed_actions.append(self.parse_actions(action_or_group))
elif action_or_group.tag == "actions_group":
actions_els = action_or_group.findall("actions")
assert actions_els is not None
for actions in actions_els:
parsed_actions.append(self.parse_actions(actions))
action_els = action_or_group.findall("action")
assert action_els is not None
for action in action_els:
for parsed_a in parsed_actions:
parsed_a.actions.append(self.parse_action(action))
return parsed_actions
[docs]
def has_no_achitecture_install(self):
all_actions = self.all_actions
if len(all_actions) < 2:
return False
else:
last_action = all_actions[-1]
return (not last_action.architecture) and (not last_action.os)
[docs]
def has_explicit_set_environments(self):
all_actions = self.all_actions
for actions in all_actions:
for action in actions.actions:
if action.explicit_variables:
return True
return False
[docs]
def has_multiple_set_environments(self):
all_actions = self.all_actions
for actions in all_actions:
count = 0
for action in actions.actions:
if action.explicit_variables:
count += 1
if count > 1:
return True
return False
[docs]
def parse_actions(self, actions):
os = actions.attrib.get("os", None)
architecture = actions.get("architecture", None)
action_els = actions.findall("action")
assert action_els is not None
parsed_actions = list(map(self.parse_action, action_els))
action_packages = []
for package in actions.findall("package"):
action_packages.append(self.parse_action_package(package))
return Actions(parsed_actions, os, architecture, action_packages)
[docs]
def parse_action_package(self, elem):
name = elem.attrib["name"]
version = elem.attrib["version"]
repo = Repo.from_xml(elem.find("repository"))
return ActionPackage(name, version, repo)
[docs]
def parse_action(self, action):
return BaseAction.from_elem(action, package=self)
def __repr__(self):
actions = self.all_actions
return (
f"Install[name={self.package_el.attrib['name']},version={self.package_el.attrib['version']},"
f"dependencies={self.dependencies},actions={actions}]"
)
[docs]
class Actions:
def __init__(self, actions, os=None, architecture=None, action_packages=[]):
self.os = os
self.architecture = architecture
self.actions = actions or []
self.action_packages = action_packages
[docs]
def first_download(self):
for action in self.actions:
if action.action_type in ["download_by_url", "download_file"]:
return action
return None
[docs]
def downloads(self):
actions = []
for action in self.actions:
if action.action_type in ["download_by_url", "download_file"]:
actions.append(action)
return actions
def __repr__(self):
platform = ""
if self.os or self.architecture:
platform = f"os={self.os},arch={self.architecture},"
return f"Actions[{platform}{map(str, self.actions)}]"
def _indent_extend(self, target, new_entries, indent=" "):
for line in new_entries:
target.append(indent + line)
[docs]
def to_bash(self):
# Use self.os.title() to match "Linux" or "Darwin" in bash where case matters:
if self.os and self.architecture:
condition = f'("{self.os.title()}" == `uname`) && ("{self.architecture}" == `arch`)'
elif self.os:
condition = f'"{self.os.title()}" == `uname`'
elif self.architecture:
condition = f'"{self.architecture}" == `arch`'
else:
condition = None
install_cmds = []
env_cmds = []
if condition:
# Conditional actions block
install_cmds = [
"#" + "-" * 60,
f"if [[ $specifc_action_done == 0 && {condition} ]]",
"then",
f' echo "Platform-specific action for os={self.os}, arch={self.architecture}"',
]
env_cmds = install_cmds[:]
# TODO - Refactor block indentation?
for action in self.actions:
i_cmds, e_cmds = action.to_bash()
self._indent_extend(install_cmds, i_cmds)
self._indent_extend(env_cmds, e_cmds)
# If we run the action, do not want to run any later actions!
install_cmds.extend([" specifc_action_done=1", "fi"])
env_cmds.extend([" specifc_action_done=1", "fi"])
else:
# Non-specific default action...
install_cmds = [
"#" + "-" * 60,
"if [[ $specifc_action_done == 0 ]]",
"then",
' echo "Non-platform-specific actions"',
]
env_cmds = install_cmds[:]
for action in self.actions:
i_cmds, e_cmds = action.to_bash()
self._indent_extend(install_cmds, i_cmds)
self._indent_extend(env_cmds, e_cmds)
install_cmds.append("fi")
env_cmds.append("fi")
return install_cmds, env_cmds
[docs]
class ActionPackage:
def __init__(self, name, version, repo):
self.name = name
self.version = version
self.repo = repo
[docs]
class BaseAction:
_keys: List[str] = []
action_type: str
def __repr__(self):
return f"Action[type={self.action_type}]"
[docs]
def same_as(self, other):
if self._keys != other._keys:
return False
else:
for key in self._keys:
if getattr(self, key) != getattr(other, key):
return False
return True
[docs]
def parse_action_repo(self, elem):
repo_elem = elem.find("repository")
repo = Repo.from_xml(repo_elem)
self.repo = repo
[docs]
def parse_package_elems(self, elem):
package_els = elem.findall("package")
packages = []
assert package_els is not None
for package_el in package_els:
packages.append(package_el.text)
self.packages = packages
[docs]
@classmethod
def from_elem(cls, elem, package):
type = elem.attrib["type"]
action_class = actions_by_type[type]
return action_class(elem)
[docs]
def to_bash(self):
"""Return lists of bash shell commands to execute this action.
This method is be implemented by each sub-class, and will
return two list of strings (for ``dep_install.sh`` and
``env.sh`` respectively).
"""
raise NotImplementedError(f"No to_bash defined for {self!r}")
def _tar_folders(filename):
with tarfile.open(filename, "r", errorlevel=0) as archive:
folders = set()
for i in archive.getmembers():
if i.isdir():
folders.add(i.name.rstrip("/"))
else:
folders.add(os.path.split(i.name)[0])
return list(folders)
def _zip_folders(filename):
archive = zipfile.ZipFile(filename, "r")
return list({i.filename.rstrip("/") for i in archive.infolist() if i.filename.endswith("/")})
def _common_prefix(folders):
common_prefix = ""
if len(folders) == 1:
common_prefix = list(folders)[0]
else:
common_prefix = os.path.commonprefix(folders)
assert not os.path.isabs(common_prefix), folders
return common_prefix
def _cache_download(url, filename, sha256sum=None):
"""Returns local path to cached copy of URL using given filename."""
cache = os.environ.get("DOWNLOAD_CACHE", "./download_cache/")
# TODO - expose this as a command line option
if not os.path.isdir(cache):
os.mkdir(cache)
local = os.path.join(cache, filename)
if not os.path.isfile(local):
# Must download it...
try:
# TODO - log this nicely...
sys.stderr.write(f"Downloading {url} to {local!r}\n")
urlretrieve(url, local)
except URLError:
# Most likely server is down, could be bad URL in XML action:
raise RuntimeError(f"Unable to download {url}")
except FTPErrors:
# Most likely server is down, could be bad URL in XML action:
raise RuntimeError(f"Unable to download {url}")
# Verifying the checksum is slow, only do this on a fresh
# download. Assume locally cached files are already OK.
if sha256sum:
# TODO - log this nicely...
sys.stderr.write(f"Verifying checksum for {filename}\n")
filehash = subprocess.check_output(["shasum", "-a", "256", local])[0:64].strip()
filehash = unicodify(filehash)
if filehash != sha256sum:
raise RuntimeError(f"Checksum failure for {local}, got {filehash!r} but wanted {sha256sum!r}")
return local
def _determine_compressed_file_folder(url, downloaded_filename, target_filename=None, sha256sum=None):
"""Determine how to decompress the file & its directory structure.
Returns a list of shell commands. Consider this example where the
folder to change to cannot be guessed from the tar-ball filename:
$ curl -o "ncbi-blast-2.2.30+-ia32-linux.tar.gz" \
"ftp://ftp.ncbi.nlm.nih.gov/blast/executables/blast+/2.2.30/ncbi-blast-2.2.30+-ia32-linux.tar.gz"
$ tar -zxvf ncbi-blast-2.2.30+-ia32-linux.tar.gz
$ cd ncbi-blast-2.2.30+
Here it would return:
['tar -zxvf ncbi-blast-2.2.30+-ia32-linux.tar.gz', 'cd ncbi-blast-2.2.30+']
If not cached, this function will download the file to the
$DOWNLOAD_CACHE folder, and then open it / decompress it in
order to find common folder prefix used. This will also verify
how to decompress the file, and the checksum if given.
"""
answer = []
local = _cache_download(url, downloaded_filename, sha256sum)
if not target_filename:
target_filename = downloaded_filename
if tarfile.is_tarfile(local):
folders = _tar_folders(local)
if target_filename.endswith((".tar.gz", ".tgz")):
answer.append(f"tar -zxvf {target_filename}")
elif target_filename.endswith(".tar.bz2"):
answer.append(f"tar -jxvf {target_filename}")
elif target_filename.endswith(".tar"):
answer.extend(f"tar -xvf {target_filename}")
else:
# Quite possibly this file doesn't need decompressing,
# but until we've tested lots of real world tool_dependencies.xml
# files I'd like to check these cases to confirm this.
raise NotImplementedError(f"How to decompress tar file {target_filename}?")
elif zipfile.is_zipfile(local):
if target_filename.endswith(".jar"):
# Do not decompress!
return answer
folders = _zip_folders(local)
answer.append(f"unzip {target_filename}")
elif target_filename.endswith(".dmg"):
# Do not decompress!
return answer
else:
# No compression? Leave as it is?
raise NotImplementedError(f"What kind of compression is {local} using?")
common_prefix = _common_prefix(folders)
if common_prefix:
answer.append(f'cd "{common_prefix}"')
return answer
def _commands_and_downloaded_file(url, target_filename=None, sha256sum=None):
# We preserve the filename from the URL in the cache.
# i.e. We do NOT use the target_filename in the cache.
# This because some Galaxy recipes normalise platform specific downloads
# to use a single target filename, which would therefore break checksums etc
# e.g. tests/data/repos/package_1/tool_dependencies.xml
downloaded_filename = os.path.split(url)[-1]
if "?" in downloaded_filename:
downloaded_filename = downloaded_filename[: downloaded_filename.index("?")]
if "#" in downloaded_filename:
downloaded_filename = downloaded_filename[: downloaded_filename.index("#")]
if not target_filename:
target_filename = downloaded_filename
# Curl is present on Mac OS X, can we assume it will be on Linux?
# Cannot assume that wget will be on Mac OS X.
answer = [
f'if [[ -f "{target_filename}" ]]',
"then",
f' echo "Reusing existing {target_filename}"',
f'elif [[ -f "$DOWNLOAD_CACHE/{downloaded_filename}" ]]',
"then",
f' echo "Reusing cached {downloaded_filename}"',
f' cp "$DOWNLOAD_CACHE/{downloaded_filename}" "{target_filename}"',
"else",
f' echo "Downloading {downloaded_filename}"',
f' curl -L -o "$DOWNLOAD_CACHE/{downloaded_filename}" "{url}"',
f' cp "$DOWNLOAD_CACHE/{downloaded_filename}" "{target_filename}"',
]
if sha256sum:
# This is inserted into the if-else for a fresh download only.
# Note double space between checksum and filename:
answer.append(f' echo "{sha256sum} {target_filename}" | shasum -a 256 -c -')
answer.append("fi")
return answer, downloaded_filename
def _commands_to_download_and_extract(url, target_filename=None, sha256sum=None):
answer, downloaded_filename = _commands_and_downloaded_file(url, target_filename, sha256sum)
# Now should we unpack the tar-ball etc?
answer.extend(_determine_compressed_file_folder(url, downloaded_filename, target_filename, sha256sum))
return answer, []
[docs]
class DownloadByUrlAction(BaseAction):
action_type = "download_by_url"
_keys = ["url"]
def __init__(self, elem):
self.url = elem.text.strip()
assert self.url
self.sha256sum = elem.attrib.get("sha256sum", None)
self.target_filename = elem.attrib.get("target_filename", None)
[docs]
def to_bash(self):
# See class DownloadByUrl in Galaxy,
# lib/tool_shed/galaxy_install/tool_dependencies/recipe/step_handler.py
return _commands_to_download_and_extract(self.url, self.target_filename, self.sha256sum)
[docs]
class DownloadFileAction(BaseAction):
action_type = "download_file"
_keys = ["url", "extract"]
def __init__(self, elem):
self.url = elem.text.strip()
self.extract = asbool(elem.attrib.get("extract", False))
self.sha256sum = elem.attrib.get("sha256sum", None)
self.target_filename = elem.attrib.get("target_filename", None)
[docs]
def to_bash(self):
if self.extract:
return _commands_to_download_and_extract(self.url, self.target_filename, self.sha256sum)
else:
commands, downloaded_file = _commands_and_downloaded_file(self.url, self.target_filename, self.sha256sum)
return commands, []
[docs]
class DownloadBinary(BaseAction):
action_type = "download_binary"
_keys = ["url_template", "target_directory"]
def __init__(self, elem):
self.url_template = elem.text
assert self.url_template
self.target_directory = elem.get("target_directory", None)
[docs]
class ShellCommandAction(BaseAction):
action_type = "shell_command"
_keys = ["command"]
def __init__(self, elem):
self.command = elem.text
[docs]
def to_bash(self):
# Galaxy would run each action from the same temp
# working directory - possible that tool_dependencies.xml
# shell_command could change $PWD so reset this:
return ["pushd . > /dev/null", self.command, "popd > /dev/null"], []
[docs]
class TemplateShellCommandAction(BaseAction):
action_type = "template_command"
_keys = ["language", "command"]
def __init__(self, elem):
self.command = elem.text
self.language = elem.get("language", "cheetah").lower()
assert self.command
assert self.language == "cheetah"
[docs]
class MoveFileAction(BaseAction):
action_type = "move_file"
_keys = ["move_file"]
def __init__(self, elem):
self.source = elem.find("source").text
self.destination = elem.find("destination").text
[docs]
def to_bash(self):
return [f"mv {self.source} {self.destination}"], []
[docs]
class MoveDirectoryFilesAction(BaseAction):
action_type = "move_directory_files"
_keys = ["source_directory", "destination_directory"]
def __init__(self, elem):
source_directory = elem.find("source_directory").text
destination_directory = elem.find("destination_directory").text
self.source_directory = source_directory
self.destination_directory = destination_directory
[docs]
def to_bash(self):
return [f"mv {self.source_directory}/* {self.destination_directory}/"], []
[docs]
class SetEnvironmentAction(BaseAction):
action_type = "set_environment"
_keys = ["variables"]
def __init__(self, elem):
variables = []
var_els = elem.findall("environment_variable")
assert var_els is not None
for ev_elem in var_els:
var = SetVariable(ev_elem)
variables.append(var)
self.variables = variables
assert self.variables
[docs]
def to_bash(self):
answer = []
for var in self.variables:
# Expand $INSTALL_DIR here?
if var.action == "set_to":
answer.append(f"export {var.name}={var.raw_value}")
elif var.action == "prepend_to":
answer.append(f"export {var.name}={var.raw_value}:${var.name}")
elif var.action == "append_to":
answer.append(f"export {var.name}=${var.name}:{var.raw_value}")
else:
raise ValueError(f"Undefined environment variable action {var.action!r}")
return answer, answer # Actions needed in env.sh here!
[docs]
class ChmodAction(BaseAction):
action_type = "chmod"
_keys = ["mods"]
def __init__(self, elem):
mods = []
file_els = elem.findall("file")
assert file_els is not None
for mod_elem in file_els:
mod = {}
mod["mode"] = mod_elem.attrib["mode"]
mod["target"] = mod_elem.text
mods.append(mod)
self.mods = mods
assert self.mods
[docs]
def to_bash(self):
return [f"chmod {m['mode']} {m['target']}" for m in self.mods], []
[docs]
class MakeInstallAction(BaseAction):
action_type = "make_install"
_keys = [] # type: List[str]
def __init__(self, elem):
pass
[docs]
def to_bash(self):
return ["make install"], []
[docs]
class AutoconfAction(BaseAction):
action_type = "autoconf"
_keys = ["options"]
def __init__(self, elem):
self.options = elem.text
[docs]
def to_bash(self):
if self.options:
raise NotImplementedError("Options with action autoconf not implemented yet.")
return ["./configure", "make", "make install"], []
[docs]
class ChangeDirectoryAction(BaseAction):
action_type = "change_directory"
_keys = ["directory"]
def __init__(self, elem):
self.directory = elem.text
assert self.directory
[docs]
def to_bash(self):
return [f"cd {self.directory}"], []
[docs]
class MakeDirectoryAction(BaseAction):
action_type = "make_directory"
_keys = ["directory"]
def __init__(self, elem):
self.directory = elem.text
[docs]
def to_bash(self):
return [f"mkdir -p {self.directory}"], []
[docs]
class SetupPerlEnvironmentAction(BaseAction):
action_type = "setup_perl_environment"
_keys = ["repo", "packages"]
def __init__(self, elem):
self.parse_action_repo(elem)
self.parse_package_elems(elem)
[docs]
class SetupRubyEnvironmentAction(BaseAction):
action_type = "setup_ruby_environment"
_keys = ["repo", "packages"]
def __init__(self, elem):
self.parse_action_repo(elem)
self.parse_package_elems(elem)
[docs]
class SetupPythonEnvironmentAction(BaseAction):
action_type = "setup_python_environment"
_keys = ["repo", "packages"]
def __init__(self, elem):
self.parse_action_repo(elem)
self.parse_package_elems(elem)
[docs]
class SetupVirtualenvAction(BaseAction):
action_type = "setup_virtualenv"
_keys = ["use_requirements_file", "python", "requirements"]
def __init__(self, elem):
use_reqs = elem.attrib.get("use_requirements_file", "True")
self.use_requirements_file = asbool(use_reqs)
self.python = elem.get("python", "python")
self.requirements = elem.text or "requirements.txt"
[docs]
class SetupREnvironmentAction(BaseAction):
action_type = "setup_r_environment"
_keys = ["repo", "packages"]
def __init__(self, elem):
self.parse_action_repo(elem)
self.parse_package_elems(elem)
[docs]
class SetEnvironmentForInstallAction(BaseAction):
action_type = "set_environment_for_install"
def __init__(self, elem):
pass
[docs]
def to_bash(self):
# TODO - How could we resolve/check the dependencies?
return ['echo "WARNING: Assuming packages already installed!"'], []
[docs]
class SetVariable:
def __init__(self, elem):
self.action = elem.attrib["action"]
self.name = elem.attrib["name"]
self.raw_value = elem.text
truthy = frozenset(["true", "yes", "on", "y", "t", "1"])
falsy = frozenset(["false", "no", "off", "n", "f", "0"])
[docs]
def asbool(obj):
if isinstance(obj, str):
obj = obj.strip().lower()
if obj in truthy:
return True
elif obj in falsy:
return False
else:
raise ValueError(f"String is not true/false: {obj!r}")
return bool(obj)
action_classes = [
DownloadByUrlAction,
DownloadFileAction,
DownloadBinary,
ShellCommandAction,
TemplateShellCommandAction,
MoveFileAction,
MoveDirectoryFilesAction,
SetEnvironmentAction,
ChmodAction,
MakeInstallAction,
AutoconfAction,
ChangeDirectoryAction,
MakeDirectoryAction,
SetupPerlEnvironmentAction,
SetupRubyEnvironmentAction,
SetupPythonEnvironmentAction,
SetupVirtualenvAction,
SetupREnvironmentAction,
SetEnvironmentForInstallAction,
]
actions_by_type = dict(map(lambda c: (c.action_type, c), action_classes))