"""Representative-period clustering of time-series inputs.
Reduces full time-series data to representative periods (k-means, k-medoids,
Ward, PAM) for use in clustered TEP/OPF studies, with cluster weights.
Owns: clustering algorithms, cluster evaluation, and writing clusters onto the
grid.
Does not own: time-series power flow / OPF execution (see ``Time_series``).
"""
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import pairwise_distances,davies_bouldin_score
from sklearn.preprocessing import StandardScaler,RobustScaler
from kmedoids import KMedoids
import os
import json
if 'MPLBACKEND' not in os.environ:
os.environ['MPLBACKEND'] = 'Agg'
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import time as time
from pathlib import Path
from sklearn.decomposition import PCA
from .constants import DEFAULT_CLUSTERING_MAX_ITER
__all__ = ['cluster_TS',
'cluster_Kmeans',
'cluster_Ward',
'cluster_Kmedoids',
'cluster_PAM_Hierarchical',
'cluster_analysis',
'run_clustering_analysis',
'run_clustering_analysis_and_plot',
'run_elbow_analysis',
'identify_correlations',
'load_precomputed_clusters_to_grid']
# Plotting constants
FIGURE_WIDTH_CM = 8.25
FIGURE_RATIO = 6/10
CM_TO_INCHES = 2.54
LARGE_DATA_THRESHOLD = 10000
SCATTER_SIZE = 16
SAVE_DPI = 300
CORRELATION_FIGSIZE = (12, 10)
LABEL_ROTATION = 90
LEGEND_Y_POSITION = 1.15
# Algorithm constants
MAX_ITERATIONS = DEFAULT_CLUSTERING_MAX_ITER
DEFAULT_RANDOM_STATE = 42
DEFAULT_N_INIT = 10
DEFAULT_GAMMA = 1.0
# Default parameters
DEFAULT_CLUSTER_NUMBERS = [1, 4, 8, 16, 24, 48]
DEFAULT_TS_OPTIONS = [None, 0, 0.8]
DEFAULT_CORRELATION_DECISIONS = [True, '2', True]
# Additional algorithm constants
DEFAULT_CORRELATION_THRESHOLD = 0.8
DEFAULT_CV_THRESHOLD = 0
DEFAULT_SEASONAL_PERIOD_HOURS = 168
DEFAULT_TIME_RESOLUTION_HOURS = 1
# Plotting constants
TICK_LENGTH = 3
TICK_WIDTH = 0.8
INERTIA_PLOT_LIMIT = 100000
def _prepare_scaled_data(data, scaling_data, scaler_type="robust"):
if scaling_data is None:
if scaler_type == "robust":
scaler = RobustScaler()
else:
scaler = StandardScaler()
data_scaled = scaler.fit_transform(data)
else:
data_scaled, scaler = scaling_data
return data_scaled, scaler
def filter_data(grid, time_series, cv_threshold=0, central_market=None, print_details=False):
"""
Filter time series data based on type and Coefficient of Variation threshold.
Parameters:
-----------
grid : Grid object
The grid object containing time series
time_series : list
List of time series types to include
cv_threshold : float, default=0
Minimum Coefficient of Variation threshold. Time series with CV below this
will be excluded. CV = std/mean (absolute value)
central_market : str, default=None
Central market name. If provided, only time series associated with this market will be included
print_details : bool, default=True
If True, print detailed statistics of time series
Returns:
--------
pandas.DataFrame, StandardScaler, pandas.DataFrame
Filtered scaled data, scaler object, and raw data
"""
if not time_series:
time_series = [
'a_CG', # Price zone cost generation parameter a
'b_CG', # Price zone cost generation parameter b
#'c_CG', # Price zone cost generation parameter c
'PGL_min', # Price zone minimum generation limit
'PGL_max', # Price zone maximum generation limit
'price', # Price for price zones and AC nodes
'Load', # Load factor for price zones and AC nodes
'WPP', # Wind Power Plant availability
'OWPP', # Offshore Wind Power Plant availability
'SF', # Solar Farm availability
'REN' , # Generic Renewable source availability
'Wind',
'Solar'
]
if not central_market:
central_market = set(grid.Price_Zones_dic.keys())
PZ_centrals = [grid.Price_Zones[grid.Price_Zones_dic[cm]] for cm in central_market]
data = pd.DataFrame()
excluded_ts = [] # Track excluded time series
columns_to_drop = []
non_time_series = []
# Track which columns are used in clustering vs not used
used_in_clustering = set()
not_used_in_clustering = set()
# First collect all valid time series
for ts in grid.Time_series:
name = ts.name
ts_data = ts.data
if ts.type in time_series:
is_in_central = any(ts.TS_num in pz.TS_dict.values() for pz in PZ_centrals)
if not is_in_central and ts.type in ['price','Load','PGL_min','PGL_max','a_CG','b_CG','c_CG']:
columns_to_drop.append(name)
not_used_in_clustering.add(name)
else:
used_in_clustering.add(name)
else:
columns_to_drop.append(name)
non_time_series.append(name)
not_used_in_clustering.add(name)
if data.empty:
data[name] = ts_data
expected_length = len(ts_data)
else:
if len(ts_data) != expected_length:
print(f"Error: Length mismatch for time series '{name}'. Expected {expected_length}, got {len(ts_data)}. Time series not included")
continue
data[name] = ts_data
if not data.empty:
# Calculate and print statistics
stats = {}
for column in data.columns:
mean = np.mean(data[column])
std = np.std(data[column])
var = np.var(data[column])
if mean == 0:
cv = np.inf if std != 0 else 0
else:
cv = abs(std / mean)
stats[column] = {
'mean': mean,
'std': std,
'var': var,
'cv': cv
}
# Track columns excluded before CV threshold check
excluded_before_cv = not_used_in_clustering.copy()
# Filter based on CV threshold
cv_excluded_columns = set()
if cv_threshold > 0:
for column, stat in stats.items():
if stat['cv'] > cv_threshold:
excluded_ts.append((column, stat['cv']))
columns_to_drop.append(column)
# Remove from used set if it was there
used_in_clustering.discard(column)
not_used_in_clustering.add(column)
# Only add to cv_excluded_columns if it wasn't already excluded
if column not in excluded_before_cv:
cv_excluded_columns.add(column)
# Scale the remaining data after filtering
scaler = StandardScaler()
data_scaled = pd.DataFrame(
scaler.fit_transform(data),
columns=data.columns
)
if columns_to_drop:
data_scaled = data_scaled.drop(columns=columns_to_drop)
# Determine final used columns (those that remain in data_scaled)
final_used_columns = set(data_scaled.columns) if not data_scaled.empty else set()
# Create DataFrame from statistics, separated into used, cv_excluded, and other_excluded
stats_rows_used = []
stats_rows_cv_excluded = []
stats_rows_other_excluded = []
for column, stat in stats.items():
row_data = {
'Name': column,
'Mean': stat['mean'],
'Std': stat['std'],
'Var': stat['var'],
'CV': stat['cv']
}
if column in final_used_columns:
stats_rows_used.append(row_data)
elif column in cv_excluded_columns:
stats_rows_cv_excluded.append(row_data)
else:
stats_rows_other_excluded.append(row_data)
# Sort each group separately by CV
stats_rows_used.sort(key=lambda x: x['CV'])
stats_rows_cv_excluded.sort(key=lambda x: x['CV'])
stats_rows_other_excluded.sort(key=lambda x: x['CV'])
# Rename for consistency with rest of code
cv_excluded_rows = stats_rows_cv_excluded
other_excluded_rows = stats_rows_other_excluded
# Combine into one DataFrame with separator rows
all_stats_rows = stats_rows_used.copy()
if cv_excluded_rows or other_excluded_rows:
# Add CV excluded rows first (if any)
if cv_excluded_rows:
all_stats_rows.append({
'Name': '---',
'Mean': '----',
'Std': '--',
'Var': '--',
'CV': '--'
})
all_stats_rows.append({
'Name': 'Excluded',
'Mean': 'due',
'Std': ' to',
'Var': ' cv_threshold',
'CV': f'{cv_threshold:.2f}'
})
all_stats_rows.append({
'Name': '---',
'Mean': '----',
'Std': '--',
'Var': '--',
'CV': '--'
})
all_stats_rows.extend(cv_excluded_rows)
# Add other excluded rows (if any)
if other_excluded_rows:
all_stats_rows.append({
'Name': '---',
'Mean': '----',
'Std': '--',
'Var': '--',
'CV': '--'
})
all_stats_rows.append({
'Name': 'Not',
'Mean': 'used',
'Std': ' in',
'Var': ' clustering',
'CV': 'analysis'
})
all_stats_rows.append({
'Name': '---',
'Mean': '----',
'Std': '--',
'Var': '--',
'CV': '--'
})
all_stats_rows.extend(other_excluded_rows)
stats_df = pd.DataFrame(all_stats_rows)
stats_df.reset_index(drop=True, inplace=True)
# Store in grid object for later access (always save, even if not printed)
grid.Clustering_information['Time_series_statistics'] = stats_df
# Print sorted by both CV and variance
if print_details:
print("\nTime series statistics (sorted by CV):")
print(f"{'Name':20} {'Mean':>12} {'Std':>12} {'Var':>12} {'CV':>12}")
print("-" * 70)
# Print used time series
for row in stats_rows_used:
print(f"{row['Name']:20} {row['Mean']:12.6f} {row['Std']:12.6f} {row['Var']:12.6f} {row['CV']:12.6f}")
# Print CV excluded time series
if cv_excluded_rows:
print("-" * 70)
print(f"Excluded due to cv_threshold ({cv_threshold:.2f}):")
print(f"{'Name':20} {'Mean':>12} {'Std':>12} {'Var':>12} {'CV':>12}")
print("-" * 70)
for row in cv_excluded_rows:
print(f"{row['Name']:20} {row['Mean']:12.6f} {row['Std']:12.6f} {row['Var']:12.6f} {row['CV']:12.6f}")
# Print other excluded time series
if other_excluded_rows:
print("-" * 70)
print("Time series not used in clustering:")
print(f"{'Name':20} {'Mean':>12} {'Std':>12} {'Var':>12} {'CV':>12}")
print("-" * 70)
for row in other_excluded_rows:
print(f"{row['Name']:20} {row['Mean']:12.6f} {row['Std']:12.6f} {row['Var']:12.6f} {row['CV']:12.6f}")
if columns_to_drop and print_details:
print(f"\nExcluded {len(excluded_ts)} time series with CV below {cv_threshold}:")
for name, cv in excluded_ts:
print(f"- {name}: CV = {cv:.6f}")
print(f"\nExcluded {len(non_time_series)} for being outside of user defined time series: {time_series}")
for name in non_time_series:
print(f"- {name}")
print(f"\nExcluded {len(columns_to_drop)-len(excluded_ts)-len(non_time_series)} time series not in central market {central_market}:")
for name in columns_to_drop:
if name not in excluded_ts and name not in non_time_series:
print(f"- {name}")
if print_details:
if data.empty:
print("Warning: No time series passed the filtering criteria")
else:
print(f"\nIncluded {len(data_scaled.columns)} time series in analysis")
return data_scaled, scaler, data
[docs]
def identify_correlations(grid,time_series=None, correlation_threshold=0,cv_threshold=0,central_market=None,print_details=False,correlation_decisions=None):
"""
Identify highly correlated time series variables.
Parameters:
grid: Grid object containing time series
correlation_threshold: Correlation coefficient threshold (default: 0.8)
cv_threshold: Minimum variance threshold (default: 0)
Returns:
dict: Dictionary containing:
- correlation_matrix: Full correlation matrix
- high_correlations: List of tuples (var1, var2, corr_value) for highly correlated pairs
- groups: List of groups of correlated variables
"""
data_scaled,scaler, data = filter_data(grid,time_series,cv_threshold,central_market,print_details)
groups = []
high_corr = []
corr_matrix = None
if correlation_decisions is not None and (not correlation_decisions or correlation_decisions[0]):
if correlation_threshold > 0:
# Calculate correlation matrix
corr_matrix = data_scaled.corr()
high_corr = []
corr_stack = corr_matrix.stack()
upper_triangle = corr_stack[corr_stack.index.get_level_values(0) < corr_stack.index.get_level_values(1)]
high_corr_filtered = upper_triangle[abs(upper_triangle) > correlation_threshold]
high_corr = [(var1, var2, abs(corr)) for (var1, var2), corr in high_corr_filtered.items()]
groups = []
used_vars = set()
for var1, var2, corr in high_corr:
# Find if any existing group contains either variable
found_group = False
for group in groups:
if var1 in group or var2 in group:
group.add(var1)
group.add(var2)
found_group = True
break
# If no existing group found, create new group
if not found_group:
groups.append({var1, var2})
used_vars.add(var1)
used_vars.add(var2)
# Print results
if print_details:
print(f"\nHighly correlated variables (|correlation| > {correlation_threshold}):")
for var1, var2, corr in high_corr:
print(f"{var1:20} - {var2:20}: {corr:.3f}")
if print_details:
print("\nCorrelated groups:")
for i, group in enumerate(groups, 1):
print(f"Group {i}: {', '.join(sorted(group))}")
if correlation_decisions is None or correlation_decisions == []:
raise ValueError(
"Correlated groups found but no correlation_decisions provided. "
"Pass correlation_decisions=[clean_groups: bool, method: str, scale_groups: bool] "
"e.g. [True, '2', True]"
)
else:
clean_groups = correlation_decisions[0]
method = correlation_decisions[1]
scale_groups = correlation_decisions[2]
columns_to_drop = []
if clean_groups:
if method == '1' or method == 1:
for group in groups:
group_list = list(group)
group_variances = data[group_list].var()
max_var_col = group_variances.idxmax()
if print_details:
print("\nUsing highest variance method:")
print(f"\nGroup: {group_list}")
print(f"Variances: {group_variances}")
print(f"Keeping: {max_var_col} (variance: {group_variances[max_var_col]:.2f})")
if scale_groups:
scaling_factor = np.sqrt(len(group_list))
if print_details:
print(f"Scaling by sqrt({len(group_list)}) = {scaling_factor:.2f}")
data_scaled[max_var_col] *= scaling_factor
columns_to_drop.extend([col for col in group_list if col != max_var_col])
elif method == '2' or method == 2:
for group in groups:
group_list = list(group)
group_data = data_scaled[group_list]
# Apply PCA
pca = PCA(n_components=1)
pc1 = pca.fit_transform(group_data)
# Create new column name
new_col = f"PC1_{'_'.join(group_list)}"
if print_details:
print("\nUsing PCA with new components:")
print(f"\nGroup: {group_list}")
print(f"Creating new component: {new_col}")
print(f"Explained variance ratio: {pca.explained_variance_ratio_[0]:.2%}")
# Scale PC if requested
if scale_groups:
scaling_factor = np.sqrt(len(group_list))
if print_details:
print(f"Scaling by sqrt({len(group_list)}) = {scaling_factor:.2f}")
pc1 *= scaling_factor
data_scaled[new_col] = pc1.ravel()
columns_to_drop.extend(group_list)
elif method == '3' or method == 3:
for group in groups:
group_list = list(group)
group_data = data_scaled[group_list]
# Apply PCA
pca = PCA(n_components=1)
pc1 = pca.fit_transform(group_data)
# Find variable most correlated with PC1
correlations = [np.corrcoef(pc1.ravel(), group_data[col])[0,1] for col in group_list]
max_cor_idx = np.argmax(np.abs(correlations))
max_cor_col = group_list[max_cor_idx]
if print_details:
print("\nUsing PCA representative method:")
print(f"\nGroup: {group_list}")
print(f"PC1 explained variance ratio: {pca.explained_variance_ratio_[0]:.2%}")
print(f"Correlations with PC1: {dict(zip(group_list, correlations))}")
print(f"Keeping: {max_cor_col} (correlation: {correlations[max_cor_idx]:.2f})")
if scale_groups:
scaling_factor = np.sqrt(len(group_list))
if print_details:
print(f"Scaling by sqrt({len(group_list)}) = {scaling_factor:.2f}")
data_scaled[max_cor_col] *= scaling_factor
columns_to_drop.extend([col for col in group_list if col != max_cor_col])
if print_details:
print(f"\nDropping {len(columns_to_drop)} columns from scaled data: {columns_to_drop}")
data_scaled = data_scaled.drop(columns=columns_to_drop)
return [data_scaled,scaler, data], {
'correlation_matrix': corr_matrix,
'high_correlations': high_corr,
'groups': groups
}
def plot_time_series(data,labels,var_name,n_clusters,save_path,format,identifier,algo):
# Convert 8.25 cm to inches and maintain ratio
width_inches = FIGURE_WIDTH_CM / CM_TO_INCHES
if len(data) > LARGE_DATA_THRESHOLD:
width_inches = width_inches * 2
# Set publication-quality plotting parameters
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
'figure.figsize': (width_inches, width_inches*FIGURE_RATIO),
'font.family': 'serif',
'font.size': 8,
'axes.labelsize': 8,
'axes.titlesize': 8,
'xtick.labelsize': 7,
'ytick.labelsize': 7,
'legend.fontsize': 7,
'lines.markersize': 4,
'lines.linewidth': 1,
'grid.alpha': 0.3
})
# Create figure and apply formatting
fig, ax = plt.subplots()
max_colors = 8 # Set2 colormap has 8 distinct colors
colors = plt.cm.Set2(np.linspace(0, 1, max_colors))
markers = ['+', 'x', '*', '^', 'v', '<', '>', 's'] # Backup markers when colors repeat
# Plot clusters with consistent styling
for i in range(n_clusters):
mask = labels == i
time_points = np.arange(len(data))[mask]
values = data.loc[mask, var_name]
# If we've exceeded the number of colors, start cycling markers
current_marker = '+' if i < max_colors else markers[((i - max_colors) % len(markers))]
ax.scatter(time_points, values,
marker=current_marker,
color=colors[i % max_colors],
alpha=.8,
s=SCATTER_SIZE)
# Format axes
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.grid(True, linestyle='--', alpha=0.3)
ax.tick_params(direction='out', length=TICK_LENGTH, width=TICK_WIDTH)
ax.set_xlabel('Time')
ax.set_ylabel(f'Value (standardized) of {var_name}')
ax.set_title(f'Time Series Clustering\n{algo}, {n_clusters} clusters')
plt.tight_layout()
# Save plot with consistent settings
plt.savefig(f'{save_path}/timeseries_clustering_{algo}_{n_clusters}_{identifier}.{format}',
dpi=SAVE_DPI,
bbox_inches='tight',
pad_inches=0.1)
plt.close()
def plot_correlation_matrix(corr_matrix, save_path=None):
"""
Plot correlation matrix as a heatmap.
Parameters:
corr_matrix: Pandas DataFrame with correlation matrix
save_path: Path to save the plot (optional)
"""
plt.figure(figsize=CORRELATION_FIGSIZE)
# Create heatmap
plt.imshow(corr_matrix, cmap='RdBu', aspect='equal', vmin=-1, vmax=1)
# Add labels
plt.colorbar()
plt.xticks(range(len(corr_matrix.columns)), corr_matrix.columns, rotation=LABEL_ROTATION)
plt.yticks(range(len(corr_matrix.columns)), corr_matrix.columns)
# Vectorized text placement
n_cols = len(corr_matrix.columns)
for i, j in np.ndindex(n_cols, n_cols):
plt.text(j, i, f'{corr_matrix.iloc[i, j]:.2f}',
ha='center', va='center')
plt.title('Correlation Matrix of Time Series')
plt.tight_layout()
if save_path:
plt.savefig(save_path)
plt.close()
[docs]
def cluster_TS(grid, n_clusters, time_series=None, central_market=None, algorithm='kmeans',
cv_threshold=0, correlation_threshold=0.8, print_details=False,
correlation_decisions=None, critical_idx=None, base_critical_ratio=0.5, scaler_type='robust',
forced_centers=None, **kwargs):
"""Cluster time-series profiles into representative operating states.
Runs correlation-based reduction (:func:`identify_correlations`) and then the
selected clustering algorithm, optionally weighting a set of "critical"
rows more heavily.
Parameters
----------
grid : Grid
Grid whose time series are clustered.
n_clusters : int
Number of representative states (clusters) to produce.
time_series, central_market : list, optional
Time-series selection and central-market references.
algorithm : str, optional
One of ``'kmeans'``, ``'kmedoids'``, ``'ward'``, ``'pam_hierarchical'``
(default ``'kmeans'``).
cv_threshold, correlation_threshold : float, optional
Coefficient-of-variation and correlation thresholds for reduction.
critical_idx : list, optional
Indices treated as critical (clustered separately).
base_critical_ratio : float or int, optional
Fraction (or count) of clusters reserved for critical rows.
scaler_type : str, optional
Scaler used before clustering (default ``'robust'``).
**kwargs
Extra algorithm-specific options, e.g. ``random_state``, ``n_init``,
``max_iter`` (kmeans) or ``method``, ``init``, ``metric`` (kmedoids).
"""
algorithm = algorithm.lower()
valid_algorithms = {'kmeans', 'kmeans_medoids', 'ward', 'pam_hierarchical', 'kmedoids'}
if algorithm not in valid_algorithms:
print(f"Algorithm {algorithm} not found, using Kmeans")
algorithm='kmeans'
[data_scaled,scaler, data],_ = identify_correlations(grid,time_series=time_series,
correlation_threshold=correlation_threshold,
cv_threshold=cv_threshold,
central_market=central_market,
print_details=print_details,
correlation_decisions=correlation_decisions)
if critical_idx and n_clusters > 1:
idx_all = data.index
crit_rows = [i for i in critical_idx if i in idx_all]
rest_rows = [i for i in idx_all if i not in crit_rows]
if isinstance(base_critical_ratio, int):
n_crit = base_critical_ratio
n_rest = n_clusters - n_crit
else:
n_crit = max(1, min(int(round(n_clusters * base_critical_ratio)), len(crit_rows)))
n_rest = max(1, min(n_clusters - n_crit, len(rest_rows)))
if n_rest < 1 and len(rest_rows) > 0:
n_rest, n_crit = 1, max(1, min(n_clusters - 1, len(crit_rows)))
data_crit = data.loc[crit_rows, :]
data_rest = data.loc[rest_rows, :]
data_scaled_crit = data_scaled.loc[crit_rows, :]
data_scaled_rest = data_scaled.loc[rest_rows, :]
n_crit,crit_clusters, crit_returns, crit_info = _run_clustering_algorithm(grid, n_crit, data_crit, data_scaled_crit, scaler, print_details, algorithm, scaler_type, **kwargs)
if n_rest > 0 :
n_rest,rest_clusters, rest_returns, rest_info = _run_clustering_algorithm(grid, n_rest, data_rest, data_scaled_rest, scaler, print_details, algorithm, scaler_type, **kwargs)
n_clusters = n_crit + n_rest
clusters = pd.concat([crit_clusters, rest_clusters],ignore_index=True)
returns = crit_returns + rest_returns
_, labels_crit = crit_info
_, labels_rest = rest_info
labels_rest_off = labels_rest + n_crit
labels = np.empty(len(data), dtype=labels_rest.dtype)
labels[crit_rows] = labels_crit
labels[rest_rows] = labels_rest_off
clusters['Weight'] = clusters['Cluster Count'].astype(float) / (len(crit_rows)+len(rest_rows))
else:
n_clusters,clusters, returns, data_info=n_crit,crit_clusters, crit_returns, crit_info
data_scaled, labels = data_info
else:
n_clusters,clusters, returns, data_info = _run_clustering_algorithm(
grid,
n_clusters,
data,
data_scaled,
scaler,
print_details,
algorithm,
forced_centers=forced_centers,
**kwargs,
)
data_scaled, labels = data_info
grid.Clusters[n_clusters] = {}
grid.Clusters[n_clusters]['Weight'] = clusters['Weight'].to_numpy(dtype=float)
grid.Clusters[n_clusters]['Cluster Count'] = clusters['Cluster Count'].values
cluster_idx = {k: np.where(labels == k)[0].tolist() for k in np.unique(labels)}
grid.Clusters[n_clusters]['Cluster idx'] = cluster_idx
grid.Clusters[n_clusters]['Labels'] = labels
grid.Clusters[n_clusters]['Representatives'] = clusters.copy()
for ts in grid.Time_series:
if not hasattr(ts, 'data_clustered') or not isinstance(ts.data_clustered, dict):
ts.data_clustered = {}
name = ts.name
ts.data_clustered[n_clusters] = clusters[name].to_numpy(dtype=float)
data_info = [data, data_scaled, labels]
return n_clusters, clusters, returns, data_info
def _run_clustering_algorithm(grid, n_clusters, data, data_scaled, scaler, print_details, algorithm, scaler_type='robust', **kwargs):
forced_centers = kwargs.pop('forced_centers', None)
if algorithm == 'kmeans':
use_medoids = kwargs.pop('use_medoids', False)
clusters, returns, data_info = cluster_Kmeans(grid, n_clusters, data, [data_scaled, scaler],
print_details=print_details, use_medoids=use_medoids, scaler_type=scaler_type, **kwargs)
elif algorithm == 'kmeans_medoids':
clusters, returns, data_info = cluster_Kmeans(grid, n_clusters, data, [data_scaled, scaler],
print_details=print_details, use_medoids=True, scaler_type=scaler_type,
forced_centers=forced_centers, **kwargs)
elif algorithm == 'ward':
clusters, returns, data_info = cluster_Ward(grid, n_clusters, data, [data_scaled, scaler],
print_details=print_details, scaler_type=scaler_type, **kwargs)
elif algorithm == 'kmedoids':
clusters, returns, data_info = cluster_Kmedoids(grid, n_clusters, data, [data_scaled, scaler],
print_details=print_details, scaler_type=scaler_type,
forced_centers=forced_centers, **kwargs)
elif algorithm == 'pam_hierarchical':
clusters, returns, data_info = cluster_PAM_Hierarchical(grid, n_clusters, data, [data_scaled, scaler],
print_details=print_details, scaler_type=scaler_type, **kwargs)
else:
raise ValueError(f"Unsupported clustering algorithm: {algorithm}")
return n_clusters,clusters, returns, data_info
def _process_clusters(grid, data, cluster_centers, representative_indices=None):
"""
Process clustering results and update grid with cluster information.
Parameters:
-----------
grid : pyflow_acdc.Grid
The power system grid object to be updated with cluster information
data : pandas.DataFrame
Time series data used for clustering
cluster_centers : numpy.ndarray
Array containing the centroids/medoids of each cluster
representative_indices : array-like, optional
Original data row indices of the representative points (medoids).
For k-medoids these are the actual time-step indices; for centroid-
based methods this is None (centroids are synthetic).
Returns:
--------
clusters : pandas.DataFrame
DataFrame with cluster representatives, counts, weights, and
optionally the representative index.
"""
new_columns = [col for col in data.columns if col != 'Cluster']
n_clusters = len(cluster_centers)
# Create DataFrame with cluster centers
clusters = pd.DataFrame(cluster_centers, columns=new_columns)
# Calculate cluster counts and weights
# Filter out noise points (negative labels) for density-based clustering
cluster_counts = data['Cluster'].value_counts().sort_index()
cluster_counts = cluster_counts[cluster_counts.index >= 0] # Remove noise points (-1)
total_count = len(data[data['Cluster'] >= 0]) # Only count non-noise points for weights
cluster_weights = cluster_counts / total_count if total_count > 0 else cluster_counts * 0
# Add counts and weights to clusters DataFrame
clusters.insert(0, 'Cluster Count', cluster_counts.values)
clusters.insert(1, 'Weight', cluster_weights.values)
if representative_indices is not None:
clusters.insert(0, 'Rep. Index', representative_indices)
return clusters
def _validate_forced_centers(data, n_clusters, forced_centers):
if forced_centers is None:
return []
forced_list = list(forced_centers)
forced_unique = list(dict.fromkeys(forced_list))
if len(forced_unique) > n_clusters:
raise ValueError(
f"forced_centers has {len(forced_unique)} unique indices, larger than n_clusters={n_clusters}"
)
missing = [idx for idx in forced_unique if idx not in data.index]
if missing:
raise ValueError(
"forced_centers contains indices not present in clustering data: "
+ ", ".join(str(x) for x in missing[:10])
+ ("..." if len(missing) > 10 else "")
)
return forced_unique
def _assign_to_representatives(data, data_scaled, representative_indices):
data_scaled_arr = np.asarray(data_scaled)
rep_pos = [data.index.get_loc(idx) for idx in representative_indices]
rep_scaled = data_scaled_arr[rep_pos, :]
diff = data_scaled_arr[:, None, :] - rep_scaled[None, :, :]
dist2 = np.sum(diff * diff, axis=2)
labels = np.argmin(dist2, axis=1).astype(int)
inertia = float(np.sum(dist2[np.arange(len(labels)), labels]))
return labels, inertia
def cluster_Kmedoids(grid, n_clusters, data, scaling_data =None, method='fasterpam',
init='build', max_iter=MAX_ITERATIONS, print_details=False,
random_state=None, metric='euclidean', scaler_type="robust", forced_centers=None):
"""
Perform K-Medoids clustering on the data.
Parameters:
-----------
grid : Grid object
The grid object to update
n_clusters : int
Number of clusters
data : pandas.DataFrame
Data to cluster
scaling_data : tuple, optional
(scaled_data, scaler) if already scaled
method : str, default='alternate'
{'alternate', 'pam', 'fasterpam', 'fastpam1'} Algorithm to use
init : str, default='build'
{'random', 'heuristic', 'k-medoids++', 'build', 'first'} Initialization method
max_iter : int, default=MAX_ITERATIONS
Maximum number of iterations
print_details : bool, default=False
Whether to print detailed clustering information
random_state : int, optional
Random state for reproducibility
metric : str, default='euclidean'
Distance metric ('euclidean', 'manhattan', 'cosine', etc.)
"""
data_scaled, scaler = _prepare_scaled_data(data, scaling_data, scaler_type)
data_scaled = np.asarray(data_scaled) # single-point coercion
forced = _validate_forced_centers(data, n_clusters, forced_centers)
if forced:
n_remaining = n_clusters - len(forced)
start_time = time.perf_counter()
learned_indices = []
if n_remaining > 0:
rest_mask = ~data.index.isin(forced)
rest_data = data.loc[rest_mask]
if len(rest_data) < n_remaining:
raise ValueError(
f"Not enough non-forced samples ({len(rest_data)}) for n_remaining={n_remaining}"
)
rest_scaled = np.asarray(data_scaled)[np.asarray(rest_mask), :]
kmedoids = KMedoids(
n_clusters=n_remaining,
method=method,
init=init,
max_iter=max_iter,
random_state=random_state,
metric=metric
)
kmedoids.fit(rest_scaled)
learned_indices = list(rest_data.index[kmedoids.medoid_indices_])
rep_indices = list(forced) + learned_indices
labels, inertia = _assign_to_representatives(data, data_scaled, rep_indices)
time_taken = time.perf_counter() - start_time
cluster_centers = data.loc[rep_indices].values
else:
# Fit KMedoids on scaled data
kmedoids = KMedoids(
n_clusters=n_clusters,
method=method,
init=init,
max_iter=max_iter,
random_state=random_state,
metric=metric
)
# Time only the actual clustering execution
start_time = time.perf_counter()
labels = kmedoids.fit_predict(data_scaled)
time_taken = time.perf_counter() - start_time
# Get medoid indices
medoid_indices = kmedoids.medoid_indices_
rep_indices = list(data.index[medoid_indices])
# Get cluster centers (medoids) in original scale
cluster_centers = data.iloc[medoid_indices].values
inertia = float(kmedoids.inertia_)
# Print clustering results
cluster_sizes = pd.Series(labels).value_counts().sort_index().values
specific_info = {
"Scaler": scaler_type,
"Cluster sizes": cluster_sizes,
"Method": method,
"Initialization": init,
"Metric": metric,
"Inertia": inertia
}
if forced:
specific_info["Forced centers"] = [int(x) if isinstance(x, (np.integer, int)) else x for x in forced]
# Calculate CoV from cluster sizes
CoV = np.std(cluster_sizes)/np.mean(cluster_sizes) if len(cluster_sizes) > 0 else 0
# Evaluate clustering quality
try:
db_metrics = _evaluate_clustering(data_scaled, labels)
specific_info['Davies-Bouldin (combined)'] = db_metrics['davies_bouldin_combined']
specific_info['Davies-Bouldin (value)'] = db_metrics['davies_bouldin_value']
specific_info['Davies-Bouldin (seasonal)'] = db_metrics['davies_bouldin_seasonal']
except ValueError as e:
# If evaluation fails, continue without DB metrics
if print_details:
print(f"Warning: Could not evaluate clustering quality: {e}")
# Save clustering results to grid
save_clustering_results(grid, "K-medoids", n_clusters, specific_info, CoV, time_taken)
if print_details:
print_clustering_results("K-medoids", n_clusters, specific_info)
data['Cluster'] = labels
processed_results = _process_clusters(grid, data, cluster_centers, representative_indices=rep_indices)
return processed_results, [CoV, inertia], [data_scaled, labels]
def cluster_Kmeans(grid, n_clusters, data, scaling_data=None, print_details=False, use_medoids=False, scaler_type='robust', forced_centers=None):
"""
Perform K-means clustering on the data.
Parameters:
-----------
grid : Grid object
The grid object to update
n_clusters : int
Number of clusters
data : pandas.DataFrame
Data to cluster (filtered columns only)
scaling_data : tuple, optional
(scaled_data, scaler) if already scaled
print_details : bool
Whether to print clustering details
use_medoids : bool, default=False
If True, use actual data points (medoids) as cluster centers
If False, use means (centroids) as cluster centers
"""
data_scaled, scaler = _prepare_scaled_data(data, scaling_data,scaler_type)
forced = []
if use_medoids:
forced = _validate_forced_centers(data, n_clusters, forced_centers)
kmeans_inertia = None
kmeans_n_iter = None
if use_medoids and forced:
n_remaining = n_clusters - len(forced)
start_time = time.perf_counter()
learned_indices = []
if n_remaining > 0:
rest_mask = ~data.index.isin(forced)
rest_data = data.loc[rest_mask]
if len(rest_data) < n_remaining:
raise ValueError(
f"Not enough non-forced samples ({len(rest_data)}) for n_remaining={n_remaining}"
)
rest_scaled = np.asarray(data_scaled)[np.asarray(rest_mask), :]
kmeans_rest = KMeans(n_clusters=n_remaining)
rest_labels = kmeans_rest.fit_predict(rest_scaled)
kmeans_inertia = float(kmeans_rest.inertia_)
kmeans_n_iter = int(kmeans_rest.n_iter_)
for i in range(n_remaining):
cluster_data = rest_data[rest_labels == i]
from sklearn.metrics import pairwise_distances
distances = pairwise_distances(cluster_data)
medoid_idx = distances.sum(axis=1).argmin()
learned_indices.append(cluster_data.index[medoid_idx])
rep_indices = list(forced) + learned_indices
labels, forced_inertia = _assign_to_representatives(data, data_scaled, rep_indices)
time_taken = time.perf_counter() - start_time
else:
# Fit KMeans on scaled data (filtered columns)
kmeans = KMeans(n_clusters=n_clusters)
# Time only the actual clustering execution
start_time = time.perf_counter()
labels = kmeans.fit_predict(data_scaled)
time_taken = time.perf_counter() - start_time
kmeans_inertia = float(kmeans.inertia_)
kmeans_n_iter = int(kmeans.n_iter_)
if use_medoids and forced:
rep_indices = list(rep_indices)
cluster_centers = data.loc[rep_indices].values
else:
all_centers = []
rep_indices = [] if use_medoids else None
for i in range(n_clusters):
cluster_mask = labels == i
cluster_data = data[cluster_mask]
if use_medoids:
# Use medoid (actual data point closest to cluster center)
from sklearn.metrics import pairwise_distances
distances = pairwise_distances(cluster_data)
medoid_idx = distances.sum(axis=1).argmin()
cluster_center = cluster_data.iloc[medoid_idx]
all_centers.append(cluster_center.values)
rep_indices.append(cluster_data.index[medoid_idx])
else:
# Use mean (centroid)
cluster_means = cluster_data.mean()
all_centers.append(cluster_means.values)
cluster_centers = np.array(all_centers)
# Print clustering results
cluster_label = "K-means-medoids" if use_medoids else "K-means"
cluster_sizes = pd.Series(labels).value_counts().sort_index().values
specific_info = {
"Scaler": scaler_type,
"Cluster sizes": cluster_sizes,
"Inertia": kmeans_inertia if kmeans_inertia is not None else float('nan'),
"n_iter": kmeans_n_iter if kmeans_n_iter is not None else -1,
"Center type": "medoids" if use_medoids else "means"
}
if use_medoids and forced:
specific_info["Forced centers"] = [int(x) if isinstance(x, (np.integer, int)) else x for x in forced]
# Calculate CoV from cluster sizes
CoV = np.std(cluster_sizes)/np.mean(cluster_sizes) if len(cluster_sizes) > 0 else 0
# Evaluate clustering quality
try:
db_metrics = _evaluate_clustering(data_scaled, labels)
specific_info['Davies-Bouldin (combined)'] = db_metrics['davies_bouldin_combined']
specific_info['Davies-Bouldin (value)'] = db_metrics['davies_bouldin_value']
specific_info['Davies-Bouldin (seasonal)'] = db_metrics['davies_bouldin_seasonal']
except ValueError as e:
# If evaluation fails, continue without DB metrics
if print_details:
print(f"Warning: Could not evaluate clustering quality: {e}")
# Save clustering results to grid
save_clustering_results(grid, cluster_label, n_clusters, specific_info, CoV, time_taken)
if print_details:
print_clustering_results(cluster_label, n_clusters, specific_info)
data['Cluster'] = labels
processed_results = _process_clusters(grid, data, cluster_centers, representative_indices=rep_indices)
inertia_out = kmeans_inertia if kmeans_inertia is not None else forced_inertia
n_iter_out = kmeans_n_iter if kmeans_n_iter is not None else -1
return processed_results, [CoV, inertia_out, n_iter_out], [data_scaled, labels]
def cluster_Ward(grid, n_clusters, data, scaling_data=None, print_details=False, scaler_type='robust'):
"""
Perform Ward's hierarchical clustering using AgglomerativeClustering.
Parameters:
-----------
grid : Grid object
The grid object to update
n_clusters : int
Number of clusters
data : pandas.DataFrame
Data to cluster
"""
data_scaled, scaler = _prepare_scaled_data(data, scaling_data,scaler_type)
# Fit clustering
ward = AgglomerativeClustering(
n_clusters=n_clusters,
linkage='ward',
compute_distances=True
)
# Time only the actual clustering execution
start_time = time.perf_counter()
labels = ward.fit_predict(data_scaled)
time_taken = time.perf_counter() - start_time
# Calculate cluster centers
all_centers = []
for i in range(n_clusters):
cluster_mask = labels == i
cluster_means = data[cluster_mask].mean()
all_centers.append(cluster_means)
cluster_centers = np.array(all_centers)
# Get cluster sizes
cluster_sizes = pd.Series(labels).value_counts().sort_index().values
# Get additional metrics
distances = ward.distances_
specific_info = {
"Scaler": scaler_type,
"Cluster sizes": cluster_sizes,
"Maximum merge distance": float(max(distances)) if len(distances) > 0 else 0,
"Average merge distance": float(np.mean(distances)) if len(distances) > 0 else 0
}
# Calculate CoV from cluster sizes
CoV = np.std(cluster_sizes)/np.mean(cluster_sizes) if len(cluster_sizes) > 0 else 0
# Evaluate clustering quality
try:
db_metrics = _evaluate_clustering(data_scaled, labels)
specific_info['Davies-Bouldin (combined)'] = db_metrics['davies_bouldin_combined']
specific_info['Davies-Bouldin (value)'] = db_metrics['davies_bouldin_value']
specific_info['Davies-Bouldin (seasonal)'] = db_metrics['davies_bouldin_seasonal']
except ValueError as e:
# If evaluation fails, continue without DB metrics
if print_details:
print(f"Warning: Could not evaluate clustering quality: {e}")
# Save clustering results to grid
save_clustering_results(grid, "Ward hierarchical", n_clusters, specific_info, CoV, time_taken)
if print_details:
print_clustering_results("Ward hierarchical", n_clusters, specific_info)
data['Cluster'] = labels
processed_results = _process_clusters(grid, data, cluster_centers)
return processed_results, CoV, [data_scaled, labels]
def cluster_PAM_Hierarchical(grid, n_clusters, data, scaling_data=None, print_details=False, scaler_type='robust'):
"""
Perform PAM-based hierarchical clustering using AgglomerativeClustering.
Parameters:
-----------
grid : Grid object
The grid object to update
n_clusters : int
Number of clusters
data : pandas.DataFrame
Data to cluster
scaling_data : tuple, optional
(scaled_data, scaler) if already scaled
print_details : bool
Whether to print clustering details
"""
data_scaled, scaler = _prepare_scaled_data(data, scaling_data,scaler_type)
# Fit clustering using manhattan distance (typical for PAM)
HierarchicalMedoid = AgglomerativeClustering(
n_clusters=n_clusters,
linkage='average',
metric='manhattan',
compute_distances=True
)
# Time only the actual clustering execution
start_time = time.perf_counter()
labels = HierarchicalMedoid.fit_predict(data_scaled)
time_taken = time.perf_counter() - start_time
# Find medoid indices for all clusters
medoid_indices = []
for i in range(n_clusters):
cluster_mask = labels == i
cluster_data = data[cluster_mask]
if len(cluster_data) > 0:
distances = pairwise_distances(
cluster_data,
metric='manhattan'
)
medoid_idx = cluster_data.index[distances.sum(axis=1).argmin()]
medoid_indices.append(medoid_idx)
# Get cluster centers using medoid indices
cluster_centers = data.iloc[medoid_indices].values
# Get cluster sizes
cluster_sizes = pd.Series(labels).value_counts().sort_index().values
# Get additional metrics
distances = HierarchicalMedoid.distances_
specific_info = {
"Scaler": scaler_type,
"Cluster sizes": cluster_sizes,
"Maximum merge distance": float(max(distances)) if len(distances) > 0 else 0,
"Average merge distance": float(np.mean(distances)) if len(distances) > 0 else 0
}
# Calculate CoV from cluster sizes
CoV = np.std(cluster_sizes)/np.mean(cluster_sizes) if len(cluster_sizes) > 0 else 0
# Evaluate clustering quality
try:
db_metrics = _evaluate_clustering(data_scaled, labels)
specific_info['Davies-Bouldin (combined)'] = db_metrics['davies_bouldin_combined']
specific_info['Davies-Bouldin (value)'] = db_metrics['davies_bouldin_value']
specific_info['Davies-Bouldin (seasonal)'] = db_metrics['davies_bouldin_seasonal']
except ValueError as e:
# If evaluation fails, continue without DB metrics
if print_details:
print(f"Warning: Could not evaluate clustering quality: {e}")
# Save clustering results to grid
save_clustering_results(grid, "PAM hierarchical", n_clusters, specific_info, CoV, time_taken)
if print_details:
print_clustering_results("PAM hierarchical", n_clusters, specific_info)
data['Cluster'] = labels
processed_results = _process_clusters(grid, data, cluster_centers, representative_indices=medoid_indices)
return processed_results, CoV, [data_scaled, labels]
def save_clustering_results(grid, algorithm, n_clusters, specific_info, CoV, time_taken=None):
"""Helper function to save clustering results to grid.Clustering_information."""
if not hasattr(grid, 'Clustering_information'):
grid.Clustering_information = {}
# Create a dictionary with all clustering information
clustering_result = {
'algorithm': algorithm,
'n_clusters': n_clusters,
'CoV': CoV,
'specific_info': {}
}
# Add time taken if provided
if time_taken is not None:
clustering_result['time taken'] = time_taken
# Convert specific_info to a serializable format
for key, value in specific_info.items():
if isinstance(value, (int, str, float, bool)):
clustering_result['specific_info'][key] = value
elif isinstance(value, (list, np.ndarray)):
clustering_result['specific_info'][key] = list(value)
# Also calculate statistics for cluster sizes
if key == "Cluster sizes":
clustering_result['specific_info']['Cluster sizes average'] = float(np.mean(value))
clustering_result['specific_info']['Cluster sizes std'] = float(np.std(value))
elif isinstance(value, tuple):
clustering_result['specific_info'][key] = {
'count': value[0],
'percentage': value[1]
}
else:
clustering_result['specific_info'][key] = str(value)
# Store in Clustering_information with a key based on algorithm and n_clusters
key_name = f'technique_{algorithm}_{n_clusters}'
grid.Clustering_information[key_name] = clustering_result
# Also store the most recent result for easy access
grid.Clustering_information['latest_technique'] = clustering_result
def print_clustering_results(algorithm, n_clusters, specific_info):
"""Helper function to print clustering results in a standardized format."""
print(f"\n{algorithm} clustering results:")
print(f"- Number of clusters: {n_clusters}")
CoV=0
# Print algorithm-specific information
for key, value in specific_info.items():
if isinstance(value, (int, str)):
print(f"- {key}: {value}")
elif isinstance(value, float):
print(f"- {key}: {value:.2f}")
elif isinstance(value, list):
print(f"- {key}: {value}")
if key == "Cluster sizes":
CoV = np.std(value)/np.mean(value)
print(f" • Average: {np.mean(value):.1f}")
print(f" • Std dev: {np.std(value):.1f}")
print(f" • CoV : {CoV:.1f}")
elif isinstance(value, tuple):
count, percentage = value
print(f"- {key}: {count} ({percentage:.1f}%)")
return CoV
[docs]
def run_clustering_analysis(grid, save_path='clustering_results',algorithms=None,n_clusters_list=None,time_series=None,print_details=False,ts_options=None,correlation_decisions=None,plotting=False, plotting_options=None,identifier=None):
"""Sweep clustering algorithms and cluster counts for exploratory analysis.
Runs :func:`cluster_TS` for each ``(algorithm, n_clusters)`` pair, collects
quality metrics, optionally saves representative-period plots, and writes
``clustering_summary_<identifier>.csv`` under ``save_path``.
Parameters
----------
grid : Grid
Grid with attached time series.
save_path : str
Output directory for CSV summaries and optional plots.
algorithms : list of str, optional
Clustering methods passed to :func:`cluster_TS` (default includes
``kmeans``, ``kmedoids``, ``ward``, ``pam_hierarchical``).
n_clusters_list : list of int, optional
Cluster counts to test (defaults to ``DEFAULT_CLUSTER_NUMBERS``).
time_series : list, optional
TS types to include (empty list keeps grid defaults).
print_details : bool
Verbose clustering diagnostics from :func:`cluster_TS`.
ts_options : list, optional
``[central_market, cv_threshold, correlation_threshold]`` for filtering.
correlation_decisions : list, optional
Passed through to :func:`identify_correlations`.
plotting : bool
When ``True``, save time-series plots per sweep step.
plotting_options : list, optional
``[variable_name_or_None, file_extension]`` for plots.
identifier : str, optional
Suffix for output filenames.
Returns
-------
pandas.DataFrame
One row per successful ``(algorithm, n_clusters)`` run with timing and
quality metrics.
"""
if algorithms is None:
algorithms = ['kmeans', 'kmedoids', 'ward', 'pam_hierarchical']
if n_clusters_list is None:
n_clusters_list = DEFAULT_CLUSTER_NUMBERS
if time_series is None:
time_series = []
if ts_options is None:
ts_options = [None, 0, 0.8]
if correlation_decisions is None:
correlation_decisions = [True, '2', True]
if plotting_options is None:
plotting_options = [None, '.png']
results = {
'algorithm': [],
'n_clusters': [],
'time_taken': [],
'Coefficient of Variation': [],
'inertia': [],
'davies_bouldin_combined': [],
'davies_bouldin_value': [],
'davies_bouldin_seasonal': []
}
for algo in algorithms:
print(f"\nTesting {algo}...")
for n in n_clusters_list:
print(f" Clusters: {n}")
start_time = time.perf_counter()
try:
n_clusters,_,CoV,data_info = cluster_TS(grid, n_clusters= n, time_series=time_series,central_market=ts_options[0],algorithm=algo, cv_threshold=ts_options[1] ,correlation_threshold=ts_options[2],print_details=print_details,correlation_decisions=correlation_decisions)
data,data_scaled,labels = data_info
if algo in ('kmeans', 'kmeans_medoids'):
CoV, inertia, n_iter_ = CoV
elif algo == 'kmedoids':
CoV, inertia = CoV
else:
inertia = 0
time_taken = time.perf_counter() - start_time
metrics = _evaluate_clustering(data_scaled, labels)
db_score = metrics['davies_bouldin_combined']
value_db_score = metrics['davies_bouldin_value']
season_db_score = metrics['davies_bouldin_seasonal']
if plotting:
if plotting_options[0] is None:
covs = np.std(data_scaled, axis=0) / np.mean(np.abs(data_scaled), axis=0)
highest_cov_idx = np.argmax(covs)
var_name = data.columns[highest_cov_idx]
print(f"Plotting time series with highest CoV: {var_name} (CoV = {covs.iloc[highest_cov_idx]:.3f})")
else:
var_name = plotting_options[0]
highest_cov_idx = data.columns.get_loc(var_name)
print(f"Plotting specified time series: {var_name}")
plot_time_series(data,labels,var_name,n_clusters,save_path,plotting_options[1],identifier,algo)
results['algorithm'].append(algo)
results['n_clusters'].append(n)
results['time_taken'].append(time_taken)
results['Coefficient of Variation'].append(CoV)
results['inertia'].append(inertia)
results['davies_bouldin_combined'].append(db_score)
results['davies_bouldin_value'].append(value_db_score)
results['davies_bouldin_seasonal'].append(season_db_score)
print(f" Time: {time_taken:.2f}s")
except Exception as e:
print(f" Error with {algo}, n={n}: {str(e)}")
continue
df_results = pd.DataFrame(results)
Path(save_path).mkdir(parents=True, exist_ok=True)
# Updated summary to use correct columns
summary_df = df_results[['algorithm', 'n_clusters', 'time_taken', 'Coefficient of Variation','inertia','davies_bouldin_combined','davies_bouldin_value','davies_bouldin_seasonal']]
summary_df.to_csv(f'{save_path}/clustering_summary_{identifier}.csv', index=False)
return df_results
# Usage:
# results = run_clustering_analysis(grid)
# To analyze results:
def plot_clustering_results(df=None, results_path='clustering_results', format='svg',identifier=None):
"""
Plot clustering analysis results with publication-quality formatting.
Parameters:
-----------
df : pandas.DataFrame, optional
DataFrame containing clustering results. If None, loads from results_path
results_path : str, default='clustering_results'
Path to save the generated plots
format : str, default='svg'
Output format for plots ('svg', 'png', etc.)
"""
# Convert 8.25 cm to inches and maintain ratio
width_inches = FIGURE_WIDTH_CM / CM_TO_INCHES
ratio = 6/10 # Original height/width ratio
height_inches = width_inches * ratio
# Set publication-quality plotting parameters
plt.style.use('seaborn-v0_8-whitegrid')
plt.rcParams.update({
'figure.figsize': (width_inches, height_inches),
'font.family': 'serif',
'font.size': 8,
'axes.labelsize': 8,
'axes.titlesize': 8,
'xtick.labelsize': 7,
'ytick.labelsize': 7,
'legend.fontsize': 7,
'lines.markersize': 4,
'lines.linewidth': 1,
'grid.alpha': 0.3
})
if df is None:
df = pd.read_csv(f'{results_path}/clustering_summary.csv')
def format_axes(ax):
"""Apply consistent formatting to plot axes"""
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.grid(True, linestyle='--', alpha=0.3)
ax.tick_params(direction='out', length=TICK_LENGTH, width=TICK_WIDTH)
# Define consistent color palette
algorithms = df['algorithm'].unique()
colors = plt.cm.Set2(np.linspace(0, 1, len(algorithms)))
markers = ['o', 's', '^', 'D', 'v', '<', '>', 'p']
# Create plots with consistent styling
metrics = [
('time_taken', 'Time (seconds)',10),
('Coefficient of Variation', 'Coefficient of Variation',2),
('inertia', 'Inertia',INERTIA_PLOT_LIMIT),
('davies_bouldin_combined', 'Davies-Bouldin Index',2.5),
('davies_bouldin_value', 'Davies-Bouldin Index',2.5),
('davies_bouldin_seasonal', 'Davies-Bouldin Index',2.5)
]
for metric, ylabel,ymax in metrics:
fig, ax = plt.subplots()
for idx, algo in enumerate(algorithms):
data = df[df['algorithm'] == algo]
ax.plot(data['n_clusters'], data[metric],
marker=markers[idx % len(markers)],
color=colors[idx],
label=algo,
markersize=4,
linewidth=1)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel(ylabel)
ax.set_ylim(0,ymax)
# Adjust legend
ax.legend(bbox_to_anchor=(0.5, LEGEND_Y_POSITION),
loc='upper center',
ncol=2,
frameon=False)
format_axes(ax)
plt.tight_layout()
# Save plot
metric_name = metric.lower().replace('_', '-')
plt.savefig(f'{results_path}/{metric_name}-comparison_{identifier}.{format}',
dpi=SAVE_DPI,
bbox_inches='tight',
pad_inches=0.1)
plt.close()
[docs]
def run_clustering_analysis_and_plot(grid,algorithms=None,n_clusters_list=None,path='clustering_results',time_series=None,print_details=False,ts_options=None,correlation_decisions=None,plotting_options=None,identifier=None):
if algorithms is None:
algorithms = ['kmeans', 'kmedoids', 'ward', 'pam_hierarchical']
if n_clusters_list is None:
n_clusters_list = DEFAULT_CLUSTER_NUMBERS
if time_series is None:
time_series = []
if ts_options is None:
ts_options = [None, 0, 0.8]
if correlation_decisions is None:
correlation_decisions = [True, '2', True]
if plotting_options is None:
plotting_options = [None, 'svg']
results = run_clustering_analysis(grid,path,algorithms,n_clusters_list,time_series,print_details,ts_options,correlation_decisions,plotting=True, plotting_options=plotting_options,identifier=identifier)
plot_clustering_results(results,path,format=plotting_options[1],identifier=identifier)
def run_elbow_analysis(
grid,
n_clusters_list=None,
algorithm='kmeans',
save_path='clustering_results',
time_series=None,
print_details=False,
print_step=True,
ts_options=None,
correlation_decisions=None,
forced_centers=None,
format='svg',
identifier=None,
save_clusters_path=None,
):
"""
Run elbow analysis (inertia vs number of clusters) and save plot + CSV.
"""
if n_clusters_list is None:
n_clusters_list = [2, 4, 6, 8, 12, 16, 24]
if time_series is None:
time_series = []
if ts_options is None:
ts_options = [None, 0, 0.8]
if correlation_decisions is None:
correlation_decisions = [True, '2', True]
algorithm = algorithm.lower()
if algorithm not in {'kmeans', 'kmeans_medoids', 'kmedoids'}:
raise ValueError(
f"Elbow analysis supports only kmeans, kmeans_medoids, kmedoids. Got: {algorithm}"
)
rows = []
if save_clusters_path is not None:
Path(save_clusters_path).mkdir(parents=True, exist_ok=True)
def _export_clusters_payload(grid_obj, n_clusters):
cluster_data = grid_obj.Clusters[n_clusters]
representatives = cluster_data.get("Representatives", pd.DataFrame())
reps_data = representatives.to_dict(orient="list") if isinstance(representatives, pd.DataFrame) else {}
payload = {
"n_clusters": int(n_clusters),
"weight": [float(x) for x in cluster_data.get("Weight", [])],
"cluster_count": [int(x) for x in cluster_data.get("Cluster Count", [])],
"labels": [int(x) for x in cluster_data.get("Labels", [])],
"cluster_idx": {str(k): [int(i) for i in v] for k, v in cluster_data.get("Cluster idx", {}).items()},
"representatives": {"data": reps_data},
"time_series_clustered": {},
}
for ts in grid_obj.Time_series:
clustered_dict = getattr(ts, "data_clustered", {})
if isinstance(clustered_dict, dict) and n_clusters in clustered_dict:
payload["time_series_clustered"][ts.name] = [float(x) for x in clustered_dict[n_clusters]]
return payload
total_steps = len(n_clusters_list)
for idx, n in enumerate(n_clusters_list, start=1):
if print_step:
print(
f"[run_elbow_analysis] algorithm={algorithm} "
f"step={idx}/{total_steps} requested_k={int(n)}"
)
n_used, _, metrics, _ = cluster_TS(
grid,
n_clusters=int(n),
time_series=time_series,
central_market=ts_options[0],
algorithm=algorithm,
cv_threshold=ts_options[1],
correlation_threshold=ts_options[2],
print_details=print_details,
correlation_decisions=correlation_decisions,
forced_centers=forced_centers,
)
if print_step:
print(
f"[run_elbow_analysis] algorithm={algorithm} "
f"step={idx}/{total_steps} completed used_k={int(n_used)}"
)
cov_value, inertia_value = metrics[0], metrics[1]
latest = getattr(grid, 'Clustering_information', {}).get('latest_technique', {})
specific_info = latest.get('specific_info', {}) if isinstance(latest, dict) else {}
db_combined = specific_info.get('Davies-Bouldin (combined)')
db_value = specific_info.get('Davies-Bouldin (value)')
db_seasonal = specific_info.get('Davies-Bouldin (seasonal)')
time_taken = latest.get('time taken') if isinstance(latest, dict) else None
rows.append(
{
'algorithm': algorithm,
'k_requested': int(n),
'n_clusters': int(n_used),
'Coefficient of Variation': float(cov_value),
'inertia': float(inertia_value),
'davies_bouldin_combined': float(db_combined) if db_combined is not None else float('nan'),
'davies_bouldin_value': float(db_value) if db_value is not None else float('nan'),
'davies_bouldin_seasonal': float(db_seasonal) if db_seasonal is not None else float('nan'),
'time_taken_s': float(time_taken) if time_taken is not None else float('nan'),
}
)
if save_clusters_path is not None:
payload = _export_clusters_payload(grid, int(n_used))
payload["cluster_algorithm"] = algorithm
save_json = Path(save_clusters_path) / f"clusters_{algorithm}_k{int(n_used)}.json"
with open(save_json, "w", encoding="utf-8") as fh:
json.dump(payload, fh, indent=2)
df_elbow = pd.DataFrame(rows).sort_values('n_clusters').reset_index(drop=True)
Path(save_path).mkdir(parents=True, exist_ok=True)
suffix = identifier if identifier is not None else algorithm
csv_path = Path(save_path) / f'elbow_summary_{suffix}.csv'
df_elbow.to_csv(csv_path, index=False)
plt.style.use('seaborn-v0_8-whitegrid')
fig, ax = plt.subplots()
ax.plot(
df_elbow['n_clusters'],
df_elbow['inertia'],
marker='o',
linewidth=1.5,
label=algorithm,
)
ax.set_xlabel('Number of Clusters')
ax.set_ylabel('Inertia')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
ax.grid(True, linestyle='--', alpha=0.3)
ax.legend(frameon=False)
plt.tight_layout()
plot_path = Path(save_path) / f'elbow_plot_{suffix}.{format}'
plt.savefig(plot_path, dpi=SAVE_DPI, bbox_inches='tight', pad_inches=0.1)
plt.close(fig)
return df_elbow
def Time_series_cluster_relationship(grid, ts1_name=None, ts2_name=None,price_zone=None,ts_type=None, algorithm='kmeans',
take_into_account_time_series=None,
number_of_clusters=2, path='clustering_results',
format='svg',print_details=False):
"""
Plot two time series with their cluster assignments in different colors.
"""
n_clusters, clusters, returns, data_info = cluster_TS(
grid, number_of_clusters,time_series=take_into_account_time_series, algorithm=algorithm,print_details=False)
data,data_scaled,labels = data_info
if ts1_name is not None:
ts1 = grid.Time_series[grid.Time_series_dic[ts1_name]].data
if ts2_name is not None:
ts2 = grid.Time_series[grid.Time_series_dic[ts2_name]].data
plot_clustered_timeseries_single(ts1,ts2,algorithm,n_clusters,path,labels,ts1_name,ts2_name)
return
else:
for ts in grid.Time_series.values():
if ts.name != ts1_name:
ts2 = ts.data
ts2_name = ts.name
plot_clustered_timeseries_single(ts1,ts2,algorithm,n_clusters,path,labels,ts1_name,ts2_name)
return
elif price_zone is not None:
PZ = grid.Price_Zones_dic[price_zone]
# Collect all time series in a list
ts_list = []
ts_names = []
for ts_idx in grid.Price_Zones[PZ].TS_dict.values():
if ts_idx is None:
continue
ts = grid.Time_series[ts_idx]
ts_list.append(ts.data)
ts_names.append(ts.name)
# Create plots for all pairs
for i, ts1 in enumerate(ts_list):
for j, ts2 in enumerate(ts_list[i+1:], start=i+1):
plot_clustered_timeseries_single(
ts1=ts1,
ts2=ts2,
algorithm=algorithm,
n_clusters=n_clusters,
path=path,
labels=labels,
ts1_name=ts_names[i],
ts2_name=ts_names[j]
)
elif ts_type is not None:
# Collect all time series of the specified type
ts_list = []
ts_names = []
for ts in grid.Time_series:
if ts.type == ts_type:
ts_list.append(ts.data)
ts_names.append(ts.name)
# Create plots for all pairs
for i, ts1 in enumerate(ts_list):
for j, ts2 in enumerate(ts_list[i+1:], start=i+1):
plot_clustered_timeseries_single(
ts1=ts1,
ts2=ts2,
algorithm=algorithm,
n_clusters=n_clusters,
path=path,
labels=labels,
ts1_name=ts_names[i],
ts2_name=ts_names[j]
)
else:
print('No valid input provided')
def plot_clustered_timeseries_single(ts1,ts2,algorithm,n_clusters,path,labels,ts1_name,ts2_name):
# Get the time series data
# Set up figure dimensions
width_inches = FIGURE_WIDTH_CM / CM_TO_INCHES
height_inches = width_inches
# Set global plotting parameters
plt.rcParams.update({
'figure.figsize': (width_inches, height_inches),
'font.size': 8,
'axes.labelsize': 8,
'axes.titlesize': 8,
'xtick.labelsize': 8,
'ytick.labelsize': 8,
'legend.fontsize': 8,
'lines.markersize': 4,
'lines.linewidth': 1
})
# Create color map for clusters
colors = plt.cm.tab10(np.linspace(0, 1, n_clusters))
# Plot time series relationship
plt.figure()
for i in range(n_clusters):
mask = labels == i
plt.plot(ts1[mask], ts2[mask], 'o',
color=colors[i], label=f'Cluster {i}')
plt.xlabel(ts1_name)
plt.ylabel(ts2_name)
plt.legend()
plt.savefig(f'{path}/clustered_relationship_{algorithm}_{n_clusters}.png')
plt.show()
plt.close()
def find_medoid(cluster_data):
"""Helper function to find medoid of a cluster"""
distances = pairwise_distances(cluster_data, metric='manhattan')
medoid_idx = distances.sum(axis=1).argmin()
return cluster_data.index[medoid_idx]
def _evaluate_clustering(data_scaled, labels, time_resolution_hours=DEFAULT_TIME_RESOLUTION_HOURS,
seasonal_period_hours=DEFAULT_SEASONAL_PERIOD_HOURS):
"""
Evaluate time series clustering using standard DB index plus temporal and seasonal components.
Parameters:
-----------
data_scaled : array-like
Scaled data for clustering
labels : array-like
Cluster labels
time_resolution_hours : float, default=1
Time resolution of the data in hours (e.g., 0.25 for 15-min, 1 for hourly, 24 for daily)
seasonal_period_hours : float, default=168
Seasonal period to analyze in hours (e.g., 168 for weekly, 8760 for yearly)
"""
def temporal_seasonal_scores(X, labels):
"""Calculate seasonal component using DB-style calculation"""
unique_labels = np.unique(labels)
n_clusters = len(unique_labels)
# Calculate cluster centers for seasonal patterns
seasonal_centers = []
for label in unique_labels:
cluster_points = X[labels == label]
# Seasonal pattern - now configurable
seasonal_period_points = int(seasonal_period_hours / time_resolution_hours)
if cluster_points.shape[0] > seasonal_period_points:
# Reshape to seasonal periods
periods = cluster_points.shape[0] // seasonal_period_points * seasonal_period_points
reshaped = cluster_points[:periods].reshape(-1, seasonal_period_points, X.shape[1])
seasonal_pattern = np.mean(reshaped, axis=0)
seasonal_centers.append(np.mean(seasonal_pattern, axis=0))
else:
seasonal_centers.append(np.mean(cluster_points, axis=0))
seasonal_centers = np.array(seasonal_centers)
# Calculate DB scores
season_scores = []
for i in range(n_clusters):
cluster_i = X[labels == unique_labels[i]]
if len(cluster_i) <= 1:
continue
# Calculate within-cluster scatter
season_scatter_i = np.std(cluster_i)
max_season_ratio = 0
for j in range(n_clusters):
if i != j:
cluster_j = X[labels == unique_labels[j]]
if len(cluster_j) <= 1:
continue
# Calculate between-cluster separation
season_sep = np.linalg.norm(seasonal_centers[i] - seasonal_centers[j])
season_scatter_j = np.std(cluster_j)
if season_sep > 0:
season_ratio = (season_scatter_i + season_scatter_j) / season_sep
max_season_ratio = max(max_season_ratio, season_ratio)
if max_season_ratio > 0:
season_scores.append(max_season_ratio)
# Return zeros for temporal and the seasonal score
return np.mean(season_scores) if season_scores else 0
# Convert DataFrame to numpy array if needed
if isinstance(data_scaled, pd.DataFrame):
data_scaled_array = data_scaled.values
else:
data_scaled_array = np.array(data_scaled)
# Get standard DB score from sklearn
standard_db = davies_bouldin_score(data_scaled_array, labels)
# Get temporal and seasonal components
seasonal_db = temporal_seasonal_scores(data_scaled_array, labels)
# Combine scores (equal weights)
combined_db = (standard_db + seasonal_db) / 2
return {
'davies_bouldin_combined': combined_db,
'davies_bouldin_value': standard_db,
'davies_bouldin_seasonal': seasonal_db
}
def cluster_Kmedoids_auto(grid, data, scaling_data=None, kmin=2, kmax=20,
method='dynmsc', random_state=None, metric='euclidean',
print_details=False, scaler_type='robust'):
"""
Perform K-Medoids clustering with automatic cluster number selection using DynMSC.
"""
from kmedoids import dynmsc
data_scaled, scaler = _prepare_scaled_data(data, scaling_data,scaler_type)
# Compute distance matrix if needed
if metric != 'precomputed':
from sklearn.metrics.pairwise import pairwise_distances
dist_matrix = pairwise_distances(data_scaled, metric=metric)
else:
dist_matrix = data_scaled
# Run DynMSC for automatic cluster selection
result = dynmsc(dist_matrix, kmax, kmin)
optimal_k = result.bestk
if print_details:
print(f"Optimal number of clusters: {optimal_k}")
print(f"Medoid Silhouette scores: {result.losses}")
print(f"Range of k tested: {result.rangek}")
# Now run clustering with the optimal number of clusters
return cluster_Kmedoids(grid, optimal_k, data, scaling_data,
method='fasterpam', random_state=random_state,
metric=metric, print_details=print_details)
def compare_kmedoids_methods(grid, n_clusters, data, scaling_data=None,
methods=None,
metric='euclidean', print_details=False, scaler_type='robust'):
"""
Compare different K-Medoids methods.
"""
if methods is None:
methods = ['fasterpam', 'fastpam1', 'pam', 'alternate']
results = {}
for method in methods:
try:
start_time = time.perf_counter()
method_results = cluster_Kmedoids(grid, n_clusters, data, scaling_data,
method=method, metric=metric,
print_details=False, scaler_type=scaler_type)
time_taken = time.perf_counter() - start_time
results[method] = {
'CoV': method_results[1][0],
'inertia': method_results[1][1],
'time': time_taken
}
except Exception as e:
print(f"Error with method {method}: {e}")
results[method] = None
if print_details:
print("\nK-Medoids Methods Comparison:")
print("=" * 50)
for method, metrics in results.items():
if metrics is not None:
print(f"\n{method.upper()}:")
print(f" Coefficient of Variation: {metrics['CoV']:.4f}")
print(f" Inertia: {metrics['inertia']:.4f}")
print(f" Time: {metrics['time']:.3f}s")
return results
[docs]
def cluster_analysis(grid,clustering_options):
clustering = False
if clustering_options is not None:
"""
clustering_options = {
'n_clusters': 1,
'time_series': [],
'central_market': [],
'thresholds': [cv_threshold,correlation_threshold],
'print_details': True/False,
'correlation_decisions': [correlation_cleaning = True/False,method = '1/2/3',scale_groups = True/False],
'cluster_algorithm': 'Kmeans/Ward/Kmedoids/PAM_Hierarchical/Kmeans_Medoids'
'critical_idx': [], # List of indices of the time series to be considered as critical
'base_critical_ratio': 0.5 # Ratio of the critical time series to the total time series a value of 0.75 in 8 clusters means
or int means directly the number of clusters to be created for the critical time series.
that the critical time series will be clustered into 6 clusters and the rest of the time series will be clustered into 2 clusters.
}
"""
precomputed = clustering_options.get('precomputed_clusters')
precomputed_path = clustering_options.get('precomputed_clusters_path')
if precomputed is not None or precomputed_path is not None:
n = load_precomputed_clusters_to_grid(
grid,
precomputed=precomputed,
precomputed_path=precomputed_path,
fallback_n_clusters=clustering_options.get('n_clusters', 0),
)
return n, True
n = clustering_options['n_clusters']
time_series = clustering_options['time_series'] if 'time_series' in clustering_options else []
central_market = clustering_options['central_market'] if 'central_market' in clustering_options else []
thresholds = clustering_options['thresholds'] if 'thresholds' in clustering_options else [0,0.8]
print_details = clustering_options['print_details'] if 'print_details' in clustering_options else False
correlation_decisions = clustering_options['correlation_decisions'] if 'correlation_decisions' in clustering_options else [False,'1',False]
algo = clustering_options['cluster_algorithm'] if 'cluster_algorithm' in clustering_options else 'Kmeans'
critical_idx = clustering_options['critical_idx'] if 'critical_idx' in clustering_options else []
base_critical_ratio = clustering_options['base_critical_ratio'] if 'base_critical_ratio' in clustering_options else 0.5
forced_centers = clustering_options.get('forced_centers')
n_clusters,_,_,_ = cluster_TS(
grid,
n_clusters=n,
time_series=time_series,
central_market=central_market,
algorithm=algo,
cv_threshold=thresholds[0],
correlation_threshold=thresholds[1],
print_details=print_details,
correlation_decisions=correlation_decisions,
critical_idx=critical_idx,
base_critical_ratio=base_critical_ratio,
forced_centers=forced_centers,
)
clustering = True
else:
n_clusters = len(grid.Time_series[0].data)
return n_clusters,clustering
[docs]
def load_precomputed_clusters_to_grid(grid, precomputed=None, precomputed_path=None, fallback_n_clusters=0):
if precomputed is None:
if precomputed_path is None:
raise ValueError("Either 'precomputed' or 'precomputed_path' must be provided")
with open(precomputed_path, 'r', encoding='utf-8') as fh:
precomputed = json.load(fh)
n = int(precomputed.get('n_clusters', fallback_n_clusters))
if n <= 0:
raise ValueError("Invalid precomputed clustering payload: missing/invalid 'n_clusters'")
ts_clustered = precomputed.get('time_series_clustered', {})
missing_ts = [ts.name for ts in grid.Time_series if ts.name not in ts_clustered]
if missing_ts:
raise ValueError(
"Precomputed clustering payload is missing clustered values for time series: "
+ ", ".join(missing_ts[:10])
+ ("..." if len(missing_ts) > 10 else "")
)
cluster_idx_raw = precomputed.get('cluster_idx', {})
cluster_idx = {int(k): v for k, v in cluster_idx_raw.items()}
reps_payload = precomputed.get('representatives', {})
reps_data = reps_payload.get('data', {})
representatives = pd.DataFrame(reps_data)
grid.Clusters[n] = {
'Weight': np.array(precomputed.get('weight', []), dtype=float),
'Cluster Count': np.array(precomputed.get('cluster_count', [])),
'Cluster idx': cluster_idx,
'Labels': np.array(precomputed.get('labels', [])),
'Representatives': representatives,
}
for ts in grid.Time_series:
if not hasattr(ts, 'data_clustered') or not isinstance(ts.data_clustered, dict):
ts.data_clustered = {}
ts.data_clustered[n] = np.array(ts_clustered[ts.name], dtype=float)
return n