#!/usr/bin/env python
#
# This software is distributed with the MIT license:
# https://github.com/gkiar/clowdr/blob/master/LICENSE
#
# clowdr/driver.py
# Created by Greg Kiar on 2018-02-28.
# Email: gkiar@mcin.ca
from argparse import ArgumentParser, RawTextHelpFormatter
import argparse
import os.path as op
import tempfile
import json
import sys
import os
from clowdr.controller import metadata, launcher, rerunner
from clowdr.task import TaskHandler
# from clowdr.server import shareapp, updateIndex
from clowdr.share import consolidate, portal
from clowdr import utils
def local(descriptor, invocation, provdir, backoff_time=36000, sweep=[],
verbose=False, workdir=None, simg=None, rerun=None, run_id=None,
task_ids=[], volumes=[], s3=None, cluster=None, jobname=None,
clusterargs=None, dev=False, groupby=1, user=False, setup=False,
bids=False, **kwargs):
"""cluster
Launches a pipeline locally through the Clowdr wrappers.
Parameters
----------
descriptor : str
Path to a boutiques descriptor for the tool to be run.
invocation : str
Path to a boutiques invocation for the tool and parameters to be run.
provdir : str
Path for storing Clowdr intermediate files and output logs.
backoff_time : int (default = 36000)
Maximum delay time before attempting resubmission of jobs that failed to
be submitted to a scheduler, in seconds.
sweep : list (default = [])
List of parameters to sweep over in the provided invocations.
verbose : bool (default = False)
        Flag toggling verbose output printing.
workdir : str (default = None)
Working directory to be used in execution, if different from provdir.
simg : str (default = None)
Path to local copy of Singularity image to be used during execution.
rerun : str (default = None)
One of "all", "select", "failed", and "incomplete," which enables
re-launching tasks from a previous execution either individually or in
commonly-desired groups.
run_id : str (default = None)
Required when using rerun, above, this specifies the experiment ID to be
re-run. This is the directory created for metadata, of the form:
year-month-day_hour-minute-second-8digitID.
task_ids : list (default = [])
If re-running with the "select" mode, a list of task IDs within the
directory specified by run_id which are to be re-run.
volumes : list (default = [])
List of volume mount-path strings, specified using the standard:
/path/on/host/:/path/in/container/
s3 : str (default = None)
Path for accessing input data on an S3 bucket. Must include s3://.
cluster : str (default = None)
Scheduler on the cluster being used. Currently only slurm is supported.
jobname : str (default = None)
Base-name for the jobs as they will appear in the scheduler.
clusterargs : str (default = None)
Comma-separated list of arguments to be provided to the cluster on job
submission. Such as: time:4:00,mem:2048,account:ABC
dev : bool (default = False)
Flag to toggle dev mode which only runs the first execution in the set.
groupby : int (default = 1)
Value which dictates the grouping of tasks. Particularly useful when
tasks are short or a cluster restricts the number of unique jobs.
user : bool (default = False)
When running with Docker, toggles whether or not the host-user's UID is
used within the container.
setup : bool (default = False)
Flag which prevents execution of tasks after the metadata task and
invocation files are generated.
bids : bool (default = False)
Flag toggling BIDS-aware metadata preparation.
**kwargs : dict
Arbitrary additional keyword arguments which may be passed.
Returns
-------
str
The path to the created directory containing Clowdr experiment metadata.
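    Examples
    --------
    A minimal sketch of launching a single invocation from Python rather than
    the CLI; the descriptor, invocation, and metadata paths below are purely
    illustrative and must exist on disk for the call to succeed.
    >>> from clowdr.driver import local
    >>> taskdir = local("tool.json", "invocation.json",  # doctest: +SKIP
    ...                 "./clowdr-metadata", verbose=True)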
"""
    # The CLI hands over an open file handle (argparse.FileType); accept
    # either a handle or a plain path string, as documented above.
    descriptor = descriptor.name if hasattr(descriptor, "name") else descriptor
tool = utils.truepath(descriptor)
if simg:
simg = utils.truepath(simg)
if verbose:
print("Consolidating metadata...")
dataloc = s3 if s3 else "localhost"
if rerun:
if not run_id:
raise SystemExit("**Error: Option --rerun requires --run_id")
if rerun == "select" and not task_ids:
raise SystemExit("**Error: Option --rerun 'select' requires "
"--task_ids")
tasks = rerunner.getTasks(provdir, run_id, rerun, descriptor,
task_ids=task_ids)
if not len(tasks):
if verbose:
print("No tasks to run.")
return 0
else:
[tasks, invocs] = metadata.consolidateTask(descriptor, invocation,
provdir, dataloc, bids=bids,
sweep=sweep, verbose=verbose)
taskdir = op.dirname(utils.truepath(tasks[0]))
try:
os.mkdir(taskdir)
except FileExistsError:
pass
os.chdir(taskdir)
if setup:
print(taskdir)
return taskdir
    # Default to the user-provided image path (None when absent) so that
    # "outp" is defined even for tools that do not declare a container.
    outp = simg
    with open(tool) as fhandle:
        container = json.load(fhandle).get("container-image")
    if container:
        if verbose:
            print("Getting container...")
        if simg is None:
            outp = utils.getContainer(taskdir, container, verbose=verbose)
if cluster:
from slurmpy import Slurm
jobname = jobname if jobname else "clowdr"
cargs = {}
if clusterargs:
for opt in clusterargs.split(","):
k, v = opt.split(":")[0], opt.split(":")[1:]
v = ":".join(v)
cargs[k] = v
job = Slurm(jobname, cargs)
script = "clowdr task {} -p {} --local"
if workdir:
script += " -w {}".format(workdir)
if volumes:
script += " ".join([" -v {}".format(vol)
for vol in volumes])
if container:
script += " --imagepath {}".format(outp)
if verbose:
script += " -V"
# Groups tasks into collections to be run together (default size = 1)
gsize = groupby if groupby else 1
taskgroups = [tasks[i:i+gsize] for i in range(0, len(tasks), gsize)]
if dev:
taskgroups = [taskgroups[0]] # Just launch the first in dev mode
if verbose:
print("Launching tasks...")
for taskgroup in taskgroups:
if verbose:
print("... Processing task(s): {}".format(", ".join(taskgroup)))
if cluster:
tmptaskgroup = " ".join(taskgroup)
func = job.run
args = [script.format(tmptaskgroup, taskdir)]
            # Submit. If submission fails, retry with Fibonacci back-off
utils.backoff(func, args, {},
backoff_time=backoff_time, **kwargs)
else:
runtask(taskgroup, provdir=taskdir, local=True, verbose=verbose,
workdir=workdir, volumes=volumes, user=user,
imagepath=outp, **kwargs)
if verbose:
print(taskdir)
return taskdir
def cloud(descriptor, invocation, provdir, s3, cloud, credentials, **kwargs):
"""cloud
    Launches a pipeline in the cloud, at scale, through Clowdr.
Parameters
----------
descriptor : str
Path to a boutiques descriptor for the tool to be run
invocation : str
Path to a boutiques invocation for the tool and parameters to be run
provdir : str
Path on S3 for storing Clowdr intermediate files and outputs
s3 : str
Path on S3 for accessing input data
cloud : str
Which endpoint to use for deployment
credentials : str
        Credentials file for AWS with access to the S3 paths above and to
        AWS Batch.
**kwargs : dict
Arbitrary keyword arguments (i.e. {'verbose': True})
Returns
-------
    tuple
        The path to the created directory containing Clowdr experiment
        metadata, and the list of job IDs for the launched tasks.
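    Examples
    --------
    A minimal sketch of a cloud deployment from Python; the descriptor,
    invocation, S3 paths, and credentials file below are purely illustrative.
    >>> from clowdr.driver import cloud
    >>> taskdir, jids = cloud("tool.json", "invocation.json",  # doctest: +SKIP
    ...                       "s3://mybucket/clowdr-metadata",
    ...                       "s3://mybucket/data", "aws", "./credentials.csv")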
"""
# TODO: scrub inputs better
    # Accept either an open file handle (from argparse) or a plain path string.
    descriptor = descriptor.name if hasattr(descriptor, "name") else descriptor
provdir = provdir.strip('/')
# Create temp dir for clowdrloc
tmploc = utils.truepath(tempfile.mkdtemp())
[tasks, invocs] = metadata.consolidateTask(descriptor, invocation, tmploc,
s3, **kwargs)
metadata.prepareForRemote(tasks, tmploc, provdir)
resource = launcher.configureResource(cloud, credentials, **kwargs)
tasks_remote = [task for task in utils.post(tmploc, provdir)
if "task-" in task]
if kwargs.get("dev"):
tasks_remote = [tasks_remote[0]] # Just launch the first in dev mode
jids = []
for task in tasks_remote:
jids += [resource.launchJob(task)]
taskdir = op.dirname(utils.truepath(tasks_remote[0]))
print(taskdir)
return taskdir, jids
def runtask(tasklist, **kwargs):
    """runtask
    Launches each Clowdr-created task.json in tasklist through a TaskHandler,
    which wraps tool execution and provenance capture for that task.
    """
    print(kwargs)
    for task in tasklist:
        handler = TaskHandler(task, **kwargs)
def share(provdir, prepare=False, host="0.0.0.0", port=8050, verbose=False,
debug=False, **kwargs):
"""share
Launches a simple web server which showcases all runs at the clowdrloc.
Parameters
----------
    provdir : str
        Path with Clowdr metadata files (returned from "local" and "cloud").
        This may also be an S3 path or a Clowdr-generated summary file.
    prepare : bool (default = False)
        Flag which consolidates metadata into a summary file and returns its
        path, rather than launching the web server.
    host : str (default = "0.0.0.0")
        Host address on which the share service is broadcast.
    port : int (default = 8050)
        Port on which the share service is broadcast.
    verbose : bool (default = False)
        Flag toggling verbose output printing.
    debug : bool (default = False)
        Flag toggling server debug messages and logging.
    **kwargs : dict
        Arbitrary keyword arguments (i.e. {'verbose': True})
    Returns
    -------
    None
        Unless "prepare" is set, in which case the path to the consolidated
        summary file is returned instead.
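    Examples
    --------
    A minimal sketch of serving a local metadata directory from Python; the
    path below is purely illustrative.
    >>> from clowdr.driver import share
    >>> share("/path/to/clowdr/provdir", port=8050)  # doctest: +SKIP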
"""
if provdir.startswith("s3://"):
# Create temp dir for clowdrloc
tmploc = utils.truepath(tempfile.mkdtemp())
utils.get(provdir, tmploc, **kwargs)
tmpdir = op.join(tmploc, utils.splitS3Path(provdir)[1])
provdir = tmpdir
if verbose:
print("Local cache of directory: {}".format(provdir))
if op.isfile(provdir):
if verbose:
print("Summary file provided - no need to generate.")
summary = provdir
with open(summary) as fhandle:
experiment_dict = json.load(fhandle)
else:
summary = op.join(provdir, 'clowdr-summary.json')
experiment_dict = consolidate.summary(provdir, summary)
if prepare:
if verbose:
print("Summary file location: {}".format(summary))
return summary
customDash = portal.CreatePortal(experiment_dict, N=100)
app = customDash.launch()
app.run_server(host=host, debug=debug, port=port)
def makeparser():
    """makeparser
    Builds the command-line argument parser for Clowdr as a CLI, rather than
    as a Python API. For information about the command-line wrapper and the
    arguments it accepts, please try running "clowdr --help".
    Returns
    -------
    ArgumentParser
        The fully-constructed parser, with one subparser per Clowdr mode.
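    Examples
    --------
    A hedged sketch of building the parser and inspecting a parsed command
    line; the descriptor and invocation paths are illustrative and must exist
    for argparse to open the descriptor.
    >>> parser = makeparser()
    >>> args = parser.parse_args(["local", "tool.json", "invocation.json",
    ...                           "./clowdr-metadata"])  # doctest: +SKIP
    >>> args.mode  # doctest: +SKIP
    'local'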
"""
# Create an outer argparser which can wrap subparsers for each function.
desc = """
Scalable deployment and provenance-rich wrapper for Boutiques tools locally,
on clusters, and in the cloud. For more information, go to our website:
https://github.com/clowdr/clowdr.
"""
parser = ArgumentParser("clowdr", description=desc,
formatter_class=RawTextHelpFormatter)
htext = """Clowdr has several distinct modes of operation:
- local: This mode allows you to develop your Clowdr execution, deploy
analyses on your local system, and deploy them on clusters.
    - cloud: This mode allows you to deploy your Clowdr execution on a cloud
resource. Currently, this only supports Amazon Web Services.
- share: This mode launches a lightweight webserver for you to explore your
executions, monitor job progress, and share your results.
- task: This mode is generally only for super-users. It is used by Clowdr
to launch your tasks and record provenance information from them
without you needing to call this option yourself. It can be useful
when debugging or re-running failed executions.
"""
subparsers = parser.add_subparsers(dest="mode",
help=htext)
# Create the subparser for local/cluster execution.
desc = ("Manages local and cluster deployment. Ideal for development, "
"testing, executing on local resources, or deployment on a "
"computing cluster environment.")
parser_loc = subparsers.add_parser("local", description=desc)
parser_loc.add_argument("descriptor", type=argparse.FileType('r'),
help="Local path to Boutiques descriptor for the "
"tool you wish to run. To learn about "
"descriptors and Boutiques, go to: "
"https://boutiques.github.io.")
parser_loc.add_argument("invocation",
help="Local path to Boutiques invocation (or "
"directory containing multiple invocations) "
"for the analysis you wish to run. To learn "
"about invocations and Boutiques, go to: "
"https://boutiques.github.io.")
parser_loc.add_argument("provdir",
help="Local directory for Clowdr provenance records"
" and other captured metadata to be stored. "
"This directory needs to exist prior to "
"running Clowdr.")
parser_loc.add_argument("--verbose", "-V", action="store_true",
help="Toggles verbose output statements.")
parser_loc.add_argument("--dev", "-d", action="store_true",
help="Launches only the first created task. This "
"is intended for development purposes.")
parser_loc.add_argument("--workdir", "-w", action="store",
help="Specifies the working directory to be used "
"by the tasks created.")
parser_loc.add_argument("--volumes", "-v", action="append",
help="Specifies any volumes to be mounted to the "
"container. This is usually related to the "
"path of any data files as specified in your "
"invocation(s).")
parser_loc.add_argument("--groupby", "-g", type=int,
help="If you wish to run tasks in batches, specify "
"the number of tasks to group here. For "
"imperfect multiples, the last group will be "
"the remainder.")
parser_loc.add_argument("--sweep", type=str, action="append",
help="If you wish to perform a parameter sweep with"
" Clowdr, you can use this flag and provide "
"Boutiques parameter ID as the argument here. "
"This requires: 1) the parameter exists in "
"the provided invocation, and 2) that field "
"contains a list of the parameter values to "
"be used (if it is ordinarily a list, this "
"means it must be a list of lists here). This"
" option does not work with directories of "
"invocations, but only single files.")
parser_loc.add_argument("--setup", action="store_true",
help="If you wish to generate metadata but not "
"launch tasks then you can use this mode.")
parser_loc.add_argument("--cluster", "-c", choices=["slurm"],
help="If you wish to submit your local tasks to a "
"scheduler, you must specify it here. "
"Currently this only supports SLURM clusters.")
parser_loc.add_argument("--clusterargs", "-a", action="store",
help="This allows users to supply arguments to the "
"cluster, such as specifying RAM or requesting"
" a certain amount of time on CPU. These are "
"provided in the form of key:value pairs, and "
"separated by commas. For example: "
"--clusterargs time:4:00,mem:2048,account:ABC")
parser_loc.add_argument("--jobname", "-n", action="store",
help="If running on a cluster, and you wish to "
"specify a unique identifier to appear in the"
"submitted tasks, you can specify it with "
"this flag.")
parser_loc.add_argument("--simg", "-s", action="store",
help="If the Boutiques descriptor summarizes a "
"tool wrapped in Singularity, and the image "
"has already been downloaded, this option "
"allows you to specify that image file.")
parser_loc.add_argument("--user", "-u", action="store_true",
help="If the Boutiques descriptor summarizes a "
"tool wrapped in Docker, toggles propagating "
"the current user within the container.")
parser_loc.add_argument("--rerun", "-R",
choices=["all", "select", "failed", "incomplete"],
help="Allows user to re-run jobs in a previous "
"execution that either failed or didn't "
"finish, etc. This requires the --run_id "
"argument to also be supplied. Four choices "
"are: 'all' to re-run all tasks, 'select' to "
"re-run specific tasks, 'failed' to re-run "
"tasks which finished with a non-zero "
"exit-code, 'incomplete' to re-run tasks "
"which have not yet indicated job completion. "
"While the descriptor and invocations will be "
"adopted from the previous executions, other "
"options such as clusterargs or volume can "
"be set to different values, if they were the "
"source of errors. Pairing the incomplete mode"
" with the --dev flag allows you to walk "
"through your dataset one group at a time.")
parser_loc.add_argument("--run_id", action="store",
help="Pairs with --rerun. This ID is the directory"
" within the supplied provdir which contains "
"execution you wish to relaunch. These IDs/"
"directories are in the form: year-month-day_"
"hour-minute-second-8digitID.")
parser_loc.add_argument("--task_ids", action="store", nargs="+",
help="Pairs with --rerun. This list of task IDs are"
" the task numbers within the directory "
"supplied with --run_id and provdir. These "
"IDs are integers greater than or equal to 0.")
parser_loc.add_argument("--s3", action="store",
help="Amazon S3 bucket and path for remote data. "
"Accepted in the format: s3://{bucket}/{path}")
parser_loc.add_argument("--bids", "-b", action="store_true",
help="Indicates that the tool being launched is a "
"BIDS app. BIDS is a data organization format"
" in neuroimaging. For more information about"
" this, go to https://bids.neuroimaging.io.")
parser_loc.set_defaults(func=local)
# Create the subparser for cloud execution.
desc = ("Manages cloud deployment. Ideal for running jobs at scale on data "
"stored in Amazon Web Services S3 buckets (or similar object "
"store).")
parser_cld = subparsers.add_parser("cloud", description=desc)
parser_cld.add_argument("descriptor", type=argparse.FileType('r'),
help="Local path to Boutiques descriptor for the "
"tool you wish to run. To learn about "
"descriptors and Boutiques, go to: "
"https://boutiques.github.io.")
parser_cld.add_argument("invocation",
help="Local path to Boutiques invocation (or "
"directory containing multiple invocations) "
"for the analysis you wish to run. To learn "
"about invocations and Boutiques, go to: "
"https://boutiques.github.io.")
parser_cld.add_argument("provdir",
help="Local directory for Clowdr provenance records"
" and other captured metadata to be stored. "
"This directory needs to exist prior to "
"running Clowdr.")
parser_cld.add_argument("s3",
help="Amazon S3 bucket and path for remote data. "
"Accepted in the format: s3://{bucket}/{path}")
parser_cld.add_argument("cloud", choices=["aws"],
help="Specifies which cloud endpoint you'd like to"
" use. Currently, only AWS is supported.")
parser_cld.add_argument("credentials",
help="Your credentials file for the resource.")
parser_cld.add_argument("--verbose", "-V", action="store_true",
help="Toggles verbose output statements.")
parser_cld.add_argument("--dev", "-d", action="store_true",
help="Launches only the first created task. This "
"is intended for development purposes.")
parser_cld.add_argument("--region", "-r", action="store",
help="The Amazon region to use for processing.")
parser_cld.add_argument("--sweep", type=str, action="append",
help="If you wish to perform a parameter sweep with"
" Clowdr, you can use this flag and provide "
"Boutiques parameter ID as the argument here. "
"This requires: 1) the parameter exists in "
"the provided invocation, and 2) that field "
"contains a list of the parameter values to "
"be used (if it is ordinarily a list, this "
"means it must be a list of lists here). This"
" option does not work with directories of "
"invocations, but only single files.")
parser_cld.add_argument("--bids", "-b", action="store_true",
help="Indicates that the tool being launched is a "
"BIDS app. BIDS is a data organization format"
" in neuroimaging. For more information about"
" this, go to https://bids.neuroimaging.io.")
parser_cld.set_defaults(func=cloud)
# Create the subparser for sharing outputs
desc = ("Launches light-weight web service for exploring, managing, and "
"sharing the outputs and provenance recorded from Clowdr "
"executed workflows.")
parser_shr = subparsers.add_parser("share")
parser_shr.add_argument("provdir",
help="Local or S3 directory where Clowdr provenance"
"records and metadata are stored. This path "
"was returned by running either clowdr cloud "
"or clowdr local. This can also be a clowdr-"
"generated summary file.")
parser_shr.add_argument("--prepare", "-p", action="store_true",
help="If provided, this prevents a server from "
"being launched after metadata is consolidated"
" into a single file, and the path to that "
"file is returned.")
parser_shr.add_argument("--host", action="store", default="0.0.0.0",
help="The host to broadcast the share service at. "
"Default is 0.0.0.0.")
parser_shr.add_argument("--port", action="store", type=int, default=8050,
help="The port to broadcast the share service at. "
"Default is 8050.")
parser_shr.add_argument("--debug", "-d", action="store_true",
help="Toggles server messages and logging. This "
"is intended for development purposes.")
parser_shr.add_argument("--verbose", "-V", action="store_true",
help="Toggles verbose output statements.")
parser_shr.set_defaults(func=share)
# Create the subparser for launching tasks
desc = ("Launches a list of tasks with provenance recording. This method "
"is what specifically wraps tool execution, is called by other "
"Clowdr modes, and can be used to re-execute or debug tasks.")
parser_task = subparsers.add_parser("task")
parser_task.add_argument("tasklist", nargs="+",
help="One or more Clowdr-created task.json files "
"summarizing the jobs to be run. These task "
"files are created by one of clowdr cloud or"
" clowdr local.")
parser_task.add_argument("--verbose", "-V", action="store_true",
help="Toggles verbose output statements.")
parser_task.add_argument("--provdir", "-p", action="store",
help="Local or directory where Clowdr provenance "
"records and metadata will be stored. This "
"is optional here because it will be stored "
"by default in a temporary location and "
"moved, unless this is specified.")
parser_task.add_argument("--local", "-l", action="store_true",
help="Flag indicator to identify whether the task"
" is being launched on a cloud or local "
"resource. This is important to ensure data "
"is transferred off clouds before shut down.")
parser_task.add_argument("--workdir", "-w", action="store",
help="Specifies the working directory to be used "
"by the tasks created.")
parser_task.add_argument("--volumes", "-v", action="append",
help="Specifies any volumes to be mounted to the "
"container. This is usually related to the "
"path of any data files as specified in your "
"invocation(s).")
parser_task.add_argument("--imagepath", action="store",
help="If the Boutiques descriptor summarizes a "
"tool wrapped in Singularity, and the image "
"has already been downloaded, this option "
"allows you to specify that image file.")
parser_task.set_defaults(func=runtask)
return parser
def main(args=None):
    """main
    Command-line entrypoint for Clowdr: parses the provided arguments (or
    sys.argv when args is None), prints help when none are given, and
    otherwise dispatches to the selected mode, returning its exit-code.
    """
parser = makeparser()
# Parse arguments
inps = parser.parse_args(args) if args is not None else parser.parse_args()
# If no args are provided, print help
if len(sys.argv) < 2 and args is None:
parser.print_help()
sys.exit()
else:
inps.func(**vars(inps))
return 0
if __name__ == "__main__":
main()