Source code for planemo.database.postgres

"""Module describes a :class:`DatabaseSource` for local postgres databases."""

import subprocess

from galaxy.util import unicodify

from planemo.io import communicate
from .interface import DatabaseSource


class ExecutesPostgresSqlMixin:
    def list_databases(self):
        """Use `psql --list` to generate a list of identifiers."""
        command_builder = self._psql_command_builder("--list")
        stdout = unicodify(self._communicate(command_builder))
        output_lines = stdout.splitlines()
        identifiers = []
        for line in output_lines:
            identifiers.append(line.split("|")[0].strip())
        return [i for i in identifiers if i]

    def create_database(self, identifier):
        """Use `psql -c "create database"` to create a database."""
        sql = "create database %s;" % identifier
        self._run_sql_command(sql)

    def delete_database(self, identifier):
        """Use `psql -c "drop database"` to delete a database."""
        sql = "drop database %s;" % identifier
        self._run_sql_command(sql)

    def _run_sql_command(self, sql):
        # communicate is just joining commands so we need to modify the
        # sql as an argument - it shouldn't do this.
        sql_arg = "%s" % sql
        command_builder = self._psql_command_builder("--command", sql_arg)
        self._communicate(command_builder)

    def _communicate(self, command_builder):
        stdout, _ = communicate(
            command_builder.command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        return stdout


[docs] class LocalPostgresDatabaseSource(ExecutesPostgresSqlMixin, DatabaseSource): """Local postgres database source managed through psql application.""" def __init__(self, **kwds): """Construct a postgres database source from planemo configuration.""" self.psql_path = kwds.get("postgres_psql_path", None) or "psql" self.database_user = kwds.get("postgres_database_user", None) self.database_host = kwds.get("postgres_database_host", None) self.database_port = kwds.get("postgres_database_port", None) self._kwds = kwds
[docs] def sqlalchemy_url(self, identifier): """Return URL or form postgresql://username:password@localhost/mydatabase.""" hostname = self.database_host or "localhost" if self.database_port: hostname += ":%s" % self.database_port return f"postgresql://{self.database_user}@{hostname}/{identifier}"
def _psql_command_builder(self, *args): command_builder = _CommandBuilder(self.psql_path) # Print only tuples so output is easier to parse command_builder.append_command("--tuples-only") # Specify connection information if self.database_user: command_builder.append_command("--username", self.database_user) if self.database_host: command_builder.append_command("--host", self.database_host) if self.database_port: command_builder.append_command("--port", self.database_port) command_builder.append_command("-P", "pager=off") command_builder.extend_command(args) return command_builder
class _CommandBuilder: def __init__(self, *args): self.command = list(args) def append_command(self, *args_or_none): args_or_none = args_or_none or [] for arg_or_none in args_or_none: if arg_or_none is not None: self.command.append(arg_or_none) def extend_command(self, args): for arg in args or []: self.append_command(arg) __all__ = ("LocalPostgresDatabaseSource",)