Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/release_notes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ Upcoming Open-TYNDP Release

* Fix labeling of climate years in CBA indicators output CSVs (https://github.com/open-energy-transition/open-tyndp/pull/620).

* Fix CBA workflow to correctly handle simple runs, collection (multi-climate year) runs (https://github.com/open-energy-transition/open-tyndp/pull/625).

**Documentation**

**Developers Note**
Expand Down
203 changes: 150 additions & 53 deletions rules/cba.smk
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ def _effective_horizon(h, warn_fn=None, msg=None):
return h


def get_run_name(w):
"""
Return a single run name from Snakemake wildcards or config.

There are instances when the run name given in the config is a string
or a list. This helper is used to ensure we always get a single string
(or empty string if no run is given) to work with in the rules.
"""
run = w.get("run", config_provider("run", "name")(w))
if isinstance(run, list):
return run[0] if run else ""
return run or ""


def presolved_sb_network_path(w, horizon=None):
sb_version = config_provider(
"cba", "cba_scenario_input", "sb_version", default="latest"
Expand Down Expand Up @@ -443,7 +457,7 @@ def input_indicators(w):

Works for collection scenarios and regular scenarios.
"""
run = w.get("run", config_provider("run", "name")(w))
run = get_run_name(w)
projects = pd.read_csv(checkpoints.clean_projects.get(run=run).output.methods)
horizon = _effective_horizon(
int(w.planning_horizons),
Expand All @@ -457,16 +471,17 @@ def input_indicators(w):
projects = projects.loc[projects["planning_horizon"] == horizon]
cba_projects = [f"t{pid}" for pid in projects["project_id"].unique()]

# Collection scenarios look for results within regular scenarios
runs = config_provider("cba", "scenarios", default="{run}")
# Collection scenarios look for results within nested source runs,
# regular scenarios look within their own run.
runs = cba_source_runs(w)
# NOTE: project_specs filtering happens on the collection scenario (and does not descend)
project_specs = config_provider("cba", "projects")(w)

return expand(
rules.make_indicators.output.indicators,
cba_project=filter_projects_by_specs(cba_projects, project_specs),
planning_horizons=[w.planning_horizons],
run=runs,
allow_missing=True,
)


Expand Down Expand Up @@ -526,8 +541,9 @@ rule average_indicators_per_project_and_planning_horizon:
input:
indicators=lambda w: expand(
rules.make_indicators.output.indicators,
run=config_provider("cba", "scenarios")(w),
allow_missing=True,
cba_project=[w.cba_project],
planning_horizons=[w.planning_horizons],
run=cba_source_runs(w),
),
output:
indicators=RESULTS
Expand All @@ -540,9 +556,9 @@ rule summarize_indicators_per_project:
input:
indicators=lambda w: expand(
rules.average_indicators_per_project_and_planning_horizon.output.indicators,
transmission_projects=rules.clean_projects.output.transmission_projects,
planning_horizons=config["cba"]["planning_horizons"],
allow_missing=True,
cba_project=[w.cba_project],
run=[w.run],
),
output:
plot_file=RESULTS + "cba/ensemble_plots/ensemble_{cba_project}_all_horizons.png",
Expand All @@ -554,63 +570,112 @@ rule summarize_all_indicators:
input:
indicators=lambda w: expand(
rules.plot_weather_benchmark.input.indicators,
transmission_projects=rules.clean_projects.output.transmission_projects,
planning_horizons=config["cba"]["planning_horizons"],
cba_project=cba_projects(w),
run=config_provider("cba", "scenarios")(w),
run=cba_source_runs(w),
),
output:
plot_file=RESULTS + "cba/ensemble_plots/ensemble_all.png",
script:
"../scripts/cba/summarize_all.py"


# pseudo-rule, to run enable running cba with snakemake cba --configfile config/config.tyndp.yaml
def cba_collection_scenarios(w):
def cba_target_runs(w):
"""
Return cba collection scenarios, ie. the meta scenarios with cba: scenarios config
Return runs explicitly requested through config["run"]["name"].
"""
names = config["run"]["name"]
if isinstance(names, str):
names = [names]
return [names]
return names


def cba_collection_scenarios(w):
"""
Return actual cba collection scenarios, i.e. runs with cba.scenarios.
"""
scenarios = []
# fall back to the raw run.name if it isn’t found in the scenarios file
for name in names:
for name in cba_target_runs(w):
try:
scn = scenario_config(name)
except KeyError:
scenarios.append(name)
continue
if scn.get("cba", {}).get("scenarios") is not None:
if scn.get("cba", {}).get("scenarios"):
scenarios.append(name)
return scenarios


def cba_source_runs(w):
"""
Return the runs that provide CBA project indicator CSVs.

Collection scenarios read from their nested cba.scenarios;
regular runs read from their own run name.
"""
run = get_run_name(w)
if not run:
return []
try:
scn = scenario_config(run)
except KeyError:
return [run]
runs = scn.get("cba", {}).get("scenarios")
return runs if runs else [run]


def cba_scenarios(w):
"""
Return cba: scenarios of a cba collection scenario
"""
run = w.get("run", config_provider("run", "name")(w))
if isinstance(run, list):
run = run[0] if run else ""
run = get_run_name(w)
try:
scn = scenario_config(run)
except KeyError:
return [run] if run else []
return scn.get("cba", {}).get("scenarios", [run] if run else [])


def cba_projects_run(w):
"""
Return the run from which project methods should be read.
"""
runs = cba_source_runs(w)
return runs[0] if runs else ""


def validate_cba_project_methods_consistency(w):
"""
Validate that nested runs of a collection scenario share the same project ids
and CBA methods across planning horizons.
"""
runs = cba_source_runs(w)
if len(runs) <= 1:
return

reference_run = runs[0]
reference = pd.read_csv(
checkpoints.clean_projects.get(run=reference_run).output.methods
)
ref_cols = ["project_id", "planning_horizon", "method"]
reference = reference[ref_cols].sort_values(ref_cols).reset_index(drop=True)

for run in runs[1:]:
current = pd.read_csv(checkpoints.clean_projects.get(run=run).output.methods)
current = current[ref_cols].sort_values(ref_cols).reset_index(drop=True)
if not reference.equals(current):
raise ValueError(
"CBA project methods differ across nested collection runs. "
f"Reference run '{reference_run}' does not match '{run}'."
)


def cba_projects(w):
"""
List all indicators csv
"""
# run = config_provider("run", "name")(w),
run = w.get("run", config_provider("run", "name")(w))
if isinstance(run, list):
run = run[0] if run else ""
if "cy" not in run and run:
run = f"{run}-cy2009"
validate_cba_project_methods_consistency(w)

run = cba_projects_run(w)
projects = pd.read_csv(checkpoints.clean_projects.get(run=run).output.methods)
cba_projects = [f"t{pid}" for pid in projects["project_id"].unique()]
project_specs = config_provider("cba", "projects")(w)
Expand All @@ -619,50 +684,82 @@ def cba_projects(w):
return expand(cba_project)


# collect files to be stored in the scenario directory, e.g., NT-cy1995
rule collect_cba_scenario:
input:
lambda w: expand(
rules.plot_weather_benchmark.output.plot_file,
planning_horizons=config_provider("cba", "planning_horizons")(w),
cba_project=cba_projects(w),
run=cba_scenarios(w),
),
lambda w: expand(
def collect_cba_scenario_inputs(w):
inputs = []
inputs.extend(
expand(
rules.plot_indicators.output.plot_dir,
planning_horizons=config_provider("cba", "planning_horizons")(w),
run=cba_scenarios(w),
),
lambda w: expand(
)
)
inputs.extend(
expand(
rules.plot_cba_benchmark.output.plot_file,
planning_horizons=config_provider("cba", "planning_horizons")(w),
cba_project=cba_projects(w),
run=cba_scenarios(w),
),
)
)

run = get_run_name(w)
if run in cba_collection_scenarios(w):
inputs.extend(
expand(
rules.plot_weather_benchmark.output.plot_file,
planning_horizons=config_provider("cba", "planning_horizons")(w),
cba_project=cba_projects(w),
run=cba_scenarios(w),
)
)

return inputs


# collect files to be stored in the scenario directory, e.g., NT-cy1995
rule collect_cba_scenario:
input:
collect_cba_scenario_inputs,
output:
touch(RESULTS + "cba/all_scenarios.txt"),


# collect files to be stored in the scenario collection directory, e.g., NT-cyears
rule cba:
input:
lambda w: expand(
def cba_ensemble_inputs(w):
runs = cba_collection_scenarios(w)
if not runs:
return []

inputs = []
inputs.extend(
expand(
rules.average_indicators_per_project_and_planning_horizon.output.indicators,
planning_horizons=config["cba"]["planning_horizons"],
cba_project=cba_projects(w),
run=cba_collection_scenarios(w),
),
lambda w: expand(
run=runs,
)
)
inputs.extend(
expand(
rules.summarize_indicators_per_project.output.plot_file,
cba_project=cba_projects(w),
run=cba_collection_scenarios(w),
),
lambda w: expand(
run=runs,
)
)
inputs.extend(
expand(
rules.summarize_all_indicators.output.plot_file,
planning_horizons=config["cba"]["planning_horizons"],
cba_project=cba_projects(w),
run=cba_collection_scenarios(w),
),
run=runs,
)
)
return inputs


# collect files to be stored in the scenario collection directory, e.g., NT-cyears
rule cba:
input:
cba_ensemble_inputs,
# lambda w: expand(
# rules.plot_all_cba_benchmark.output.plot_dir,
# planning_horizons=config["cba"]["planning_horizons"],
Expand All @@ -671,7 +768,7 @@ rule cba:
# collect files to be stored in the scenario directory, e.g., NT-cy1995
lambda w: expand(
rules.collect_cba_scenario.output[0],
run=cba_collection_scenarios(w),
run=cba_target_runs(w),
),


Expand Down
Loading