Source code for planemo.cwl.toil

import json
import tempfile

    from toil.cwl import cwltoil
except ImportError:
    cwltoil = None

from planemo.deps import ensure_dependency_resolvers_conf_configured
from import (
from planemo.runnable import ErrorRunResponse
from .run import (

TOIL_REQUIRED_MESSAGE = "This functionality requires Toil, please install with 'pip install toil'"

[docs]def run_toil(ctx, path, job_path, **kwds): """Translate planemo kwds to cwltool kwds and run cwltool main function.""" _ensure_toil_available() args = [] if not ctx.verbose: args.append("--quiet") output_directory = kwds.get("output_directory", None) if output_directory: args.append("--outdir") args.append(output_directory) if kwds.get("no_container", False): args.append("--no-container") ensure_dependency_resolvers_conf_configured(ctx, kwds) args.append("--beta-dependency-resolvers-configuration") args.append(kwds["dependency_resolvers_config_file"]) if kwds.get("mulled_containers"): args.append("--beta-use-biocontainers") if kwds.get("non_strict_cwl", False): args.append("--non-strict") args.extend([path, job_path]) ctx.vlog("Calling cwltoil with arguments %s" % args) with tempfile.NamedTemporaryFile("w") as tmp_stdout: # cwltool passes sys.stderr to subprocess.Popen - ensure it has # and actual fileno. with real_io(): ret_code = cwltoil.main(args, stdout=tmp_stdout) tmp_stdout.flush() with open( as stdout_f: try: result = json.load(stdout_f) except ValueError: message = JSON_PARSE_ERROR_MESSAGE % ( open(,, ) error(message) raise Exception(message) if ret_code != 0: return ErrorRunResponse("Error running Toil") outputs = result return CwlToolRunResponse( "", outputs=outputs, )
def _ensure_toil_available(): if cwltoil is None: raise Exception(TOIL_REQUIRED_MESSAGE)