import os
import json
import tempfile
import pandas as pd
import pyomo.environ as pyo
from .grid_analysis import analyse_grid, current_fuel_type_distribution
from .grid_modifications import add_inv_series, add_gen_mix_limits
from .ACDC_Static_TEP import transmission_expansion, multi_scenario_TEP
from .ACDC_MultiPeriod_TEP import (
_fill_investment_decisions,
_validate_grid_for_MP_TEP,
_update_grid_investment_period,
_calculate_decomision_period,
_deactivate_non_pre_existing_loads
)
from .Graph_and_plot import save_network_svg
__all__ = ['export_results_to_csv', 'sequential_STEP', 'sequential_MS_STEP']
def export_results_to_csv(run_results, export_dir, file_name="sequential_step_results.csv"):
"""Persist sequential STEP results dict as a pandas CSV."""
out_path = os.path.join(export_dir, file_name)
df = pd.DataFrame.from_dict(run_results, orient="index")
df.index.name = "run_key"
df = df.reset_index()
df.to_csv(out_path, index=False)
def _iter_dynamic_elements_typed(grid):
for gen in grid.Generators:
yield str(gen.name), gen, "np_gen", "Generator"
for ren in grid.RenSources:
yield str(ren.name), ren, "np_rsgen", "Renewable Source"
for line in grid.lines_AC_exp:
yield str(line.name), line, "np_line", "AC Line"
for line in grid.lines_DC:
yield str(line.name), line, "np_line", "DC Line"
for conv in grid.Converters_ACDC:
yield str(conv.name), conv, "np_conv", "ACDC Conv"
def _max_attr_from_np_attr(np_attr):
if np_attr == "np_gen":
return "np_gen_max"
if np_attr == "np_rsgen":
return "np_rsgen_max"
if np_attr == "np_line":
return "np_line_max"
if np_attr == "np_conv":
return "np_conv_max"
raise ValueError(f"Unsupported dynamic np attribute: {np_attr}")
def _iter_dynamic_elements(grid):
for name, el, np_attr, _ in _iter_dynamic_elements_typed(grid):
yield name, el, np_attr
def _round_dynamic_np_to_nearest_integer(grid):
for _, el, np_attr in _iter_dynamic_elements(grid):
value = float(getattr(el, np_attr))
setattr(el, np_attr, float(int(round(value))))
def _snapshot_dynamic_counts(grid):
snap = {}
for name, el, np_attr in _iter_dynamic_elements(grid):
snap[name] = float(getattr(el, np_attr))
return snap
def _series_value_for_run(series, run_idx, label):
if isinstance(series, (list, tuple)):
if len(series) == 0:
raise ValueError(f"{label} has no values.")
if len(series) == 1:
return float(series[0])
if run_idx >= len(series):
raise ValueError(f"{label} has length {len(series)}, cannot access run {run_idx}.")
return float(series[run_idx])
return float(series)
def _series_has_positive(series):
if isinstance(series, (list, tuple)):
return any(float(v) > 0 for v in series)
return float(series) > 0
def _apply_decommission_for_run(grid, linked_decommission_schedule, run_idx):
linked_now = dict(linked_decommission_schedule.pop(int(run_idx), {}))
planned_now = {}
for name, el, _ in _iter_dynamic_elements(grid):
inv = el.investment_decisions
if "planned_decommission" in inv:
planned_now[name] = _series_value_for_run(
inv["planned_decommission"], run_idx, f"{name}:planned_decommission"
)
by_name = {name: (el, np_attr) for name, el, np_attr in _iter_dynamic_elements(grid)}
applied = {}
names_to_apply = set(planned_now.keys()) | set(linked_now.keys())
for name in names_to_apply:
el, np_attr = by_name.get(name, (None, None))
if el is None:
raise ValueError(f"Decommission references unknown element '{name}'")
total_decommission = float(linked_now.get(name, 0.0)) + float(planned_now.get(name, 0.0))
if total_decommission < 0:
raise ValueError(f"Negative total decommission for '{name}': {total_decommission}")
current_stock = float(getattr(el, np_attr))
if total_decommission > current_stock + 1e-9:
raise ValueError(
f"Decommission exceeds current stock for '{name}': "
f"requested={total_decommission} > current={current_stock}"
)
setattr(el, np_attr, current_stock - total_decommission)
#print(f"Base np for optimal solution {current_stock - total_decommission} of {name}")
applied[name] = total_decommission
return linked_now, planned_now, applied
def _apply_generation_type_limits_from_run(grid, run_idx):
if run_idx < 0:
raise ValueError(f"run_idx must be >= 0, got {run_idx}")
for gen_type, series in dict(grid.generation_type_limits).items():
if isinstance(series, list):
if run_idx >= len(series):
raise ValueError(
f"generation_type_limits['{gen_type}'] has length {len(series)}; "
f"cannot access run index {run_idx}"
)
value = float(series[run_idx])
else:
value = float(series)
grid.current_generation_type_limits[gen_type] = value
def _apply_sequential_run_np_caps(grid, run_idx, absolute_np_max_by_name):
for name, el, np_attr in _iter_dynamic_elements(grid):
max_attr = _max_attr_from_np_attr(np_attr)
if not hasattr(el, max_attr):
continue
current_stock = float(getattr(el, np_attr))
absolute_max = float(absolute_np_max_by_name.get(name, getattr(el, max_attr)))
max_inv_series = el.investment_decisions.get("max_inv") if isinstance(el.investment_decisions, dict) else None
if max_inv_series is None:
setattr(el, max_attr, absolute_max)
continue
install_max = _series_value_for_run(max_inv_series, run_idx, f"{name}:max_inv")
run_max = min(current_stock + install_max, absolute_max)
setattr(el, max_attr, run_max)
def _restore_absolute_np_caps(element_meta, absolute_np_max_by_name):
for name, meta in element_meta.items():
el = meta["element"]
np_attr = meta["np_attr"]
max_attr = _max_attr_from_np_attr(np_attr)
if hasattr(el, max_attr) and name in absolute_np_max_by_name:
setattr(el, max_attr, float(absolute_np_max_by_name[name]))
def _register_future_aged_decommission(grid, run_idx, n_years, decommission_applied_by_name, np_before_by_name, schedule):
run_idx = int(run_idx)
for name, el, np_attr in _iter_dynamic_elements(grid):
np_before = float(np_before_by_name[name])
np_after = float(getattr(el, np_attr))
decomm_now = float(decommission_applied_by_name.get(name, 0.0))
added_now = np_after - (np_before - decomm_now)
if added_now <= 1e-9:
continue
decomision_period = int(el.decomision_period)
future_idx = run_idx + decomision_period
bucket = schedule.setdefault(future_idx, {})
bucket[name] = bucket.get(name, 0.0) + float(added_now)
def _build_cluster_cache_payload(grid, n_clusters):
if not hasattr(grid, "Clusters") or n_clusters not in grid.Clusters:
raise ValueError(f"Grid has no clustering data for n_clusters={n_clusters}")
clusters = grid.Clusters[n_clusters]
reps = clusters.get("Representatives", None)
reps_data = reps.to_dict(orient="list") if hasattr(reps, "to_dict") else {}
cluster_idx = clusters.get("Cluster idx", {})
cluster_idx_json = {str(int(k)): [int(v) for v in vals] for k, vals in cluster_idx.items()}
time_series_clustered = {}
for ts in grid.Time_series:
values = getattr(ts, "data_clustered", {}).get(n_clusters, None)
if values is None:
raise ValueError(f"Missing clustered time-series data for '{ts.name}' and n_clusters={n_clusters}")
time_series_clustered[ts.name] = [float(v) for v in values]
return {
"n_clusters": int(n_clusters),
"time_series_clustered": time_series_clustered,
"cluster_idx": cluster_idx_json,
"weight": [float(v) for v in clusters.get("Weight", [])],
"cluster_count": [int(v) for v in clusters.get("Cluster Count", [])],
"labels": [int(v) for v in clusters.get("Labels", [])],
"representatives": {"data": reps_data},
}
def _prepare_ms_clustering_reuse(grid, clustering_options, cache_json_path=None):
if clustering_options is None:
return None
opts = dict(clustering_options)
if opts.get("precomputed_clusters") is not None or opts.get("precomputed_clusters_path") is not None:
return opts
from .Time_series_clustering import cluster_analysis
n_clusters, _ = cluster_analysis(grid, opts)
payload = _build_cluster_cache_payload(grid, n_clusters)
if cache_json_path is None:
cache_json_path = os.path.join(
tempfile.gettempdir(),
f"pyflow_seq_ms_step_clusters_{id(grid)}_{int(n_clusters)}.json",
)
with open(cache_json_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh)
opts["n_clusters"] = int(n_clusters)
opts["precomputed_clusters_path"] = cache_json_path
return opts
def _run_sequential_core(
*,
grid,
inv_data,
mix_data,
n_years,
Hy,
discount_rate,
tee,
export_dir,
save_svgs,
svg_prefix,
export_steps,
period_solver,
step_name,
excel_prefix,
export_csv_name,
run_results_attr,
run_flag_attr,
results_attr,
obj_attr,
fuel_attr,
):
grid.reset_run_flags()
analyse_grid(grid)
if inv_data is not None:
add_inv_series(grid, inv_data)
if mix_data is not None:
add_gen_mix_limits(grid, mix_data)
for gen in grid.Generators:
planned_positive = _series_has_positive(gen.investment_decisions.get("planned_installation", 0.0))
gen.np_gen_opf = bool(gen.np_gen_opf or planned_positive)
gen.np_gen_mp = False
for rs in grid.RenSources:
planned_positive = _series_has_positive(rs.investment_decisions.get("planned_installation", 0.0))
rs.np_rsgen_opf = bool(rs.np_rsgen_opf or planned_positive)
rs.np_rsgen_mp = False
n_runs = _fill_investment_decisions(grid)
_validate_grid_for_MP_TEP(grid)
if n_runs <= 0:
raise ValueError("No investment periods found in grid investment decisions.")
grid.TEP_n_years = n_years
grid.TEP_discount_rate = discount_rate
grid.TEP_n_periods = n_runs
grid.GPR = any(gen.np_gen_opf for gen in grid.Generators)
grid.rs_GPR = any(rs.np_rsgen_opf for rs in grid.RenSources)
for element in grid.Generators + grid.lines_AC_exp + grid.lines_DC + grid.Converters_ACDC + grid.RenSources:
_calculate_decomision_period(element, n_years)
if export_dir is not None:
os.makedirs(export_dir, exist_ok=True)
aged_decommission_schedule = {}
run_results = {}
pre_existing_by_name = _snapshot_dynamic_counts(grid)
absolute_np_max_by_name = {}
element_meta = {}
report_rows = {}
for name, el, np_attr, type_name in _iter_dynamic_elements_typed(grid):
max_attr = _max_attr_from_np_attr(np_attr)
absolute_np_max_by_name[name] = float(getattr(el, max_attr, getattr(el, np_attr)))
element_meta[name] = {"element": el, "np_attr": np_attr, "type": type_name}
report_rows[name] = {
"Element": name,
"Type": type_name,
"Pre Existing": pre_existing_by_name.get(name, 0.0),
}
_deactivate_non_pre_existing_loads(grid)
fuel_type_dist_by_period = {0: current_fuel_type_distribution(grid, output="df")}
obj_rows = []
aborted = False
abort_reason = None
try:
for k in range(n_runs):
np_before = _snapshot_dynamic_counts(grid)
_update_grid_investment_period(grid, k)
load_multiplier = None
linked_now, planned_now, applied_decommission = _apply_decommission_for_run(
grid, aged_decommission_schedule, k
)
_apply_sequential_run_np_caps(grid, k, absolute_np_max_by_name)
_apply_generation_type_limits_from_run(grid, k)
_round_dynamic_np_to_nearest_integer(grid)
model, model_res, timing_info, solver_stats, extra_run_data = period_solver(k)
if export_steps:
from .Results_class import Results
res = Results(grid)
res.pyomo_model_results(model, solver_stats=solver_stats, model_results=model_res, print_table=False)
res.all(
export_location=export_dir,
export_type="excel",
file_name=f"{excel_prefix}_{k+1}",
print_table=False,
)
_round_dynamic_np_to_nearest_integer(grid)
has_feasible_solution = bool(solver_stats and solver_stats.get("solution_found", False))
if not has_feasible_solution:
aborted = True
termination = solver_stats.get("termination_condition", "unknown") if solver_stats else "unknown"
abort_reason = f"run {k + 1} has no feasible solution (termination={termination})"
if tee:
print(f"{step_name} aborted at run {k + 1}: no solution found (termination={termination})")
break
_register_future_aged_decommission(
grid=grid,
run_idx=k,
n_years=n_years,
decommission_applied_by_name=applied_decommission,
np_before_by_name=np_before,
schedule=aged_decommission_schedule,
)
if save_svgs and export_dir is not None:
save_network_svg(grid, name=os.path.join(export_dir, f"{svg_prefix}_{k+1}"))
run_data = {
"run": k + 1,
"csv_row_index": k + 2,
"load_multiplier": load_multiplier,
"model": model,
"model_res": model_res,
"timing_info": timing_info,
"solver_stats": solver_stats,
"decommission_applied": applied_decommission,
"linked_decommission_requested": linked_now,
"planned_decommission_requested": planned_now,
"linked_decommission_requested_total": float(sum(linked_now.values())) if linked_now else 0.0,
"linked_decommission_applied_total": float(sum(linked_now.values())) if linked_now else 0.0,
}
if extra_run_data:
run_data.update(extra_run_data)
run_results[k] = run_data
period = k + 1
np_after = _snapshot_dynamic_counts(grid)
period_tep_obj = 0.0
for name, row in report_rows.items():
decommissioned = float(applied_decommission.get(name, 0.0))
installed = float(np_after[name]) - (float(np_before[name]) - decommissioned)
if abs(installed) <= 1e-9:
installed = 0.0
active = float(np_after[name])
base_cost = float(getattr(element_meta[name]["element"], "base_cost", 0.0) or 0.0)
cost = installed * base_cost
row[f"Decommissioned_{period}"] = decommissioned
row[f"Installed_{period}"] = installed
row[f"Active_{period}"] = active
row[f"Cost_{period}"] = cost
period_tep_obj += cost
fuel_type_dist_by_period[period] = current_fuel_type_distribution(grid, output="df")
opf_obj = sum(float(x.get("v", 0.0)) for x in grid.OPF_obj.values())
npv_opf_obj = sum(float(x.get("NPV", 0.0)) for x in grid.OPF_obj.values())
# Sequential STEP report: TEP is investment-only for this run.
tep_obj = float(period_tep_obj)
economic_step_obj = tep_obj + npv_opf_obj
model_step_obj = float(pyo.value(model.obj) * getattr(model, "obj_scaling", 1.0))
present_value_tep = 1 / (1 + discount_rate) ** (k * n_years)
obj_rows.append(
{
"Investment_Period": period,
"OPF_Objective": opf_obj,
"NPV_OPF_Objective": npv_opf_obj,
"TEP_Objective": tep_obj,
"STEP_Objective": model_step_obj,
"NPV_STEP_Objective": model_step_obj * present_value_tep,
"STEP_Objective_Economic": economic_step_obj,
"NPV_STEP_Objective_Economic": economic_step_obj * present_value_tep,
}
)
if export_dir is not None:
export_results_to_csv(run_results, export_dir, file_name=export_csv_name)
finally:
_restore_absolute_np_caps(element_meta, absolute_np_max_by_name)
seq_results_df = pd.DataFrame(list(report_rows.values()))
if not seq_results_df.empty:
cost_cols = [f"Cost_{i+1}" for i in range(n_runs) if f"Cost_{i+1}" in seq_results_df.columns]
seq_results_df["Total_Cost"] = seq_results_df[cost_cols].sum(axis=1) if cost_cols else 0.0
total_row = {}
for col in seq_results_df.columns:
if col == "Element":
total_row[col] = "Total cost"
elif "Cost" in col:
total_row[col] = seq_results_df[col].sum()
else:
total_row[col] = ""
seq_results_df = pd.concat([seq_results_df, pd.DataFrame([total_row])], ignore_index=True)
run_results["_meta"] = {
"aborted": bool(aborted),
"abort_reason": abort_reason,
}
setattr(grid, run_results_attr, run_results)
setattr(grid, run_flag_attr, True)
setattr(grid, results_attr, seq_results_df)
setattr(
grid,
obj_attr,
pd.DataFrame(
obj_rows,
columns=[
"Investment_Period",
"OPF_Objective",
"NPV_OPF_Objective",
"TEP_Objective",
"STEP_Objective",
"NPV_STEP_Objective",
"STEP_Objective_Economic",
"NPV_STEP_Objective_Economic",
],
),
)
setattr(grid, fuel_attr, fuel_type_dist_by_period)
setattr(grid, f"{run_flag_attr}_aborted", bool(aborted))
setattr(grid, f"{run_flag_attr}_abort_reason", abort_reason)
return run_results
def _get_time_limit_for_run(time_limit, run_idx):
"""
Supports:
- scalar time_limit: returned as-is
- dict time_limit: per run index overrides
For dict keys we accept:
- `run_idx` (0-based) and `run_idx+1` (1-based)
- string versions of the same keys (e.g. "0", "1", "2")
- optional `'default'`
"""
if time_limit is None:
return None
if not isinstance(time_limit, dict):
return time_limit
# Build candidate keys (int + str) for robust matching.
candidates = [
run_idx,
run_idx + 1,
str(run_idx),
str(run_idx + 1),
]
for key in candidates:
if key in time_limit:
return time_limit[key]
if "default" in time_limit:
return time_limit["default"]
return None
[docs]
def sequential_STEP(
grid,
inv_data=None,
mix_data=None,
n_years=10,
Hy=8760,
discount_rate=0.02,
ObjRule=None,
solver="bonmin",
time_limit=None,
tee=False,
callback=False,
solver_options=None,
obj_scaling=1.0,
export_dir=None,
svg_prefix="sequential_STEP",
save_svgs=False,
export_steps=False,
nlp_warmstart=False,
):
"""
Sequentially solve static transmission expansion one investment period at a time.
Each period calls :func:`transmission_expansion` on the current grid; results
are linked across periods as an alternative to
:func:`multi_period_transmission_expansion`.
"""
def _period_solver(k):
time_limit_k = _get_time_limit_for_run(time_limit, k)
model, model_res, timing_info, solver_stats = transmission_expansion(
grid,
NPV=True,
n_years=n_years,
Hy=Hy,
discount_rate=discount_rate,
ObjRule=ObjRule,
solver=solver,
time_limit=time_limit_k,
tee=tee,
callback=callback,
solver_options=solver_options,
obj_scaling=obj_scaling,
nlp_warmstart=nlp_warmstart,
)
return model, model_res, timing_info, solver_stats, {}
return _run_sequential_core(
grid=grid,
inv_data=inv_data,
mix_data=mix_data,
n_years=n_years,
Hy=Hy,
discount_rate=discount_rate,
tee=tee,
export_dir=export_dir,
save_svgs=save_svgs,
svg_prefix=svg_prefix,
export_steps=export_steps,
period_solver=_period_solver,
step_name="Sequential STEP",
excel_prefix="sequential_STEP",
export_csv_name="sequential_step_results.csv",
run_results_attr="sequential_STEP_run_results",
run_flag_attr="Seq_STEP_run",
results_attr="Seq_STEP_results",
obj_attr="Seq_STEP_obj_res",
fuel_attr="Seq_STEP_fuel_type_distribution",
)
[docs]
def sequential_MS_STEP(
grid,
inv_data=None,
mix_data=None,
n_years=10,
Hy=8760,
discount_rate=0.02,
clustering_options=None,
ObjRule=None,
solver="bonmin",
tee=False,
callback=False,
solver_options=None,
obj_scaling=1.0,
export_dir=None,
svg_prefix="sequential_MS_STEP",
save_svgs=False,
export_steps=False,
alpha=None,
limit_flow_rate=True,
nlp_warmstart=False,
clustering_cache_json_path=None,
reuse_clustering_cache=True,
):
"""
Sequentially solve multi-scenario transmission expansion one investment period at a time.
Each period calls :func:`multi_scenario_TEP` on the current grid; results are
linked across periods as an alternative to
:func:`multi_period_MS_TEP`.
"""
ms_clustering_options = clustering_options
if reuse_clustering_cache:
ms_clustering_options = _prepare_ms_clustering_reuse(
grid,
clustering_options,
cache_json_path=clustering_cache_json_path,
)
def _period_solver(k):
model, model_res, timing_info, solver_stats, step_ms_res = multi_scenario_TEP(
grid,
NPV=True,
n_years=n_years,
Hy=Hy,
discount_rate=discount_rate,
clustering_options=ms_clustering_options,
ObjRule=ObjRule,
solver=solver,
tee=tee,
callback=callback,
alpha=alpha,
limit_flow_rate=limit_flow_rate,
obj_scaling=obj_scaling,
solver_options=solver_options,
nlp_warmstart=nlp_warmstart,
)
return model, model_res, timing_info, solver_stats, {"TEP_multiScenario_res": step_ms_res}
return _run_sequential_core(
grid=grid,
inv_data=inv_data,
mix_data=mix_data,
n_years=n_years,
Hy=Hy,
discount_rate=discount_rate,
tee=tee,
export_dir=export_dir,
save_svgs=save_svgs,
svg_prefix=svg_prefix,
export_steps=export_steps,
period_solver=_period_solver,
step_name="Sequential MS STEP",
excel_prefix="sequential_MS_STEP",
export_csv_name="sequential_ms_step_results.csv",
run_results_attr="sequential_MS_STEP_run_results",
run_flag_attr="Seq_MS_STEP_run",
results_attr="Seq_MS_STEP_results",
obj_attr="Seq_MS_STEP_obj_res",
fuel_attr="Seq_MS_STEP_fuel_type_distribution",
)