Source code for clowdr.controller.metadata

#!/usr/bin/env python
#
# This software is distributed with the MIT license:
# https://github.com/gkiar/clowdr/blob/master/LICENSE
#
# clowdr/controller/metadata.py
# Created by Greg Kiar on 2018-02-28.
# Email: gkiar@mcin.ca

from copy import deepcopy
import os.path as op
import datetime
import time
import string
import json
import sys
import os

from clowdr import utils


def consolidateTask(tool, invocation, clowdrloc, dataloc, bids=False,
                    sweep=None, verbose=False, **kwargs):
    """consolidateTask
    Creates Clowdr task JSON files and Boutiques invocations which summarize
    all associated metadata with the tasks being launched.

    Parameters
    ----------
    tool : str
        Path to a boutiques descriptor for the tool to be run.
    invocation : str
        Path to a boutiques invocation for the tool and parameters to be run,
        or to a directory of invocations (one task is created per entry).
    clowdrloc : str
        Path for storing Clowdr intermediate files and output logs.
    dataloc : str
        Path for accessing input data on an S3 bucket (must include s3://) or
        localhost for non-cloud hosted data.
    bids : bool (default = False)
        Flag toggling BIDS-aware metadata preparation. Only consulted when
        `invocation` is a single file.
    sweep : list (default = None)
        List of parameters to sweep over in the provided invocations. None
        and an empty list both mean that no sweep is performed.
    verbose : bool (default = False)
        Flag toggling verbose output printing. Not used by this function.
    **kwargs : dict
        Arbitrary additional keyword arguments which may be passed. Not used
        by this function.

    Returns
    -------
    tuple: (list, list)
        Paths of the task definition JSON files written to disk, and the
        associated Boutiques invocation files.
    """
    # Label for this launch: timestamp plus a string from utils.randstring
    ts = time.time()
    dt = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H-%M-%S')
    randx = utils.randstring(8)
    modif = "{}-{}".format(dt, randx)

    # Scrub inputs
    tool = utils.truepath(tool)
    invocation = utils.truepath(invocation)
    clowdrloc = utils.truepath(clowdrloc)
    dataloc = utils.truepath(dataloc)

    # Initialize task dictionary
    taskdict = {}
    with open(tool) as fhandle:
        toolname = json.load(fhandle)["name"].replace(' ', '-')
    taskloc = op.join(clowdrloc, toolname, modif)
    os.makedirs(taskloc)

    # NOTE(review): the directory created above is
    # <clowdrloc>/<toolname>/<modif>, whereas the recorded "taskloc" swaps the
    # last two components (<clowdrloc>/<modif>/<toolname>). Left untouched
    # because consumers of the task files may rely on it -- confirm intent.
    taskdict["taskloc"] = op.join(clowdrloc, modif, toolname)
    taskdict["dataloc"] = [dataloc]
    # presumably utils.get places the file in taskloc and returns a list of
    # resulting path(s) -- verify against utils.get
    taskdict["invocation"] = utils.get(invocation, taskloc)[0]
    taskdict["tool"] = utils.get(tool, taskloc)[0]

    # Case 1: User supplies directory of invocations
    if op.isdir(invocation):
        taskdicts = []
        invocations = []
        # Sorted so that task numbering is reproducible; os.listdir returns
        # entries in arbitrary order.
        for invoc in sorted(os.listdir(invocation)):
            tempdict = deepcopy(taskdict)
            tempinvo = utils.get(op.join(invocation, invoc), taskloc)
            tempdict["invocation"] = utils.truepath(tempinvo[0])
            invocations += tempinvo
            taskdicts += [tempdict]

    # Case 2: User supplies a single invocation
    else:
        # Case 2a: User is running a BIDS app
        if bids:
            taskdicts, invocations = bidsTasks(taskloc, taskdict)

        # Case 2b: User is quite simply just launching a single invocation
        else:
            taskdicts = [taskdict]
            invocations = [taskdict["invocation"]]

    # Post-case: User is performing a parameter sweep over invocations.
    # Each swept parameter multiplies the tasks produced so far.
    for sweep_param in sweep or []:
        taskdicts, invocations = sweepTasks(taskdicts, invocations,
                                            sweep_param)

    # Store task definition files to disk
    taskdictnames = []
    for idx, tdict in enumerate(taskdicts):
        taskfname = op.join(taskloc, "task-{}.json".format(idx))
        taskdictnames += [taskfname]
        with open(taskfname, 'w') as fhandle:
            fhandle.write(json.dumps(tdict, indent=4, sort_keys=True))

    return (taskdictnames, invocations)
def sweepTasks(taskdicts, invocations, sweep_param):
    """sweepTasks
    Sweeps through provided fields for creating more tasks than specified.

    For every (task, invocation) pair, the list stored under `sweep_param` in
    the invocation is expanded: one new invocation file is written per value
    (with the parameter set to that single value) next to the source
    invocation, and one copy of the task dictionary is made pointing at it.

    Parameters
    ----------
    taskdicts : list
        Task dictionaries, one per invocation.
    invocations : list
        Paths of the invocation JSON files corresponding to each task
        dictionary.
    sweep_param : str
        Parameter to be swept over in each invocation. Its value in every
        invocation must be a list.

    Returns
    -------
    tuple: (list, list)
        The expanded task dictionaries, and the paths of the invocation files
        written for them.

    Raises
    ------
    ValueError
        If `sweep_param` is missing from an invocation or is not a list.
    """
    tdicts = []
    invos = []
    for ttdict, tinvo in zip(taskdicts, invocations):
        with open(tinvo) as fhandle:
            invo = json.load(fhandle)

        sweep_vals = invo.get(sweep_param)
        if not isinstance(sweep_vals, list):
            # A missing key would otherwise fail with an opaque TypeError and
            # a string would silently be swept character by character.
            raise ValueError("Cannot sweep over '{0}' in {1}: expected a "
                             "list of values, got {2!r}".format(sweep_param,
                                                                tinvo,
                                                                sweep_vals))

        # Strip only a trailing ".json". Replacing every occurrence would
        # corrupt directory names containing ".json", and a path without the
        # extension would map onto itself so each value overwrote the source.
        stem = tinvo[:-len(".json")] if tinvo.endswith(".json") else tinvo

        for sidx, sval in enumerate(sweep_vals):
            tempdict = deepcopy(ttdict)
            invo[sweep_param] = sval

            # Long values are represented by their index to keep names short
            svalstr = sidx if len(str(sval)) > 5 else str(sval)
            invofname = "{0}_sweep-{1}-{2}.json".format(stem, sweep_param,
                                                        svalstr)

            with open(invofname, 'w') as fhandle:
                fhandle.write(json.dumps(invo, indent=4, sort_keys=True))

            tempdict["invocation"] = invofname
            invos += [invofname]
            tdicts += [tempdict]
    return (tdicts, invos)
def _writeBidsTask(clowdrloc, taskdict, invo, dataloc, label):
    """Write one subset invocation to disk and return (task dict, path)."""
    tempdict = deepcopy(taskdict)
    tempdict["dataloc"] = [dataloc]

    invofname = op.join(clowdrloc, "invocation_{}.json".format(label))
    with open(invofname, 'w') as fhandle:
        fhandle.write(json.dumps(invo, indent=4, sort_keys=True))

    tempdict["invocation"] = invofname
    return tempdict, invofname


def bidsTasks(clowdrloc, taskdict):
    """bidsTask
    Scans through BIDS app fields for creating more tasks than specified.

    The invocation referenced by the task is read, and one task is produced
    per participant, per session, or per participant/session pair listed in
    its "participant_label" / "session_label" fields. Each produced task gets
    its own invocation file, written to `clowdrloc`, restricted to that
    participant and/or session.

    Parameters
    ----------
    clowdrloc : str
        Path for storing Clowdr intermediate files and outputs
    taskdict : dict
        Dictionary of the task (pre-BIDS-ification). Must contain "dataloc"
        (a list whose first entry is the dataset location) and "invocation"
        (path to the invocation JSON file).

    Returns
    -------
    tuple: (list, list)
        The task dictionaries, and associated Boutiques invocation files.
        For group-level analyses, or when neither participants nor sessions
        are specified, the input task and invocation are returned unchanged.
    """
    dataloc = taskdict["dataloc"][0]
    invocation = taskdict["invocation"]
    with open(invocation) as fhandle:
        invo = json.load(fhandle)

    participants = invo.get("participant_label")
    sessions = invo.get("session_label")

    # Case 1: User is running BIDS group-level analysis
    # Case 2: User is running BIDS participant- or session-level analysis
    #         ... and specified neither participant(s) nor session(s)
    if invo.get("analysis_level") == "group" or not (participants or
                                                     sessions):
        return ([taskdict], [invocation])

    # Each subset is (file label, data location, invocation overrides)
    subsets = []

    # Case 3: ... specified participant(s) but not session(s)
    if participants and not sessions:
        for part in participants:
            partstr = "sub-{}".format(part)
            subsets += [(partstr,
                         op.join(dataloc, partstr),
                         {"participant_label": [part]})]

    # Case 4: ... specified participants(s) and session(s)
    elif participants and sessions:
        for part in participants:
            partstr = "sub-{}".format(part)
            for sesh in sessions:
                seshstr = "ses-{}".format(sesh)
                subsets += [("{}_{}".format(partstr, seshstr),
                             op.join(dataloc, partstr, seshstr),
                             {"participant_label": [part],
                              "session_label": [sesh]})]

    # Case 5: ... specified sessions(s) but not participant(s)
    else:
        for sesh in sessions:
            # Data location stays at the dataset root in this case
            subsets += [("ses-{}".format(sesh),
                         dataloc,
                         {"session_label": [sesh]})]

    taskdicts = []
    invos = []
    for label, subloc, overrides in subsets:
        # The same invocation dict is reused; overridden keys are replaced
        # on every iteration, all other fields are carried over untouched.
        invo.update(overrides)
        tempdict, invofname = _writeBidsTask(clowdrloc, taskdict, invo,
                                             subloc, label)
        invos += [invofname]
        taskdicts += [tempdict]
    return (taskdicts, invos)
def prepareForRemote(tasks, tmploc, clowdrloc):
    """prepareForRemote
    Rewrites the paths stored in task files so that they point below
    `clowdrloc` instead of below `tmploc`.

    For each task file, the "invocation", "taskloc" and "tool" entries are
    made relative to `tmploc` and then joined onto `clowdrloc`. The task file
    is overwritten in place with the updated dictionary.

    Parameters
    ----------
    tasks : list
        List of task dictionaries on disk for Clowdr tasks.
    tmploc : str
        Temporary location where the invocations and task files are stored.
    clowdrloc : str
        Path for storing Clowdr intermediate files and outputs

    Returns
    -------
    int
        Always 0; the results are the rewritten task files themselves.
    """
    # Entries of the task dictionary which hold relocatable paths
    relocated = ("invocation", "taskloc", "tool")

    for taskfile in tasks:
        with open(taskfile) as fhandle:
            contents = json.load(fhandle)

        for key in relocated:
            relative = op.relpath(contents[key], tmploc)
            contents[key] = op.join(clowdrloc, relative)

        serialized = json.dumps(contents, indent=4, sort_keys=True)
        with open(taskfile, 'w') as fhandle:
            fhandle.write(serialized)

    return 0