Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/parallel_config.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@ of key-value pairs (e.g. `{"getenv": "true"}`).
`memory`, and `disk` are required by the other clusters. All other parameters are optional. Additional parameters
defined in the [dask-jobqueue API](https://jobqueue.dask.org/en/latest/api.html) can be supplied.

!!! note
The fields available in `job_extra_directives` vary by cluster type. For example, `{"getenv": "true"}` will work to start
the active conda environment on each worker in an HTCondor cluster but not on a PBS cluster.
`plantcv.parallel.run_parallel` and `JupyterConfig.run()` will check for an active conda environment
and attempt to start that environment on each worker if the `cluster_config` does not have a `job_script_prologue` already.

### Example

```python
Expand Down
38 changes: 38 additions & 0 deletions plantcv/parallel/run_parallel.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import re
import sys
import time
import datetime
Expand Down Expand Up @@ -27,6 +28,7 @@ def run_parallel(config):
# Job start time
start_time = datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
print("Starting run " + start_time + '\n', file=sys.stderr)
config = _check_for_conda(config)
# Create temporary directory for job
if config.tmp_dir is not None:
os.makedirs(os.path.join(config.tmp_dir, "_PCV_PARALLEL_CHECKPOINT_"), exist_ok=True)
Expand Down Expand Up @@ -97,3 +99,39 @@ def run_parallel(config):
# Cleanup
if config.cleanup is True:
shutil.rmtree(config.tmp_dir)


def _check_for_conda(config):
"""Checks a running python process for a conda env, adding that env to cluster configuration

Parameters
----------
config = plantcv.parallel.WorkflowConfig object

Returns
-------
config = plantcv.parallel.WorkflowConfig object
"""
running_in_conda = re.search("conda|mamba|miniforge", sys.executable) is not None
# if workflow is executed from a conda environment then activate that conda environment on workers
if "job_script_prologue" not in config.cluster_config.keys() and running_in_conda:
# find where the conda installation is, replace python with activate
activation_path = re.sub("(.*conda|mamba|miniforge)(\\d)?.*$",
os.path.join("\\1\\2", "bin", "activate"), sys.executable)
commands = ["source " + activation_path]
# if there is an env in the executable path after the conda/mamba/miniforge
# then find that env and add a commmand to activate it
if re.search("env(s)?", re.sub(".*(conda|mamba|miniforge)", "", sys.executable)) is not None:
ex_list = re.sub(".*(conda|mamba|miniforge)", "", sys.executable).split(os.sep)
# get name of env that was active to run plantcv
env_index = [i for i, element in enumerate(ex_list) if re.search("^env(s)?$", element)][0]
env_name = ex_list[env_index+1]
commands.append("conda activate" + env_name)
# if changing config always print a message
print("Setting job_script_prologue to fetch active environment:\n",
commands,
file=sys.stderr)
# write job prologue to activate the env
config.cluster_config["job_script_prologue"] = commands

return config
23 changes: 23 additions & 0 deletions tests/parallel/test_run_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import sys
from plantcv.parallel import WorkflowConfig
from plantcv.parallel.run_parallel import _check_for_conda


def test_checking_conda_env(monkeypatch):
    """Test that a conda-style executable path yields an activation prologue.

    Removed the unused ``parallel_test_data`` and ``tmpdir`` fixtures from the
    signature — this test only patches ``sys.executable``.
    """
    # Simulate running from an environment inside a conda installation
    monkeypatch.setattr(sys, "executable", "/my/Xconda3/env/example/python")
    # Set up a default configuration (no job_script_prologue defined)
    config = WorkflowConfig()
    # Check for conda
    config = _check_for_conda(config)
    # The first prologue command sources the installation's activate script
    assert config.cluster_config["job_script_prologue"][0] == "source /my/Xconda3/bin/activate"


def test_checking_conda_base_env(monkeypatch):
    """Test for PlantCV."""
    # Simulate a base conda environment executable (no envs/ segment)
    monkeypatch.setattr(sys, "executable", "/my/Xconda4/bin/python")
    # A default workflow configuration has no job_script_prologue set
    cfg = WorkflowConfig()
    # This should not raise and should produce an activation line for the base env
    updated = _check_for_conda(cfg)
    prologue = updated.cluster_config["job_script_prologue"]
    assert prologue == ["source /my/Xconda4/bin/activate"]