"""Static and Plotly network/result plotting.
Renders grid topology and analysis results as Matplotlib/Plotly figures and
network SVGs.
Owns: non-interactive figure generation.
Does not own: interactive Dash apps (see ``Graph_Dash``) or geographic maps
(see ``Mapping``).
"""
import networkx as nx
import pandas as pd
import plotly.graph_objs as go
import plotly.io as pio
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import logging
import itertools
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import os
import utm
from shapely.geometry import Point, LineString,Polygon,MultiPolygon
from .Classes import Node_AC, Node_DC
def _loading_colormap(value, vmin=0, vmax=100):
"""
Map a value to a color (green -> yellow -> red) for loading visualization.
Replaces branca.colormap.LinearColormap.
"""
# Clamp value to range
normalized = max(0, min(1, (value - vmin) / (vmax - vmin)))
# Create colormap: green (0) -> yellow (0.5) -> red (1)
cmap = mcolors.LinearSegmentedColormap.from_list(
'loading', ['green', 'yellow', 'red']
)
rgba = cmap(normalized)
# Return as hex color string
return mcolors.to_hex(rgba)
__all__ = ['plot_graph',
'time_series_prob',
'plot_neighbour_graph',
'plot_TS_res',
'plot_model_feasibility',
'save_network_svg',
'plot_3D',
'create_geometries_from_coords']
def _installation_cost_meur(element):
base_cost = getattr(element, 'base_cost', 0.0)
if isinstance(base_cost, (list, tuple, np.ndarray)):
active_config = int(getattr(element, 'active_config', -1))
if 0 <= active_config < len(base_cost):
base_cost = base_cost[active_config]
else:
base_cost = 0.0
try:
return np.round(float(base_cost) / 10**6, decimals=2)
except (TypeError, ValueError):
return 0.0
def update_ACnode_hovertext(node,S_base,text):
# print(f"Updating hover text for node: {node.name}")
dec= 2
if text =='data':
name = node.name
typ = node.type
Load = np.round(node.PLi, decimals=dec)
x_cord = node.x_coord
y_cord = node.y_coord
PZ = node.PZ
node.hover_text = f"Node: {name}<br>coord: {x_cord},{y_cord}<br>Type: {typ}<br>Load: {Load}<br>Area: {PZ}"
elif text=='inPu':
name = node.name
V = np.round(node.V, decimals=dec)
theta = np.round(node.theta, decimals=dec)
PGi= node.gen_P_node_aggregate()
Gen = np.round(PGi, decimals=dec)
Load = np.round(node.PLi, decimals=dec)
conv = np.round(node.P_s, decimals=dec)
PZ = node.PZ
node.hover_text = f"Node: {name}<br>Voltage: {V}<br>Angle: {theta}<br>Generation: {Gen}<br>Load: {Load}<br>Converters: {conv}<br>PZ: {PZ}"
else:
name = node.name
V = int(np.round(node.V*node.kV_base, decimals=0))
theta = int(np.round(np.degrees(node.theta), decimals=0))
PGi= node.gen_P_node_aggregate()
Gen = int(np.round(PGi*S_base, decimals=0))
Load = int(np.round(node.PLi*S_base, decimals=0))
conv = int(np.round(node.P_s*S_base, decimals=0))
PZ = node.PZ
node.hover_text = f"Node: {name}<br>Voltage: {V}kV<br>Angle: {theta}°<br>Generation: {Gen}MW<br>Load: {Load}MW<br>Converters: {conv}MW<br>PZ: {PZ}"
def update_DCnode_hovertext(node,S_base,text):
dec= 2
if text =='data':
name = node.name
typ = node.type
Load = np.round(node.PLi, decimals=dec)
x_cord = node.x_coord
y_cord = node.y_coord
PZ = node.PZ
node.hover_text = f"Node: {name}<br>coord: {x_cord},{y_cord}<br>Type: {typ}<br>Load: {Load}<br>Area: {PZ}"
elif text=='inPu':
name = node.name
V = np.round(node.V, decimals=dec)
conv = np.round(node.Pconv, decimals=dec)
node.hover_text = f"Node: {name}<br>Voltage: {V}<br><br>Converter: {conv}"
else:
name = node.name
V = np.round(node.V*node.kV_base, decimals=0).astype(int)
if node.ConvInv and node.Nconv >= 10**-dec:
conv = np.round(node.Pconv*S_base, decimals=0).astype(int)
nconv = np.round(node.Nconv,decimals=dec)
load = abs(int(np.round(conv / (node.conv_MW*nconv) * 100)))
node.hover_text = f"Node: {name}<br>Voltage: {V}kV<br>Converter:{conv}MW<br>Number Converter: {nconv}<br>Converters loading: {load}%"
else:
node.hover_text = f"Node: {name}<br>Voltage: {V}kV"
def update_lineAC_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if line.fromS >= 0 else 'to'
if text =='data':
name = line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
l = int(line.Length_km)
z= np.round(line.Z,decimals=5)
y= np.round(line.Y,decimals=5)
rating = line.MVA_rating
rating = np.round(rating,decimals=0)
Line_tf = 'Transformer' if line.isTf else 'Line'
cable = line.Cable_type
line.hover_text = f"{Line_tf}: {name}<br> Z:{z}<br>Y:{y}<br>Length: {l}km<br>Rating: {rating}MVA<br>Type: {cable}"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS, decimals=dec)
Sto = np.round(line.toS, decimals=dec)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=dec)
Line_tf = 'Transformer' if line.isTf else 'Line'
cable = line.Cable_type
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"{Line_tf}: {name}<br> {line_string}<br>S from: {Sfrom}<br>S to: {Sto}<br>Loading: {Loading}%<br>Type: {cable}"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS*S_base, decimals=0)
Sto = np.round(line.toS*S_base, decimals=0)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
Line_tf = 'Transformer' if line.isTf else 'Line'
cable = line.Cable_type
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"{Line_tf}: {name}<br> {line_string}<br>S from: {Sfrom}MVA<br>S to: {Sto}MVA<br>Loading: {Loading}%<br>Type: {cable}%"
def update_lineDC_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if line.fromP >= 0 else 'to'
if text =='data':
name = line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
np_line = np.round(line.np_line, decimals=1)
installation_cost = _installation_cost_meur(line)
r= np.round(line.r,decimals=5)
l = int(line.Length_km)
rating = np.round(line.MW_rating, decimals=0)
rating_total = np.round(line.capacity_MW, decimals=0)
line.hover_text = f"Line: {name}<br> R:{r}<br>Length:{l}km<br>Rating (n=1): {rating}MW<br>Total rating: {rating_total}MW<br>Number of lines: {np_line}<br>Installation cost: {installation_cost}M€"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Pfrom= np.round(line.fromP, decimals=dec)
Pto = np.round(line.toP, decimals=dec)
np_line = np.round(line.np_line, decimals=1)
if np_line == 0:
load = 0
else:
load = max(np.abs(Pfrom), np.abs(Pto))*S_base/(line.MW_rating*line.np_line)*100
Loading = np.round(load, decimals=dec)
if Pfrom > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"Line: {name}<br> {line_string}<br>P from: {Pfrom}<br>P to: {Pto}<br>Loading: {Loading}%"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Pfrom= np.round(line.fromP*S_base, decimals=0).astype(int)
Pto = np.round(line.toP*S_base, decimals=0).astype(int)
np_line = np.round(line.np_line, decimals=1)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if Pfrom > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"Line: {name}<br> {line_string}<br>P from: {Pfrom}MW<br>P to: {Pto}MW<br>Loading: {Loading}%<br>Number Lines: {np_line}"
def update_lineACexp_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if line.fromS >= 0 else 'to'
if text =='data':
name = line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
installation_cost = _installation_cost_meur(line)
l = int(line.Length_km)
z= np.round(line.Z,decimals=5)
y= np.round(line.Y,decimals=5)
rating = np.round(line.MVA_rating, decimals=0)
rating_total = np.round(line.capacity_MVA, decimals=0)
np_line = np.round(line.np_line, decimals=1)
Line_tf = 'Transformer' if line.isTf else 'Line'
line.hover_text = f"{Line_tf}: {name}<br> Z:{z}<br>Y:{y}<br>Length: {l}km<br>Rating (unitary): {rating}MVA<br>Total rating: {rating_total}MVA<br>Number of lines: {np_line}<br>Installation cost: {installation_cost}M€"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS, decimals=dec)
Sto = np.round(line.toS, decimals=dec)
np_line = np.round(line.np_line, decimals=1)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Transformer' if line.isTf else 'Line'
line.hover_text = f"{Line_tf}: {name}<br> {line_string}<br>S from: {Sfrom}<br>S to: {Sto}<br>Loading: {Loading}%<br>Number of lines: {np_line}"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS*S_base, decimals=0)
Sto = np.round(line.toS*S_base, decimals=0)
np_line = np.round(line.np_line, decimals=1)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Transformer' if line.isTf else 'Line'
line.hover_text = f"Line: {name}<br> {line_string}<br>S from: {Sfrom}MVA<br>S to: {Sto}MVA<br>Loading: {Loading}%<br>Number of lines: {np_line}"
def update_lineACrec_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if line.fromS >= 0 else 'to'
if text =='data':
name = line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
installation_cost = _installation_cost_meur(line)
l = int(line.Length_km)
z= np.round(line.Z,decimals=5) if not line.rec_branch else np.round(line.Z_new,decimals=5)
y= np.round(line.Y,decimals=5) if not line.rec_branch else np.round(line.Y_new,decimals=5)
rating = line.MVA_rating if not line.rec_branch else line.MVA_rating_new
rating = np.round(rating,decimals=0)
Line_tf = 'Reconductoring branch'
line.hover_text = f"{Line_tf}: {name}<br> Z:{z}<br>Y:{y}<br>Length: {l}km<br>Rating: {rating}MVA<br>Installation cost: {installation_cost}M€"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS, decimals=dec)
Sto = np.round(line.toS, decimals=dec)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Reconductoring branch'
line.hover_text = f"{Line_tf}: {name}<br> {line_string}<br>S from: {Sfrom}<br>S to: {Sto}<br>Loading: {Loading}%<br>Reconductoring: {line.rec_branch}"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS*S_base, decimals=0)
Sto = np.round(line.toS*S_base, decimals=0)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Reconductoring branch'
line.hover_text = f"Line: {name}<br> {line_string}<br>S from: {Sfrom}MVA<br>S to: {Sto}MVA<br>Loading: {Loading}%<br>Reconductoring: {line.rec_branch}"
def update_lineACct_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if np.real(line.fromS) >= 0 else 'to'
active_config = line.active_config
if active_config == -1:
return
if text =='data':
name = line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
installation_cost = _installation_cost_meur(line)
l = int(line.Length_km)
z= np.round(line.Z,decimals=5)
y= np.round(line.Y,decimals=5)
rating = np.round(line.MVA_rating, decimals=0)
Line_tf = 'Cable type line'
line.hover_text = f"{Line_tf}: {name}<br> Z:{z}<br>Y:{y}<br>Length: {l}km<br>Rating: {rating}MVA<br>Installation cost: {installation_cost}M€"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS, decimals=dec)
Sto = np.round(line.toS, decimals=dec)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Cable type line'
line.hover_text = f"{Line_tf}: {name}<br> {line_string}<br>S from: {Sfrom}<br>S to: {Sto}<br>Loading: {Loading}%<br>Cable type: {line._active_config}"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS*S_base, decimals=0)
Sto = np.round(line.toS*S_base, decimals=0)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
Line_tf = 'Cable type line'
line.hover_text = f"Line: {name}<br> {line_string}<br>S from: {Sfrom}MVA<br>S to: {Sto}MVA<br>Loading: {Loading}%<br>Cable type: {line._active_config}"
def update_tf_hovertext(line,S_base,text):
dec=2
line.direction = 'from' if line.fromS >= 0 else 'to'
tap_m = np.round(line.m, decimals=4)
shift_deg = np.round(np.degrees(line.shift), decimals=2)
tap_string = f"Tap: {tap_m}<br>Shift: {shift_deg}°"
if text =='data':
name = line.name
z= np.round(line.Z,decimals=5)
y= np.round(line.Y,decimals=5)
rating = line.MVA_rating
rating = np.round(rating,decimals=0)
Line_tf = 'Transformer' if line.isTf else 'Line'
line.hover_text = f"{Line_tf}: {name}<br> Z:{z}<br>Y:{y}<br>{tap_string}<br>Rating: {rating}MVA"
elif text=='inPu':
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS, decimals=dec)
Sto = np.round(line.toS, decimals=dec)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=dec)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"Transformer: {name}<br> {line_string}<br>{tap_string}<br>S from: {Sfrom}<br>S to: {Sto}<br>Loading: {Loading}%"
else:
name= line.name
fromnode = line.fromNode.name
tonode = line.toNode.name
Sfrom= np.round(line.fromS*S_base, decimals=0)
Sto = np.round(line.toS*S_base, decimals=0)
load = max(line.loading,line.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
line_string = f"{fromnode} -> {tonode}"
else:
line_string = f"{fromnode} <- {tonode}"
line.hover_text = f"Transformer: {name}<br> {line_string}<br>{tap_string}<br>S from: {Sfrom}MVA<br>S to: {Sto}MVA<br>Loading: {Loading}%"
def update_conv_hovertext(conv,S_base,text):
if text =='data':
name= conv.name
fromnode = conv.Node_DC.name
tonode = conv.Node_AC.name
installation_cost = _installation_cost_meur(conv)
rating = np.round(conv.MVA_max,decimals=0)
rating_total = np.round(conv.capacity_MVA, decimals=0)
conv.hover_text = f"Converter: {name}<br>DC node: {fromnode}<br>AC node: {tonode}<br>Rating (unitary): {rating}MVA<br>Total rating: {rating_total}MVA<br>Installation cost: {installation_cost}M€"
elif text=='inPu':
name= conv.name
fromnode = conv.Node_DC.name
tonode = conv.Node_AC.name
Sfrom= np.round(conv.P_DC, decimals=0)
Sto = np.round(np.sqrt(conv.P_AC**2 + conv.Q_AC**2) * np.sign(conv.P_AC), decimals=0)
load = max(conv.loading,conv.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
conv_string = f"{fromnode} -> {tonode}"
else:
conv_string = f"{fromnode} <- {tonode}"
conv.hover_text = f"Converter: {name}<br> {conv_string}<br>P DC: {Sfrom}<br>S AC: {Sto}<br>Loading: {Loading}%"
else:
name= conv.name
fromnode = conv.Node_DC.name
tonode = conv.Node_AC.name
Sfrom= np.round(conv.P_DC*S_base, decimals=0)
pac_sign = conv.P_AC / np.abs(conv.P_AC) if np.abs(conv.P_AC) > 0 else 0
Sto = np.round(np.sqrt(conv.P_AC**2+conv.Q_AC**2) * S_base * pac_sign, decimals=0)
load = max(conv.loading,conv.ts_max_loading)
Loading = np.round(load, decimals=0).astype(int)
if np.real(Sfrom) > 0:
conv_string = f"{fromnode} -> {tonode}"
else:
conv_string = f"{fromnode} <- {tonode}"
conv.hover_text = f"Converter: {name}<br> {conv_string}<br>P DC: {Sfrom}MVA<br>S AC: {Sto}MVA<br>Loading: {Loading}%"
def update_gen_hovertext(gen,S_base,text):
if text =='data':
name= gen.name
node = gen.Node_AC
n_gens = gen.np_gen
installation_cost = np.round(gen.base_cost/10**6, decimals=2)
P_max = np.round(gen.Max_pow_gen * S_base * gen.np_gen, decimals=2)
P_min = np.round(gen.Min_pow_gen * S_base * gen.np_gen, decimals=2)
Q_max = np.round(gen.Max_pow_genR * S_base * gen.np_gen, decimals=2)
Q_min = np.round(gen.Min_pow_genR * S_base * gen.np_gen, decimals=2)
rating = gen.capacity_MVA
rating = np.round(rating,decimals=1)
gen.hover_text = f"Generator: {name}<br>Number of generators: {n_gens}<br>Rating: {rating}MVA<br>Installation cost: {installation_cost}M€<br>Fuel: {gen.gen_type}<br>P max: {P_max}MW<br>Q max: {Q_max}MVAR<br>P min: {P_min}MW<br>Q min: {Q_min}MVAR"
elif text =='inPu':
name= gen.name
n_gens = gen.np_gen
Pto = np.round(gen.PGen, decimals=2)
Qto = np.round(gen.QGen, decimals=2)
load = gen.loading
Loading = np.round(load, decimals=0).astype(int)
gen.hover_text = f"Generator: {name}<br>Number of generators: {n_gens}<br> P gen: {Pto}<br>Q Gen: {Qto}<br>Loading: {Loading}%"
else:
name= gen.name
n_gens = gen.np_gen
Pto = np.round(gen.PGen*S_base, decimals=1)
Qto = np.round(gen.QGen*S_base, decimals=1)
load = gen.loading
Loading = np.round(load, decimals=0).astype(int)
gen.hover_text = f"Generator: {name}<br>Number of generators: {n_gens}<br> P gen: {Pto*S_base}MW<br>Q Gen: {Qto*S_base}MVAR<br>Loading: {Loading}%"
def update_renSource_hovertext(renSource,S_base,text):
if text =='data':
name= renSource.name
node = renSource.Node
n_rs = renSource.np_rsgen
installation_cost = np.round(renSource.base_cost/10**6, decimals=2)
rating = np.round(renSource.capacity_MVA, decimals=2)
Pmin = np.round(
renSource.PGi_ren_base * renSource.min_gamma * renSource.np_rsgen * S_base,
decimals=2,
)
Pmax = np.round(
renSource.PGi_ren_base * renSource.np_rsgen * S_base,
decimals=2,
)
renSource.hover_text = f"Ren Source: {name}<br>Number of sources: {n_rs}<br>Rating: {rating}<br>Installation cost: {installation_cost} M€<br>Tech: {renSource.rs_type}<br>P min: {Pmin}MW<br>P max: {Pmax}MW"
elif text=='inPu':
name= renSource.name
n_rs = renSource.np_rsgen
Pto= np.round(renSource.PGi_ren, decimals=0)
Curt = np.round((1-renSource.gamma)*100, decimals=0)
renSource.hover_text = f"Ren Source: {name}<br>Number of sources: {n_rs}<br> P : {Pto}<br>Curtailment: {Curt}%"
else:
name= renSource.name
n_rs = renSource.np_rsgen
Pto= np.round(renSource.PGi_ren*S_base, decimals=0)
Curt = np.round((1-renSource.gamma)*100, decimals=0)
renSource.hover_text = f"Ren Source: {name}<br>Number of sources: {n_rs}<br> P : {Pto}MW<br>Curtailment: {Curt}%"
def update_hovertexts(grid,text):
S_base= grid.S_base
with ThreadPoolExecutor() as executor:
futures = []
if grid.nodes_AC is not None:
# Update hover texts for nodes
for node in grid.nodes_AC:
futures.append(executor.submit(update_ACnode_hovertext, node, S_base, text))
if grid.nodes_DC is not None:
for node in grid.nodes_DC:
futures.append(executor.submit(update_DCnode_hovertext, node, S_base, text))
if grid.lines_AC is not None:
# Update hover texts for lines
for line in grid.lines_AC:
futures.append(executor.submit(update_lineAC_hovertext, line, S_base, text))
if grid.lines_DC is not None:
for line in grid.lines_DC:
futures.append(executor.submit(update_lineDC_hovertext, line, S_base, text))
if grid.lines_AC_exp is not None:
for line in grid.lines_AC_exp:
futures.append(executor.submit(update_lineACexp_hovertext, line, S_base, text))
if grid.lines_AC_rec is not None:
for line in grid.lines_AC_rec:
futures.append(executor.submit(update_lineACrec_hovertext, line, S_base, text))
if grid.lines_AC_ct is not None:
for line in grid.lines_AC_ct:
futures.append(executor.submit(update_lineACct_hovertext, line, S_base, text))
if grid.lines_AC_tf is not None:
for line in grid.lines_AC_tf:
futures.append(executor.submit(update_tf_hovertext, line, S_base, text))
if grid.Converters_ACDC is not None:
for conv in grid.Converters_ACDC:
futures.append(executor.submit(update_conv_hovertext, conv, S_base, text))
if grid.Generators is not None:
for gen in grid.Generators:
futures.append(executor.submit(update_gen_hovertext, gen, S_base, text))
if grid.RenSources is not None:
for renSource in grid.RenSources:
futures.append(executor.submit(update_renSource_hovertext, renSource, S_base, text))
# Wait for all futures to complete
for future in futures:
try:
future.result() # This will block until the task is finished
except Exception as e:
print(f"Error in thread: {e}")
def initialize_positions(Grid):
"""Initialize positions for the grid nodes."""
return Grid.node_positions if Grid.node_positions is not None else {}
def assign_layout_to_missing_nodes(G, pos):
"""Assign layout to nodes missing positions."""
missing_nodes = [
node for node in G.nodes if node not in pos or pos[node][0] is None or pos[node][1] is None]
if missing_nodes:
try:
# Attempt to apply planar layout to missing nodes
pos_missing = nx.planar_layout(G.subgraph(missing_nodes))
pos.update(pos_missing)
except nx.NetworkXException as e:
logging.warning("Planar layout failed, falling back to Kamada-Kawai layout.")
# Fall back to Kamada-Kawai layout
pos_missing = nx.kamada_kawai_layout(G.subgraph(missing_nodes))
pos.update(pos_missing)
return pos
def assign_converter_positions(Grid, pos):
"""Assign positions for DC nodes using corresponding AC node positions."""
if Grid.Converters_ACDC is not None:
for conv in Grid.Converters_ACDC:
dc_node = conv.Node_DC
ac_node = conv.Node_AC
if ac_node in pos:
pos[dc_node] = pos[ac_node]
else:
logging.warning(f"AC node {ac_node} for converter {conv.name} is missing in positions.")
return pos
def calculate_positions(G, Grid):
"""Calculate positions for nodes in the graph."""
# Step 1: Initialize positions
pos = initialize_positions(Grid)
# Step 2: Assign layout to missing nodes
pos = assign_layout_to_missing_nodes(G, pos)
# Step 3: Assign positions for converters
pos = assign_converter_positions(Grid, pos)
return pos
[docs]
def plot_neighbour_graph(grid, node=None, base_node_size=10, proximity=1,show=True):
"""Plot the ego graph of a node and its neighbours.
Builds a subgraph within ``proximity`` hops of ``node`` on ``grid.Graph_toPlot``
and opens an interactive Plotly view via :func:`plot_graph`.
Parameters
----------
grid : Grid
Grid whose topology is plotted.
node : Node_AC, Node_DC, or str, optional
Centre node object, or its ``name`` string (searched on AC then DC nodes).
base_node_size : int, optional
Base marker size passed to :func:`plot_graph`.
proximity : int, optional
Hop radius for :func:`networkx.ego_graph`.
Raises
------
ValueError
If ``node`` is omitted, not found, or absent from ``grid.Graph_toPlot``.
Examples
--------
>>> import pyflow_acdc as pyf
>>> grid, _ = pyf.cases['case24_3zones_acdc']()
>>> pyf.plot_neighbour_graph(grid, node='111')
"""
G = grid.Graph_toPlot
node_ref = node
if isinstance(node, str):
node = next((n for n in grid.nodes_AC if n.name == node), None)
if node is None and grid.nodes_DC is not None:
node = next((n for n in grid.nodes_DC if n.name == node_ref), None)
if node is None:
raise ValueError(f"Node {node_ref!r} not found in grid.")
if node not in G:
raise ValueError(f"Node {node.name!r} is not in the plot graph.")
Gn = nx.ego_graph(G, node, proximity)
plot_graph(grid, base_node_size=base_node_size, G=Gn,show=show)
[docs]
def plot_graph(Grid,text='inPu',base_node_size=10,G=None,show=True):
if G is None:
G = Grid.Graph_toPlot
update_hovertexts(Grid, text)
# Initialize pos with node_positions if provided, else empty dict
pos = calculate_positions(G, Grid)
lines_ac = Grid.lines_AC if Grid.lines_AC is not None else []
lines_ac_exp = Grid.lines_AC_exp if Grid.lines_AC_exp is not None else []
lines_ac_rec = Grid.lines_AC_rec if Grid.lines_AC_rec is not None else []
lines_ac_ct = Grid.lines_AC_ct if Grid.lines_AC_ct is not None else []
lines_dc = Grid.lines_DC if Grid.lines_DC is not None else []
nodes_DC = Grid.nodes_DC if Grid.nodes_DC is not None else []
lines_dc_set = set(lines_dc)
lines_ac_exp_set = set(lines_ac_exp)
lines_ac_rec_set = set(lines_ac_rec)
lines_ac_ct_set = set(lines_ac_ct)
pio.renderers.default = 'browser'
# Define a color palette for the subgraphs
color_palette = itertools.cycle([
'red', 'blue', 'green', 'purple', 'orange',
'cyan', 'magenta', 'brown', 'gray',
'black', 'lime', 'navy', 'teal',
'violet', 'indigo', 'turquoise', 'beige', 'coral', 'salmon', 'olive'])
#
# Find connected components (subgraphs)
connected_components = list(nx.connected_components(G))
pos_cache = pos
node_traces_data = []
edge_traces_data = []
mnode_x_data = []
mnode_y_data = []
mnode_txt_data = []
# Create traces for each subgraph with a unique color
edge_traces = []
node_traces = []
mnode_trace = []
for idx, subgraph_nodes in enumerate(connected_components):
color = next(color_palette)
# Create edge trace for the current subgraph
for edge in G.subgraph(subgraph_nodes).edges(data=True):
line = edge[2]['line']
# Skip lines with np_line == 0
if (line in lines_dc_set and line.np_line == 0) or (line in lines_ac_exp_set and line.np_line == 0):
continue # Skip plotting for lines where np_line == 0
# Set line width based on line type
if line in lines_dc_set:
line_width = line.np_line if line.np_line > 0 else 0
elif line in lines_ac_exp_set:
line_width = line.np_line if line.np_line > 0 else 0
else:
line_width = 1
# Cache positions to avoid repeated access
x0, y0 = pos_cache[edge[0]]
x1, y1 = pos_cache[edge[1]]
# Collect midpoint data for marker
mnode_x_data.append((x0 + x1) / 2)
mnode_y_data.append((y0 + y1) / 2)
mnode_txt_data.append(line.hover_text)
# Append edge trace data
edge_traces_data.append((x0, y0, x1, y1, line_width, color))
# Process nodes for the current subgraph
x_subgraph_nodes = []
y_subgraph_nodes = []
hover_texts_nodes_sub = []
node_sizes = []
node_opacities = []
for node in subgraph_nodes:
x_subgraph_nodes.append(pos_cache[node][0])
y_subgraph_nodes.append(pos_cache[node][1])
# Adjust for DC nodes
if node in nodes_DC:
if Grid.TEP_run:
node_size = max(base_node_size * (node.Nconv - node.Nconv_i) + base_node_size, base_node_size)
node_opacity = max(min(node.Nconv, 1.0),0) if node.ConvInv else 1.0
else:
node_size = base_node_size
node_opacity = 1.0
hover_texts_nodes_sub.append(node.hover_text)
node_sizes.append(node_size)
node_opacities.append(node_opacity)
# Collect node trace data
node_traces_data.append((x_subgraph_nodes, y_subgraph_nodes, node_sizes, node_opacities, hover_texts_nodes_sub, color))
# After the loops, create all traces in bulk
# Edge Traces
for (x0, y0, x1, y1, line_width, color) in edge_traces_data:
edge_traces.append(go.Scatter(
x=[x0, x1, None],
y=[y0, y1, None],
mode='lines',
line=dict(width=line_width, color=color),
visible=True,
text="hover_text_placeholder", # Replace with actual hover text
hoverinfo='text'
))
# Node Traces
for (x_subgraph_nodes, y_subgraph_nodes, node_sizes, node_opacities, hover_texts_nodes_sub, color) in node_traces_data:
node_traces.append(go.Scatter(
x=x_subgraph_nodes,
y=y_subgraph_nodes,
mode='markers',
marker=dict(
size=node_sizes,
color=color,
opacity=node_opacities,
line=dict(width=2)
),
text=hover_texts_nodes_sub,
hoverinfo='text',
visible=True
))
# Create mnode_trace (midpoint node trace) only after processing edges
mnode_trace = go.Scatter(
x=mnode_x_data,
y=mnode_y_data,
mode="markers",
showlegend=False,
hovertemplate="%{hovertext}<extra></extra>",
visible=True,
hovertext=mnode_txt_data,
marker=dict(
opacity=0,
size=10,
color=color
)
)
layout = go.Layout(
showlegend=False,
hovermode='closest',
margin=dict(b=20, l=5, r=5, t=40),
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
width=600, # Set width
height=600,
# updatemenus=updatemenus
)
# Create figure
fig = go.Figure(data=edge_traces + node_traces + [mnode_trace], layout=layout)
# Display plot
if show:
pio.show(fig)
s=1
return fig
[docs]
def plot_TS_res(grid, start, end, plotting_choices=None,show=True,path=None,save_format=None,skip_failed=False):
Plot = [
'Power Generation by price zone' ,
'Power Generation by generator' ,
'Curtailment' ,
'Market Prices' ,
'Net price zone power' ,
'AC line loading' ,
'DC line loading' ,
'ACDC Converters' ,
'Power Generation by generator area chart' ,
'Power Generation by price zone area chart' ,
]
if not plotting_choices:
plotting_choices = Plot
for plotting_choice in plotting_choices:
# Verify that the choice is valid
if plotting_choice not in Plot:
print(f"Invalid plotting option: {plotting_choice}")
continue
# Retrieve the time series data for curtailment
y_label = None
ylim = None
if plotting_choice == 'Curtailment':
df = grid.time_series_results['curtailment'].loc[start:end]*100
y_label = 'Curtailment (%)'
ylim = [0,110]
elif plotting_choice in ['Power Generation by generator','Power Generation by generator area chart']:
df = grid.time_series_results['real_power_opf'].loc[start:end]*grid.S_base
y_label = 'Power Generation (MW)'
elif plotting_choice in ['Power Generation by price zone','Power Generation by price zone area chart'] :
df = grid.time_series_results['real_power_by_zone'].loc[start:end] * grid.S_base
y_label = 'Power Generation (MW)'
elif plotting_choice == 'Market Prices':
df = grid.time_series_results['prices_by_zone'].loc[start:end]
df = df.loc[:, ~df.columns.str.startswith('o_')]
y_label = 'Market Prices (€/MWh)'
elif plotting_choice == 'Net price zone power':
df = grid.time_series_results['net_price_zone_power'].loc[start:end]
df = df.loc[:, ~df.columns.str.startswith('o_')]
y_label = 'Net price zone power (MW)'
elif plotting_choice == 'AC line loading':
df = grid.time_series_results['ac_loading'].loc[start:end]*100
y_label = 'AC Line Loading (%)'
ylim = [0,110]
elif plotting_choice == 'DC line loading':
df = grid.time_series_results['dc_loading'].loc[start:end]*100
y_label = 'DC Line Loading (%)'
ylim = [0,110]
elif plotting_choice == 'ACDC Converters':
df = grid.time_series_results['converter_loading'].loc[start:end] * grid.S_base
y_label = 'ACDC Converters (MW)'
ylim = [0,110]
if skip_failed:
if len(df.index) > 0:
horizon_end = min(end, int(df.index.max()))
else:
horizon_end = end
full_index = pd.Index(range(start, horizon_end + 1), name=df.index.name or 'time')
df = df.reindex(full_index)
columns = df.columns
if len(columns) == 0:
print(f"Skipping '{plotting_choice}': no data in time series results.")
continue
time = df.index # Assuming the DataFrame index is time
if show:
# Show figure
pio.renderers.default = 'browser'
layout = dict(
title=f"Time Series Plot: {plotting_choice}", # Set title based on user choice
hovermode="x"
)
cumulative_sum = None
fig = go.Figure()
# Check if we need to stack the areas for specific plotting choices
stack_areas = plotting_choice in ['Power Generation by generator area chart', 'Power Generation by price zone area chart']
# Adding traces to the subplots
for col in columns:
y_values = df[col]
if stack_areas:
# print(stack_areas)
# If stacking, add the current values to the cumulative sum
if cumulative_sum is None:
cumulative_sum = y_values.copy() # Start cumulative sum with the first selected row
fig.add_trace(
go.Scatter(x=time, y=y_values, name=col, hoverinfo='x+y+name', fill='tozeroy')
)
else:
y_values = cumulative_sum + y_values # Stack current on top of cumulative sum
cumulative_sum = y_values # Update cumulative sum
fig.add_trace(
go.Scatter(x=time, y=y_values, name=col, hoverinfo='x+y+name', fill='tonexty')
)
else:
# Plot normally (no stacking)
fig.add_trace(
go.Scatter(x=time, y=y_values, name=col, hoverinfo='x+y+name')
)
# Update layout
fig.update_layout(layout)
fig.show()
if save_format is not None:
# Convert 8.25 cm to inches and maintain ratio
width_cm = 8.25
ratio = 6/10 # Original height/width ratio
width_inches = width_cm / 2.54
height_inches = width_inches * ratio
if len(df) > 10000:
width_inches = width_inches * 2
# Set publication-quality plotting parameters
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
'figure.figsize': (width_inches, height_inches),
'font.family': 'sans-serif',
'font.size': 8,
'axes.labelsize': 8,
'axes.titlesize': 8,
'xtick.labelsize': 7,
'ytick.labelsize': 7,
'legend.fontsize': 6,
'lines.markersize': 4,
'lines.linewidth': 1,
'grid.alpha': 0.3
})
# Create figure with proper spacing for legend
plt.figure(figsize=(width_inches, height_inches))
# Adjust the plot area to make room for the legend
plt.subplots_adjust(right=0.85) # Make room for legend on the right
max_colors = 8
colors = plt.cm.Set2(np.linspace(0, 1, max_colors))
line_markers = ['-', '--', ':', '-.']
i = 0
stack_areas = plotting_choice in ['Power Generation by generator area chart', 'Power Generation by price zone area chart']
cumulative_sum = pd.Series(0.0, index=df.index)
for col in columns:
y_values = df[col]
current_line = '-' if i < max_colors else line_markers[((i - max_colors) % len(line_markers))]
if stack_areas:
y_values = cumulative_sum + y_values
cumulative_sum = y_values
plt.plot(time, y_values, color=colors[i % max_colors], linestyle=current_line, label=col)
else:
plt.plot(time, y_values, color=colors[i % max_colors], linestyle=current_line, label=col)
i += 1
plt.title(plotting_choice)
plt.xlabel('Time')
plt.xlim(time[0], time[-1])
if ylim is not None:
plt.ylim(ylim)
plt.ylabel(y_label)
# Adjust legend position based on number of items
if i < 14:
plt.legend(loc='center left', bbox_to_anchor=(1.02, 0.5),
frameon=False,
ncol=1)
# Make x-axis labels horizontal
plt.xticks(rotation=0)
# Ensure everything fits
plt.tight_layout()
# Save with extra width to accommodate legend
if path is None:
plt.savefig(f"{plotting_choice}.{save_format}",
bbox_inches='tight', # Always use tight to include legend
dpi=300)
else:
plt.savefig(f"{path}/{plotting_choice}.{save_format}",
bbox_inches='tight',
dpi=300)
plt.close()
[docs]
def time_series_prob(grid, element_name, save_format=None, path=None, show=True):
"""Plot the PDF/CDF of a time-series input or OPF result column.
Parameters
----------
grid : Grid
Grid with ``Time_series`` and/or ``time_series_results`` populated.
element_name : str
``Time_series`` name or column in merged OPF result tables.
save_format : str, optional
File format extension (e.g. ``'svg'``). If omitted, nothing is saved.
path : str, optional
Directory for saved figures; defaults to the current working directory.
show : bool, optional
Open an interactive matplotlib window after plotting.
Examples
--------
>>> import pyflow_acdc as pyf
>>> pyf.time_series_prob(grid, 'OWPP_BE', save_format='svg', show=False)
"""
a = grid.Time_series
df_gen = grid.time_series_results['real_power_opf']
df_prices = grid.time_series_results['prices_by_zone']
df_AC_line_res = grid.time_series_results['ac_loading']
df_DC_line_res = grid.time_series_results['dc_loading']
df_conv_res = grid.time_series_results['converter_loading']
merged_df = pd.concat([df_gen, df_prices, df_AC_line_res, df_DC_line_res, df_conv_res], axis=1)
width_cm = 8 # Doubled for side-by-side plots
ratio = 6/10
width_inches = width_cm / 2.54
height_inches = width_inches * ratio
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
'figure.figsize': (width_inches, height_inches),
'font.family': 'sans-serif',
'font.size': 8,
'axes.labelsize': 8,
'axes.titlesize': 8,
'xtick.labelsize': 7,
'ytick.labelsize': 7,
'legend.fontsize': 6,
'lines.markersize': 4,
'lines.linewidth': 1,
'grid.alpha': 0.3
})
found = False
for ts in a:
if ts.name == element_name:
data = ts.data
found = True
break
if not found:
for col in merged_df.columns:
if col == element_name:
data = merged_df[col]
break
fig, ax1 = plt.subplots()
# Plot histogram on primary y-axis
ax1.hist(data, bins=100, density=True, alpha=0.5, color='b', label='PDF')
ax1.set_xlabel(element_name)
ax1.set_ylabel('Probability Density', color='b')
ax1.tick_params(axis='y', labelcolor='b')
# Create secondary y-axis and plot CDF
ax2 = ax1.twinx()
sorted_data = np.sort(data)
cumulative_prob = np.linspace(0, 1, len(sorted_data))
ax2.plot(sorted_data, cumulative_prob, color='r', label='CDF')
ax2.set_ylabel('Cumulative Probability', color='r')
ax2.tick_params(axis='y', labelcolor='r')
# Adjust layout to prevent label cutoff
plt.tight_layout()
# Save before showing
if save_format:
if path is None:
plt.savefig(f"{element_name}_distribution.{save_format}",
bbox_inches='tight',
dpi=300)
else:
plt.savefig(f"{path}/{element_name}_distribution.{save_format}",
bbox_inches='tight',
dpi=300)
if show:
plt.show()
plt.close()
return
def create_subgraph_color_dict(G):
color_palette_0 = itertools.cycle([
'violet', 'limegreen', 'salmon',
'burlywood', 'pink', 'cyan'
])
color_palette_1 = itertools.cycle([
'darkviolet', 'green', 'red',
'darkorange', 'hotpink', 'lightseagreen'
])
color_palette_2 = itertools.cycle([
'darkmagenta', 'darkolivegreen', 'brown',
'darkgoldenrod', 'crimson', 'darkcyan'
])
color_palette_3 = itertools.cycle([
'orchid', 'lightgreen', 'navajowhite',
'tan', 'lightpink', 'paleturquoise'
])
# Get connected components (subgraphs) of the graph G
connected_components = list(nx.connected_components(G))
subgraph_color_dict = {'MV':{},'HV': {}, 'EHV': {}, 'UHV': {}}
# Loop through the connected components and assign colors
for idx, subgraph_nodes in enumerate(connected_components):
subgraph_color_dict['MV'][idx] = next(color_palette_0)
subgraph_color_dict['HV'][idx] = next(color_palette_1)
subgraph_color_dict['EHV'][idx] = next(color_palette_2)
subgraph_color_dict['UHV'][idx] = next(color_palette_3)
return subgraph_color_dict
def create_geometries_from_coords(grid):
"""Build shapely Point/LineString geometries from element x/y coordinates."""
for node in grid.nodes_AC + grid.nodes_DC:
if node.x_coord is not None and node.y_coord is not None and node.geometry is None:
node.geometry = Point(node.x_coord, node.y_coord)
for line in (
grid.lines_AC
+ grid.lines_DC
+ grid.lines_AC_tf
+ grid.lines_AC_rec
+ grid.lines_AC_ct
+ grid.lines_AC_exp
):
if (
line.fromNode.x_coord is not None
and line.fromNode.y_coord is not None
and line.toNode.x_coord is not None
and line.toNode.y_coord is not None
and line.geometry is None
):
line.geometry = LineString(
[
(line.fromNode.x_coord, line.fromNode.y_coord),
(line.toNode.x_coord, line.toNode.y_coord),
]
)
for conv in grid.Converters_ACDC:
if (
conv.Node_AC.x_coord is not None
and conv.Node_AC.y_coord is not None
and conv.Node_DC.x_coord is not None
and conv.Node_DC.y_coord is not None
and conv.geometry is None
):
conv.geometry = LineString(
[
(conv.Node_AC.x_coord, conv.Node_AC.y_coord),
(conv.Node_DC.x_coord, conv.Node_DC.y_coord),
]
)
for gen in grid.Generators + grid.Generators_DC + grid.RenSources:
if gen.x_coord is not None and gen.y_coord is not None and gen.geometry is None:
gen.geometry = Point(gen.x_coord, gen.y_coord)
def create_geometries_from_layout(grid):
"""
Create geometries for all grid elements if they don't exist.
First checks if nodes have x and y coordinates, if not uses calculate_positions.
Then creates geometries for all elements (nodes, lines, converters).
"""
# Step 1: Check if nodes have coordinates, if not create synthetic ones
G = grid.Graph_toPlot
pos = calculate_positions(G, grid)
# Step 2: Create geometries for nodes
for node in grid.nodes_AC + grid.nodes_DC:
if not hasattr(node, 'geometry') or node.geometry is None:
if node in pos:
x, y = pos[node]
node.geometry = Point(x, y)
# Update coordinates if they were None
if node.x_coord is None:
node.x_coord = x
if node.y_coord is None:
node.y_coord = y
# Step 3: Create geometries for AC lines
for line in grid.lines_AC + grid.lines_AC_tf + grid.lines_AC_rec + grid.lines_AC_ct + grid.lines_AC_exp:
if not hasattr(line, 'geometry') or line.geometry is None:
if hasattr(line, 'fromNode') and hasattr(line, 'toNode'):
from_node = line.fromNode
to_node = line.toNode
if from_node in pos and to_node in pos:
x1, y1 = pos[from_node]
x2, y2 = pos[to_node]
line.geometry = LineString([(x1, y1), (x2, y2)])
# Step 4: Create geometries for DC lines
for line in grid.lines_DC:
if not hasattr(line, 'geometry') or line.geometry is None:
if hasattr(line, 'fromNode') and hasattr(line, 'toNode'):
from_node = line.fromNode
to_node = line.toNode
if from_node in pos and to_node in pos:
x1, y1 = pos[from_node]
x2, y2 = pos[to_node]
line.geometry = LineString([(x1, y1), (x2, y2)])
# Step 5: Create geometries for converters
for conv in grid.Converters_ACDC:
if not hasattr(conv, 'geometry') or conv.geometry is None:
if hasattr(conv, 'Node_AC') and hasattr(conv, 'Node_DC'):
ac_node = conv.Node_AC
dc_node = conv.Node_DC
if ac_node in pos and dc_node in pos:
x1, y1 = pos[ac_node]
x2, y2 = pos[dc_node]
conv.geometry = LineString([(x1, y1), (x2, y2)])
# Step 6: Create geometries for generators and renewable sources
ac_nodes_by_name = {str(n.name): n for n in grid.nodes_AC}
dc_nodes_by_name = {str(n.name): n for n in grid.nodes_DC}
for gen in grid.Generators + grid.Generators_DC + grid.RenSources:
if not hasattr(gen, 'geometry') or gen.geometry is None:
node_ref = None
if hasattr(gen, 'Node_AC'):
node_ref = gen.Node_AC
elif hasattr(gen, 'Node'):
node_ref = gen.Node
if node_ref is None:
continue
# Node refs are sometimes stored as node objects, sometimes as node names.
if node_ref in pos:
x, y = pos[node_ref]
gen.geometry = Point(x, y)
else:
node_name = str(node_ref)
node_obj = ac_nodes_by_name.get(node_name) or dc_nodes_by_name.get(node_name)
if node_obj is not None and node_obj in pos:
x, y = pos[node_obj]
gen.geometry = Point(x, y)
[docs]
def save_network_svg(
grid,
name='grid_network',
width=1000,
height=800,
journal=True,
legend=True,
square_ratio=False,
poly=None,
linestrings=None,
coloring=None,
poly_size=None,
tee=False,
line_size_factor=1.0,
draw_converters=True,
scale_ac_nodes_with_rs=False,
node_size_factor=1.0,
scale_dc_nodes_with_conv=False,
dc_node_size_factor=1.0,
):
"""Save the network as SVG file
Parameters:
-----------
square_ratio : bool
If True, expands the data bounds so both axes span the same range, then scales uniformly.
If False, uses the true data extent and **one scale factor for both x and y** (one data unit
in x equals one data unit in y on the figure); the SVG size is then fitted to that extent.
poly_size : tuple or None
If provided and poly is not None, specifies the target size (width, height) in pixels
for the polygon. Everything else will be scaled to fit this polygon size.
Format: (target_width, target_height) in pixels.
draw_converters : bool
If False, converter line segments are not drawn.
scale_ac_nodes_with_rs : bool
If True, AC node marker radius is increased when connected renewable sources exist.
The increase scales with total connected `np_rsgen` and `node_size_factor`.
node_size_factor : float
Multiplier used for AC node marker scaling with renewable source multiplicity.
Must be >= 0.
scale_dc_nodes_with_conv : bool
If True, DC node marker radius is increased based on total connected converter `np_conv`
and `dc_node_size_factor`.
dc_node_size_factor : float
Multiplier used for DC node marker scaling with connected converter multiplicity.
Must be >= 0.
"""
try:
import svgwrite
line_size_factor = float(line_size_factor)
if line_size_factor <= 0:
raise ValueError("line_size_factor must be > 0.")
node_size_factor = float(node_size_factor)
if node_size_factor < 0:
raise ValueError("node_size_factor must be >= 0.")
dc_node_size_factor = float(dc_node_size_factor)
if dc_node_size_factor < 0:
raise ValueError("dc_node_size_factor must be >= 0.")
# Check if all elements have geometries, if not create them
elements_without_geometry = []
# Check nodes
for node in grid.nodes_AC + grid.nodes_DC:
if not hasattr(node, 'geometry') or node.geometry is None:
elements_without_geometry.append(f"Node {node.name}")
# Check lines
for line in grid.lines_AC + grid.lines_AC_tf + grid.lines_DC + grid.lines_AC_rec + grid.lines_AC_ct + grid.lines_AC_exp:
if not hasattr(line, 'geometry') or line.geometry is None:
elements_without_geometry.append(f"Line {line.name}")
# Check converters
for conv in grid.Converters_ACDC:
if not hasattr(conv, 'geometry') or conv.geometry is None:
elements_without_geometry.append(f"Converter {conv.name}")
# Check generators and renewable sources
for gen in grid.Generators + grid.RenSources:
if not hasattr(gen, 'geometry') or gen.geometry is None:
elements_without_geometry.append(f"Generator/RenSource {gen.name}")
# If any elements are missing geometries, create them
if elements_without_geometry:
if tee:
print(f"Creating geometries for {len(elements_without_geometry)} elements without geometries...")
print("Missing geometries:", ", ".join(elements_without_geometry))
create_geometries_from_layout(grid)
if journal:
# Convert 88mm to pixels (assuming 96 DPI)
width = int(88 * 96 / 25.4) # 25.4mm = 1 inch
# Maintain aspect ratio
if square_ratio:
height = width
else:
height = int(width * 0.8) # Using 0.8 as a common aspect ratio for journal figures
if tee:
print(f"Current working directory: {os.getcwd()}")
print(f"Will save as: {os.path.abspath(f'{name}.svg')}")
# Create SVG drawing
dwg = svgwrite.Drawing(f"{name}.svg", size=(f'{width}px', f'{height}px'), profile='tiny')
# Get all geometries and their bounds
all_bounds = []
# Add lines
for line in grid.lines_AC + grid.lines_AC_tf + grid.lines_DC + grid.lines_AC_rec + grid.lines_AC_ct +grid.lines_AC_exp:
if hasattr(line, 'geometry') and line.geometry:
all_bounds.append(line.geometry.bounds)
# Add nodes
for node in grid.nodes_AC + grid.nodes_DC:
if hasattr(node, 'geometry') and node.geometry:
all_bounds.append(node.geometry.bounds)
# Add generators and renewable sources
for gen in grid.Generators + grid.RenSources:
if hasattr(gen, 'geometry') and gen.geometry:
all_bounds.append(gen.geometry.bounds)
# Add polygon bounds if provided
def _iter_polys(obj):
if obj is None:
return
if isinstance(obj, Polygon):
yield obj
elif isinstance(obj, MultiPolygon):
for poly in obj.geoms:
yield poly
elif isinstance(obj, (list, tuple)):
for o in obj:
yield from _iter_polys(o) # Recursively handle nested structures
# Calculate polygon bounds separately if poly_size is specified
poly_bounds = None
if poly is not None:
poly_bounds_list = []
for geom in _iter_polys(poly):
bounds = geom.bounds
poly_bounds_list.append(bounds)
all_bounds.append(bounds)
if poly_bounds_list:
poly_bounds = (
min(bound[0] for bound in poly_bounds_list),
min(bound[1] for bound in poly_bounds_list),
max(bound[2] for bound in poly_bounds_list),
max(bound[3] for bound in poly_bounds_list)
)
# Calculate overall bounds
if all_bounds:
minx = min(bound[0] for bound in all_bounds)
miny = min(bound[1] for bound in all_bounds)
maxx = max(bound[2] for bound in all_bounds)
maxy = max(bound[3] for bound in all_bounds)
else:
print("No geometries found to plot")
return
# Calculate scaling factors
# If poly_size is specified, determine scale from polygon FIRST, then use for everything
if poly_size is not None and poly_bounds is not None:
target_poly_width, target_poly_height = poly_size
poly_minx, poly_miny, poly_maxx, poly_maxy = poly_bounds
poly_x_range = poly_maxx - poly_minx
poly_y_range = poly_maxy - poly_miny
if poly_x_range == 0 or poly_y_range == 0:
print("Warning: Polygon has zero width or height, cannot scale")
# Fall through to normal scaling
poly_size = None
else:
# Calculate scale factors for polygon to fit target size
# Use minimum to maintain uniform scaling (circles stay circular)
scale_x_poly = target_poly_width / poly_x_range
scale_y_poly = target_poly_height / poly_y_range
scale = min(scale_x_poly, scale_y_poly) # Uniform scale for everything
padding = 25
# Calculate the overall bounds in scaled coordinates
overall_x_range = (maxx - minx) * scale
overall_y_range = (maxy - miny) * scale
# Adjust width and height to accommodate everything
width = int(overall_x_range + 2 * padding)
height = int(overall_y_range + 2 * padding)
# Update the SVG drawing size
dwg = svgwrite.Drawing(f"{name}.svg", size=(f'{width}px', f'{height}px'), profile='tiny')
# If poly_size was not set or failed, use normal scaling logic
if poly_size is None or poly_bounds is None:
if square_ratio:
padding = 10
# For square ratio: make both axes have the same range
x_range = maxx - minx
y_range = maxy - miny
max_range = max(x_range, y_range)
# Expand the smaller dimension to match the larger one
if x_range < max_range:
center_x = (minx + maxx) / 2
minx = center_x - max_range / 2
maxx = center_x + max_range / 2
if y_range < max_range:
center_y = (miny + maxy) / 2
miny = center_y - max_range / 2
maxy = center_y + max_range / 2
# Now both ranges are equal, so use the same scale for both axes
available_width = width - 2*padding
available_height = height - 2*padding
scale = min(available_width, available_height) / max_range
else:
padding = 25 # pixels of padding
x_range = maxx - minx
y_range = maxy - miny
# Uniform scale: same factor for x and y so 1 data unit in x = 1 data unit in y.
if x_range > 0 and y_range > 0:
scale_x = (width - 2 * padding) / x_range
scale_y = (height - 2 * padding) / y_range
scale = min(scale_x, scale_y)
# Fit canvas to geographic aspect ratio (no fixed 5:4 letterboxing).
width = int(x_range * scale + 2 * padding)
height = int(y_range * scale + 2 * padding)
dwg = svgwrite.Drawing(
f"{name}.svg", size=(f"{width}px", f"{height}px"), profile="tiny"
)
else:
scale_x = (width - 2 * padding) / max(x_range, 1e-12)
scale_y = (height - 2 * padding) / max(y_range, 1e-12)
scale = min(scale_x, scale_y)
def transform_coords(x, y):
"""Transform coordinates to SVG space"""
return (
padding + (x - minx) * scale,
height - (padding + (y - miny) * scale) # Flip Y axis
)
_LOADING_MODES = {'loading', 'ts_max_loading', 'ts_avg_loading'}
_is_custom_color = coloring is not None and coloring not in _LOADING_MODES
cable_type_colors = {
0: 'cyan',
1: 'magenta',
2: 'brown',
3: 'gray',
4: 'lime',
5: 'navy',
6: 'teal',
7: 'violet',
8: 'indigo',
9: 'turquoise',
10: 'beige',
11: 'coral',
12: 'salmon',
13: 'olive'
}
# Draw background polygon(s) if provided (behind lines/nodes)
if poly is not None:
for geom in _iter_polys(poly):
if isinstance(geom, Polygon):
poly_list = [geom]
elif isinstance(geom, MultiPolygon):
poly_list = list(geom.geoms)
else:
poly_list = []
for pg in poly_list:
rings = [list(pg.exterior.coords)] + [list(r.coords) for r in pg.interiors]
d = ""
for coords in rings:
pts = [transform_coords(x, y) for (x, y) in coords]
d += "M " + " L ".join(f"{x},{y}" for (x, y) in pts) + " Z "
dwg.add(dwg.path(
d=d,
fill='#ADD8E6',
stroke='#ADD8E6',
stroke_width=2,
fill_rule='evenodd',
fill_opacity=0.15
))
# Draw contour lines (exterior and interior rings)
for coords in rings:
pts = [transform_coords(x, y) for (x, y) in coords]
contour_d = "M " + " L ".join(f"{x},{y}" for (x, y) in pts) + " Z"
dwg.add(dwg.path(
d=contour_d,
fill='none',
stroke='blue',
stroke_width=1
))
# Draw LineStrings if provided
if linestrings is not None:
for linestring in linestrings:
if hasattr(linestring, 'geometry') and linestring.geometry:
coords = list(linestring.geometry.coords)
elif hasattr(linestring, 'coords'):
coords = list(linestring.coords)
else:
continue
path_data = "M "
for c in coords:
svg_x, svg_y = transform_coords(c[0], c[1])
path_data += f"{svg_x},{svg_y} L "
path_data = path_data[:-2]
dwg.add(dwg.path(d=path_data, stroke='black', stroke_width=2, fill='none'))
# Draw AC lines
for line in grid.lines_AC + grid.lines_AC_tf + grid.lines_AC_rec + grid.lines_AC_ct:
if line in grid.lines_AC_ct and getattr(line, 'active_config', 0) < 0:
continue
if hasattr(line, 'geometry') and line.geometry:
coords = list(line.geometry.coords)
path_data = "M "
for c in coords:
svg_x, svg_y = transform_coords(c[0], c[1])
path_data += f"{svg_x},{svg_y} L "
path_data = path_data[:-2] # Remove last "L "
if _is_custom_color:
color = coloring
elif coloring in _LOADING_MODES:
if coloring == 'ts_max_loading':
load_show = line.ts_max_loading
elif coloring == 'ts_avg_loading':
load_show = line.ts_avg_loading
else:
load_show = line.loading
if int(load_show) > 100:
color = 'blue'
else:
color = _loading_colormap(load_show)
color, opacity = _svg_color_and_opacity(color)
else:
if line in grid.lines_AC_rec and line.rec_branch:
color = "green"
elif line in grid.lines_AC_ct:
color = cable_type_colors.get(line.active_config, "black")
else:
color = "red" if getattr(line, 'isTf', False) else "black"
stroke_width = float(2.0 * line_size_factor)
dwg.add(dwg.path(d=path_data, stroke=color, stroke_width=stroke_width, fill='none'))
for line in grid.lines_AC_exp:
if hasattr(line, 'geometry') and line.geometry:
coords = list(line.geometry.coords)
path_data = "M "
for c in coords:
svg_x, svg_y = transform_coords(c[0], c[1])
path_data += f"{svg_x},{svg_y} L "
path_data = path_data[:-2]
if _is_custom_color:
color = coloring
elif coloring == 'loading':
map_color = _loading_colormap(min(max(line.loading,line.ts_max_loading),100))
color, opacity = _svg_color_and_opacity(map_color)
else:
if line.np_line - line.np_line_b > 0.001:
color = "orange"
else:
color = "black"
# Ensure stroke width is a plain Python float (svgwrite validator
# does not accept NumPy scalar types).
stroke_width = float((2 * float(line.np_line)) * line_size_factor)
dwg.add(dwg.path(d=path_data, stroke=color, stroke_width=stroke_width, fill='none'))
# Draw DC lines
for line in grid.lines_DC:
if hasattr(line, 'geometry') and line.geometry:
coords = list(line.geometry.coords)
path_data = "M "
for c in coords:
svg_x, svg_y = transform_coords(c[0], c[1])
path_data += f"{svg_x},{svg_y} L "
path_data = path_data[:-2]
stroke_width = float((2 * float(line.np_line)) * line_size_factor)
dwg.add(dwg.path(d=path_data, stroke='blue', stroke_width=stroke_width, fill='none'))
ac_node_rsgen_total = {}
ac_nodes_with_rs_connection = set()
if scale_ac_nodes_with_rs:
for ren_source in grid.RenSources:
node_ref = None
if hasattr(ren_source, 'Node_AC'):
node_ref = ren_source.Node_AC
elif hasattr(ren_source, 'Node'):
node_ref = ren_source.Node
if node_ref is None:
continue
ac_nodes_with_rs_connection.add(node_ref)
ac_nodes_with_rs_connection.add(str(node_ref))
rsgen_count = float(getattr(ren_source, 'np_rsgen', 0.0))
ac_node_rsgen_total[node_ref] = ac_node_rsgen_total.get(node_ref, 0.0) + rsgen_count
ac_node_rsgen_total[str(node_ref)] = ac_node_rsgen_total.get(str(node_ref), 0.0) + rsgen_count
dc_node_conv_total = {}
if scale_dc_nodes_with_conv:
for conv in grid.Converters_ACDC:
node_ref = getattr(conv, 'Node_DC', None)
if node_ref is None:
continue
conv_count = float(getattr(conv, 'np_conv', 0.0))
if conv_count <= 0:
continue
dc_node_conv_total[node_ref] = dc_node_conv_total.get(node_ref, 0.0) + conv_count
dc_node_conv_total[str(node_ref)] = dc_node_conv_total.get(str(node_ref), 0.0) + conv_count
# Draw converters
if draw_converters:
for conv in grid.Converters_ACDC:
if hasattr(conv, 'geometry') and conv.geometry:
if float(getattr(conv, 'np_conv', 0.0)) <= 0:
continue
coords = list(conv.geometry.coords)
path_data = "M "
for c in coords:
svg_x, svg_y = transform_coords(c[0], c[1])
path_data += f"{svg_x},{svg_y} L "
path_data = path_data[:-2]
stroke_width = float((2 * float(conv.np_conv)) * line_size_factor)
dwg.add(dwg.path(d=path_data, stroke='purple', stroke_width=stroke_width, fill='none'))
# Draw nodes
for node in grid.nodes_AC + grid.nodes_DC:
if hasattr(node, 'geometry') and node.geometry:
x, y = node.geometry.x, node.geometry.y
svg_x, svg_y = transform_coords(x, y)
color = "black" if isinstance(node, Node_AC) else "purple"
node_radius = 1.0
if scale_ac_nodes_with_rs and isinstance(node, Node_AC):
node_connected_to_rs = (node in ac_nodes_with_rs_connection) or (str(node.name) in ac_nodes_with_rs_connection)
total_rsgen = ac_node_rsgen_total.get(node, ac_node_rsgen_total.get(str(node.name), 0.0))
if node_connected_to_rs and total_rsgen <= 0:
continue
if node_connected_to_rs and total_rsgen > 0:
node_radius = 1.0 + float(total_rsgen) * node_size_factor
if scale_dc_nodes_with_conv and isinstance(node, Node_DC):
total_conv = dc_node_conv_total.get(node, dc_node_conv_total.get(str(node.name), 0.0))
if total_conv <= 0:
continue
if total_conv > 0:
node_radius = 1.0 + float(total_conv) * dc_node_size_factor
dwg.add(dwg.circle(center=(svg_x, svg_y), r=float(node_radius),
fill=color, stroke=color))
if grid.nct_AC != 0 and hasattr(grid.lines_AC_ct[0], 'cable_types'):
# Transform the legend position to be within the visible bounds
legend_x, legend_y = transform_coords(minx, maxy) # Use the top-left corner of the bounds
legend_spacing = 20 # Space between legend items
# Add legend title
# Add legend items
if legend:
dwg.add(dwg.text("Cable Types",
insert=(legend_x, legend_y - 10),
font_size=15,
font_family="NewComputerModernSans"))
if grid.Cable_options[0].active_config is not None:
# Only show cable types that are active (>0.9)
space = 0
for i, cable_type in enumerate(grid.lines_AC_ct[0].cable_types):
if grid.Cable_options[0].active_config[i] > 0.9:
space += 1
color = cable_type_colors.get(i, "black")
# Add colored line
dwg.add(dwg.line(start=(legend_x, legend_y + space * legend_spacing),
end=(legend_x + 30, legend_y + space * legend_spacing),
stroke=color,
stroke_width=2))
# Add text
dwg.add(dwg.text(f"{grid.lines_AC_ct[0].cable_types[i]}",
insert=(legend_x + 40, legend_y + space * legend_spacing + 5),
font_size=12,
font_family="NewComputerModernSans",
fill=color))
else:
for i, cable_type in enumerate(grid.lines_AC_ct[0].cable_types):
color = cable_type_colors.get(i, "black")
# Add colored line
dwg.add(dwg.line(start=(legend_x, legend_y + i * legend_spacing),
end=(legend_x + 30, legend_y + i * legend_spacing),
stroke=color,
stroke_width=2))
# Add text
dwg.add(dwg.text(f"{grid.lines_AC_ct[0].cable_types[i]}",
insert=(legend_x + 40, legend_y + i * legend_spacing + 5),
font_size=12,
font_family="NewComputerModernSans",
fill=color))
# Save the SVG file
dwg.save()
print(f"Network saved as {name}.svg")
except ImportError as e:
print(f"Could not save SVG: {e}. Please install svgwrite package.")
return
[docs]
def plot_model_feasibility(solver_stats,sol='all', x_axis='time', y_axis= 'objective', normalize = False,show=True, save_path=None, width_mm=None):
import matplotlib.pyplot as plt
# Respect optional width in millimeters for journal-style figures
fig = None
if width_mm is not None:
fig_w_in = width_mm / 25.4
fig_h_in = fig_w_in # square by default
fig = plt.figure(figsize=(fig_w_in, fig_h_in))
#feasible_solutions.append((time_sec, objective, iterations))
if sol == 'all':
# [time_sec, objective, cumulative_iterations, nlp_call_num, is_feasible]
solutions = solver_stats['all_solutions']
if normalize:
# Only consider objectives that have feasible flag set to True
feasible_objectives = [s[1] for s in solutions if len(s) > 4 and s[4]]
if feasible_objectives:
min_objective = min(feasible_objectives)
else:
min_objective = min(s[1] for s in solutions)
norm_solutions = []
for s in solutions:
row = list(s)
row[1] = (row[1] / min_objective - 1) * 100
norm_solutions.append(row)
solutions = norm_solutions
if x_axis == 'time':
x_data = [s[0] for s in solutions]
elif x_axis == 'iterations':
x_data = [s[2] for s in solutions]
if y_axis == 'objective':
y_axis = 'objective [%]' if normalize else 'objective'
y_data = [s[1] for s in solutions]
elif y_axis == 'iterations':
y_data = [s[2] for s in solutions]
# Separate feasible and non-feasible points
feasible_x = []
feasible_y = []
regular_x = []
regular_y = []
for i, solution in enumerate(solutions):
is_feasible = bool(solution[4]) if len(solution) > 4 else False
if is_feasible:
feasible_x.append(x_data[i])
feasible_y.append(y_data[i])
regular_x.append(x_data[i])
regular_y.append(y_data[i])
else:
regular_x.append(x_data[i])
regular_y.append(y_data[i])
# Plot regular points in default color
if regular_x:
plt.plot(regular_x, regular_y, 'o-', color='blue', label='NLP Progress')
# Plot feasible points in red
if feasible_x:
plt.plot(feasible_x, feasible_y, 'o', color='red', markersize=8, label='Feasible Solutions')
plt.xlabel(x_axis)
plt.ylabel(y_axis)
plt.grid(True)
plt.legend()
if show:
plt.show()
if save_path is not None:
plt.savefig(save_path, bbox_inches='tight')
if not show and fig is not None:
plt.close(fig)
return
else:
# [time_sec, objective, iterations]
solutions = solver_stats['feasible_solutions']
if normalize:
min_objective = min(objective for _, objective, _ in solutions)
solutions = [ (time_sec, objective/min_objective, iterations) for time_sec, objective, iterations in solutions]
if x_axis == 'time':
x_data = [time_sec for time_sec, _, _ in solutions]
elif x_axis == 'iterations':
x_data = [iterations for _, _, iterations in solutions]
if y_axis == 'objective':
y_data = [objective for _, objective, _ in solutions]
elif y_axis == 'iterations':
y_data = [iterations for _, _, iterations in solutions]
plt.plot(x_data, y_data, 'o-')
plt.xlabel(x_axis)
plt.ylabel(y_axis)
plt.grid(True)
if show:
plt.show()
if save_path is not None:
plt.savefig(save_path, bbox_inches='tight')
if not show and fig is not None:
plt.close(fig)
plt.close(fig)
return
[docs]
def plot_3D(grid, show=True, save_path=None, coloring='cable_type',
line_width=6, node_size=6, title=None,
show_unused=False, poly=None, coords_lonlat=False,
elevation_grid=None, show_elevation_surface=True,
show_elevation_points=False, elevation_opacity=0.35,
elevation_colorscale='Viridis',
show_verticals=1.0,
dev_area=None):
"""Plot the grid network in 3D using plotly.
When ``coords_lonlat=True`` (default), node positions and LineString
geometries are assumed to be in lon/lat and are converted to local
meters so that X, Y and Z (elevation) share the same unit.
Set ``coords_lonlat=True`` when coordinates are in lon/lat.
Parameters
----------
grid : Grid object
Must contain ``lines_AC_ct`` with 3D ``geometry`` (has_z=True).
show : bool
Whether to display the figure interactively in the browser.
save_path : str or None
If given, save the figure as an HTML file.
coloring : str
'cable_type' colours cables by ``active_config``;
'loading' colours by cable loading percentage.
line_width : float
Width of cable traces.
node_size : float
Marker size for nodes.
title : str or None
Figure title.
show_unused : bool
If True, draw unused cables (active_config < 0) as thin grey lines.
poly : shapely Polygon/MultiPolygon or list, optional
Development area polygon(s) to draw on the z=0 plane.
coords_lonlat : bool
If True, convert lon/lat to local meters via equirectangular
projection. If False, assume X/Y are already in meters.
elevation_grid : pandas.DataFrame or dict, optional
Optional set of elevation points to plot as a surface/plane.
Expected columns/keys: ``x``, ``y``, ``elevation``.
Coordinates are assumed to be in the same system as the plot
(if ``coords_lonlat=True``, pass lon/lat so they are converted too).
show_elevation_surface : bool
If True (default) and ``elevation_grid`` is provided, draw a triangulated
surface (Mesh3d) colored by elevation.
show_elevation_points : bool
If True, also plot the elevation points as markers.
elevation_opacity : float
Opacity for the elevation surface.
elevation_colorscale : str or list
Plotly colorscale for the elevation surface/points.
show_verticals : float
Opacity of vertical cable segments from 0 (hidden) to 1 (fully visible).
"""
cable_type_colors = [
'#00BCD4', '#E91E63', '#795548', '#9E9E9E', '#8BC34A',
'#3F51B5', '#009688', '#9C27B0', '#303F9F', '#00ACC1',
'#F5F5DC', '#FF7043', '#EF9A9A', '#827717',
]
fig = go.Figure()
# -- Coordinate projection ------------------------------------------------
if coords_lonlat:
slack = next((n for n in grid.nodes_AC if getattr(n, 'type', '') == 'Slack'), grid.nodes_AC[0])
origin_lon, origin_lat = slack.x_coord, slack.y_coord
x0, y0, zone0, letter0 = utm.from_latlon(origin_lat, origin_lon)
def _to_m(lon, lat):
lon = np.asarray(lon, dtype=float)
lat = np.asarray(lat, dtype=float)
if lon.ndim == 0:
x, y, _, _ = utm.from_latlon(float(lat), float(lon), zone0, letter0)
return (x - x0, y - y0)
xs, ys = np.empty_like(lon), np.empty_like(lat)
for i in range(len(lon)):
xi, yi, _, _ = utm.from_latlon(float(lat[i]), float(lon[i]), zone0, letter0)
xs[i], ys[i] = xi - x0, yi - y0
return (xs, ys)
else:
def _to_m(x, y):
return (x, y)
# -- Optional elevation surface / plane ----------------------------------
if elevation_grid is not None:
if isinstance(elevation_grid, pd.DataFrame):
if coords_lonlat and 'lon' in elevation_grid.columns and 'lat' in elevation_grid.columns:
ex = elevation_grid['lon'].to_numpy()
ey = elevation_grid['lat'].to_numpy()
else:
ex = elevation_grid['x'].to_numpy()
ey = elevation_grid['y'].to_numpy()
ez = elevation_grid['elevation'].to_numpy()
else:
if coords_lonlat and 'lon' in elevation_grid and 'lat' in elevation_grid:
ex = np.asarray(elevation_grid['lon'])
ey = np.asarray(elevation_grid['lat'])
else:
ex = np.asarray(elevation_grid['x'])
ey = np.asarray(elevation_grid['y'])
ez = np.asarray(elevation_grid['elevation'])
# Build prepared dev_area polygon for triangle filtering
_dev_prepared = None
if dev_area is not None:
from shapely.ops import unary_union as _union
from shapely.prepared import prep as _prep
polys = []
for p in (dev_area if isinstance(dev_area, (list, tuple)) else [dev_area]):
if isinstance(p, MultiPolygon):
polys.extend(p.geoms)
else:
polys.append(p)
_dev_prepared = _prep(_union(polys))
ex_m, ey_m = _to_m(ex, ey)
ex_m = np.asarray(ex_m).ravel()
ey_m = np.asarray(ey_m).ravel()
ez = np.asarray(ez).ravel()
if show_elevation_surface and len(ex_m) >= 3:
import matplotlib.tri as mtri
tri = mtri.Triangulation(ex_m, ey_m)
tris = tri.triangles
# Remove triangles whose centroid falls outside the dev_area
if _dev_prepared is not None and tris is not None and len(tris) > 0:
keep = np.ones(len(tris), dtype=bool)
for t_idx in range(len(tris)):
i0, i1, i2 = tris[t_idx]
cx = (ex[i0] + ex[i1] + ex[i2]) / 3.0
cy = (ey[i0] + ey[i1] + ey[i2]) / 3.0
if not _dev_prepared.contains(Point(cx, cy)):
keep[t_idx] = False
tris = tris[keep]
if tris is not None and len(tris) > 0:
fig.add_trace(go.Mesh3d(
x=ex_m, y=ey_m, z=ez,
i=tris[:, 0], j=tris[:, 1], k=tris[:, 2],
intensity=ez,
colorscale=elevation_colorscale,
opacity=float(elevation_opacity),
name='Elevation surface',
legendgroup='elevation',
showlegend=True,
showscale=False,
hoverinfo='skip',
))
if show_elevation_points and len(ex_m) > 0:
fig.add_trace(go.Scatter3d(
x=ex_m, y=ey_m, z=ez,
mode='markers',
marker=dict(size=2, color=ez, colorscale=elevation_colorscale, opacity=0.9),
name='Elevation points',
legendgroup='elevation',
showlegend=not show_elevation_surface,
hoverinfo='skip',
))
# -- Helper: extract 3D coords from LineString, convert to meters ---------
def _line_coords(geometry):
coords = list(geometry.coords)
if geometry.has_z:
result = [(*_to_m(c[0], c[1]), c[2]) for c in coords]
else:
result = [(*_to_m(c[0], c[1]), 0.0) for c in coords]
xs, ys, zs = zip(*result)
return list(xs), list(ys), list(zs)
def _split_verticals(xs, ys, zs):
"""Split a cable into (seabed, leading_vertical, trailing_vertical).
A vertical is detected where consecutive points share the same x,y.
Returns (main_xs, main_ys, main_zs), (lead_xs, lead_ys, lead_zs), (trail_xs, trail_ys, trail_zs).
Lead/trail lists are empty if no vertical exists.
"""
n = len(xs)
# Find end of leading vertical
lead_end = 0
while lead_end < n - 1 and xs[lead_end] == xs[lead_end + 1] and ys[lead_end] == ys[lead_end + 1]:
lead_end += 1
# Find start of trailing vertical
trail_start = n - 1
while trail_start > lead_end and xs[trail_start] == xs[trail_start - 1] and ys[trail_start] == ys[trail_start - 1]:
trail_start -= 1
lead = (xs[:lead_end + 1], ys[:lead_end + 1], zs[:lead_end + 1]) if lead_end > 0 else ([], [], [])
trail = (xs[trail_start:], ys[trail_start:], zs[trail_start:]) if trail_start < n - 1 else ([], [], [])
main = (xs[lead_end:trail_start + 1], ys[lead_end:trail_start + 1], zs[lead_end:trail_start + 1])
return main, lead, trail
# -- Draw cables ----------------------------------------------------------
used_configs = set()
for line in grid.lines_AC_ct:
geo = getattr(line, 'geometry', None)
if geo is None:
continue
is_used = getattr(line, 'active_config', 0) >= 0
if not is_used and not show_unused:
continue
xs, ys, zs = _line_coords(geo)
if not is_used:
color = 'lightgrey'
width = 1
legend_group = 'unused'
legend_name = 'Unused'
show_legend = legend_group not in used_configs
used_configs.add(legend_group)
elif coloring == 'loading':
load_val = getattr(line, 'loading', 0)
color = _loading_colormap(min(load_val, 100))
width = line_width
legend_group = None
legend_name = f'{line.name} ({load_val:.0f}%)'
show_legend = True
else:
cfg = getattr(line, 'active_config', 0)
color = cable_type_colors[cfg % len(cable_type_colors)]
width = line_width
cable_types = getattr(line, 'cable_types', None) or getattr(line, '_cable_types', [])
cable_type_name = cable_types[cfg] if 0 <= cfg < len(cable_types) else f'index {cfg}'
legend_group = f'type_{cable_type_name}'
legend_name = cable_type_name
show_legend = cfg not in used_configs
used_configs.add(cfg)
vertical_opacity = 1.0 if show_verticals is True else (0.0 if show_verticals is False else float(show_verticals))
vertical_opacity = max(0.0, min(1.0, vertical_opacity))
(mx, my, mz), (lx, ly, lz), (tx, ty, tz) = _split_verticals(xs, ys, zs)
fig.add_trace(go.Scatter3d(
x=mx, y=my, z=mz,
mode='lines',
line=dict(color=color, width=width),
name=legend_name,
legendgroup=legend_group,
showlegend=show_legend,
hovertext=f'{line.name}',
hoverinfo='text',
))
if vertical_opacity > 0:
for vx, vy, vz in [(lx, ly, lz), (tx, ty, tz)]:
if vx:
fig.add_trace(go.Scatter3d(
x=vx, y=vy, z=vz,
mode='lines',
line=dict(color=color, width=max(1, width // 2)),
opacity=vertical_opacity,
legendgroup=legend_group,
showlegend=False,
hoverinfo='skip',
))
# -- Draw export cables ---------------------------------------------------
_exp_added = False
for line in getattr(grid, 'lines_AC_exp', []):
geo = getattr(line, 'geometry', None)
if geo is None:
continue
xs, ys, zs = _line_coords(geo)
fig.add_trace(go.Scatter3d(
x=xs, y=ys, z=zs,
mode='lines',
line=dict(color='black', width=line_width + 1),
name='Export cable',
legendgroup='export',
showlegend=not _exp_added,
hovertext=f'{line.name}',
hoverinfo='text',
))
_exp_added = True
# -- Build node elevation lookup from line endpoints -----------------------
node_z_lookup = {}
for line in grid.lines_AC_ct + getattr(grid, 'lines_AC_exp', []):
geo = getattr(line, 'geometry', None)
if geo is None or not getattr(geo, 'has_z', False):
continue
coords = list(geo.coords)
if coords and len(coords[0]) >= 3:
fn = getattr(line.fromNode, 'name', None)
tn = getattr(line.toNode, 'name', None)
start_z = coords[0][2]
end_z = coords[-1][2]
if fn is not None and fn not in node_z_lookup:
node_z_lookup[fn] = start_z
if tn is not None and tn not in node_z_lookup:
node_z_lookup[tn] = end_z
# -- Draw nodes -----------------------------------------------------------
turbine_x, turbine_y, turbine_z, turbine_text = [], [], [], []
sub_x, sub_y, sub_z, sub_text = [], [], [], []
for node in grid.nodes_AC:
ntype = getattr(node, 'type', '')
mx, my = _to_m(node.x_coord, node.y_coord)
z = node_z_lookup.get(node.name, 0.0)
hover = f'{node.name}<br>({mx:.0f}, {my:.0f}, {z:.1f}) m'
if ntype == 'Slack':
sub_x.append(mx); sub_y.append(my); sub_z.append(z); sub_text.append(hover)
else:
turbine_x.append(mx); turbine_y.append(my); turbine_z.append(z); turbine_text.append(hover)
if turbine_x:
fig.add_trace(go.Scatter3d(
x=turbine_x, y=turbine_y, z=turbine_z,
mode='markers',
marker=dict(size=node_size, color='green', symbol='circle'),
name='Turbines',
hovertext=turbine_text,
hoverinfo='text',
))
if sub_x:
fig.add_trace(go.Scatter3d(
x=sub_x, y=sub_y, z=sub_z,
mode='markers',
marker=dict(size=node_size * 2, color='red', symbol='diamond'),
name='Substations',
hovertext=sub_text,
hoverinfo='text',
))
# -- Draw development area polygon on z=0 plane ---------------------------
if poly is not None:
def _iter_polys(obj):
if isinstance(obj, Polygon):
yield obj
elif isinstance(obj, MultiPolygon):
for p in obj.geoms:
yield p
elif isinstance(obj, (list, tuple)):
for o in obj:
yield from _iter_polys(o)
for pg in _iter_polys(poly):
ring = list(pg.exterior.coords)
pts = [_to_m(c[0], c[1]) for c in ring]
px, py = zip(*pts)
pz = [0.0] * len(px)
fig.add_trace(go.Scatter3d(
x=list(px), y=list(py), z=pz,
mode='lines',
line=dict(color='dodgerblue', width=2),
name='Dev. area',
legendgroup='dev_area',
showlegend=True,
hoverinfo='skip',
))
# -- Layout ---------------------------------------------------------------
fig.update_layout(
title=title or 'Array Cable Layout – 3D',
scene=dict(
xaxis_title='X (meters)',
yaxis_title='Y (meters)',
zaxis_title='Elevation (meters)',
),
width=900,
height=800,
template='plotly_white',
legend=dict(
itemsizing='constant',
yanchor='top', y=0.99,
xanchor='left', x=1.01,
bgcolor='rgba(255,255,255,0.8)',
bordercolor='rgba(0,0,0,0.2)',
borderwidth=1,
),
margin=dict(l=0, r=0, t=40, b=0),
)
if save_path is not None:
pio.write_html(fig, save_path)
print(f"3D plot saved to {save_path}")
if show:
fig.show(renderer='browser')
return fig
def _svg_color_and_opacity(color):
# Return (svg_color, opacity_or_None)
# Accepts '#RRGGBB', '#RRGGBBAA', (r,g,b), (r,g,b,a), 'rgba(r,g,b,a)'
if isinstance(color, (list, tuple)):
if len(color) == 4:
r, g, b, a = color
# handle 0..1 floats or 0..255 ints
if max(r, g, b) <= 1:
r, g, b = int(r*255), int(g*255), int(b*255)
if a > 1:
a = a/255.0
return f"rgb({int(r)},{int(g)},{int(b)})", float(a)
if len(color) == 3:
r, g, b = color
if max(r, g, b) <= 1:
r, g, b = int(r*255), int(g*255), int(b*255)
return f"rgb({int(r)},{int(g)},{int(b)})", None
if isinstance(color, str):
s = color.strip()
if s.startswith('#') and len(s) == 9: # #RRGGBBAA
rgb = s[:7]
a = int(s[7:9], 16) / 255.0
return rgb, a
if s.lower().startswith('rgba(') and s.endswith(')'):
parts = [p.strip() for p in s[5:-1].split(',')]
r, g, b = [int(float(x)) for x in parts[:3]]
a = float(parts[3])
return f"rgb({r},{g},{b})", a
return color, None