"""
Module for statistical jump models
"""
import numpy as np
import pandas as pd
import multiprocessing as mp
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import Counter
from operator import itemgetter
from sklearn import preprocessing
from sklearn.metrics import balanced_accuracy_score
from ..log_config import setup_logger
logger = setup_logger(__name__)
class DiscreteJumpModel:
"""
Statistical Jump Model with Discrete States
"""
def __init__(self) -> None:
pass
# Non-vectorized version
# def fixed_states_optimize(self, y, s, theta_guess=None, k=2):
# """
# Optimize the parameters of a discrete jump model with fixed states
# Args:
# y (np.ndarray): observed data (T x n_features)
# s (np.ndarray): state sequence (T x 1)
# theta_guess (np.ndarray): initial guess for theta (k x n_features)
# k (int): number of states
# Returns:
# theta (np.ndarray): optimized parameters
# prob.value (float): optimal value of the objective function
# """
# if not isinstance(y, np.ndarray) or not isinstance(s, np.ndarray):
# raise TypeError("y and s must be numpy arrays")
# T, n_features = y.shape
# # initialize variables from guess
# theta = cp.Variable((k, n_features))
# if theta_guess is not None:
# theta.value = theta_guess
# # solve optimization problem; essentially the k-means problem
# diff = [0.5 * cp.norm2(y[i, :] - theta[s[i], :]) ** 2 for i in range(T)]
# objective = cp.Minimize(cp.sum(diff))
# prob = cp.Problem(objective)
# prob.solve()
# return theta.value, prob.value
def fixed_states_optimize(self, y, s, k=2):
"""
Optimize the parameters of a discrete jump model with the state sequence held fixed.
Args:
y (np.ndarray): Observed data of shape (T x n_features).
s (np.ndarray): State sequence of shape (T,).
k (int): Number of states.
Returns:
tuple:
- np.ndarray: Optimized parameters of shape (k x n_features).
- float: Optimal value of the objective function.
"""
if not isinstance(y, np.ndarray) or not isinstance(s, np.ndarray):
raise TypeError("y and s must be numpy arrays")
T, n_features = y.shape
theta = np.zeros((k, n_features))
# Compute the optimal centroids (theta) for each state
for state in range(k):
assigned_data_points = y[s == state]
if assigned_data_points.size > 0:
theta[state] = assigned_data_points.mean(axis=0)
# Compute the objective value
objective_value = 0.5 * np.sum(
[np.linalg.norm(y[i, :] - theta[s[i], :]) ** 2 for i in range(T)]
)
return theta, objective_value
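# Example (illustrative sketch, values made up): with the state sequence held fixed,
# the optimal theta is simply the per-state mean of the observations.
#
#     model = DiscreteJumpModel()
#     y = np.array([[0.0], [0.2], [1.0], [1.2]])
#     s = np.array([0, 0, 1, 1])
#     theta, obj = model.fixed_states_optimize(y, s, k=2)
#     # theta -> [[0.1], [1.1]], obj -> 0.02 (up to floating-point rounding)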
# Non-vectorized version
# def generate_loss_matrix(self, y, theta, k=2):
# """
# Generate the loss matrix for a discrete jump model for fixed theta
# Args:
# y (np.ndarray): observed data (T x n_features)
# theta (np.ndarray): parameters (k x n_features)
# k (int): number of states
# Returns:
# loss (np.ndarray): loss matrix (T x k)
# """
# T = y.shape[0]
# loss = np.zeros((T, k))
# for i in range(T):
# for j in range(k):
# # norm is the L2 norm by default
# loss[i, j] = 0.5 * np.linalg.norm(y[i, :] - theta[j, :]) ** 2
# return loss
def generate_loss_matrix(self, y, theta):
"""
Generate the loss matrix for a discrete jump model with theta held fixed.
Args:
y (np.ndarray): observed data (T x n_features)
theta (np.ndarray): parameters (k x n_features)
Returns:
loss (np.ndarray): loss matrix (T x k)
"""
# Expand dimensions to broadcast subtraction across y and theta
diff = y[:, np.newaxis, :] - theta[np.newaxis, :, :]
# Compute squared L2 norm along the last axis (n_features)
loss = 0.5 * np.sum(diff**2, axis=-1)
return loss
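# Example (illustrative sketch, values made up): the loss matrix has one row per
# observation and one column per state, with loss[t, j] = 0.5 * ||y_t - theta_j||^2.
#
#     model = DiscreteJumpModel()
#     y = np.array([[0.0, 0.0], [1.0, 1.0]])
#     theta = np.array([[0.0, 0.0], [1.0, 0.0]])
#     model.generate_loss_matrix(y, theta)
#     # array([[0. , 0.5],
#     #        [1. , 0.5]])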
# Non-vectorized version
# def fixed_theta_optimize(self, lossMatrix, lambda_, k=2):
# """
# Optimize the state sequence of a discrete jump model with fixed parameters
# Args:
# lossMatrix (np.ndarray): loss matrix (T x k)
# lambda_ (float): regularization parameter
# k (int): number of states
# Returns:
# s (np.ndarray): optimal state sequence (T x 1)
# v (float): optimal value of the objective function
# """
# state_choices = list(range(k))
# T, n_states = lossMatrix.shape
# V = np.zeros((T, n_states))
# V[0, :] = lossMatrix[0, :]
# for t in range(1, T):
# for k in range(n_states):
# V[t, k] = lossMatrix[t, k] + min(
# V[t - 1, :]
# + lambda_ * np.abs(state_choices[k] - np.array(state_choices))
# )
# # backtrack to find optimal state sequence
# v = V[-1, :].min()
# s = np.zeros(T, dtype=int)
# s[-1] = state_choices[V[-1, :].argmin()]
# for t in range(T - 2, -1, -1):
# s[t] = state_choices[
# np.argmin(V[t, :] + lambda_ * np.abs(state_choices - s[t + 1]))
# ]
# return s, v
def fixed_theta_optimize(self, lossMatrix, lambda_):
"""
Optimize the state sequence of a discrete jump model with fixed parameters
Args:
lossMatrix (np.ndarray): loss matrix (T x k)
lambda_ (float): regularization parameter
Returns:
s (np.ndarray): optimal state sequence (T,)
v (float): optimal value of the objective function
"""
T, n_states = lossMatrix.shape
V = np.zeros((T, n_states))
V[0, :] = lossMatrix[0, :]
state_choices = np.arange(n_states)
for t in range(1, T):
# Using broadcasting to compute the regularization term for all states at once
regularization = lambda_ * np.abs(
state_choices[:, np.newaxis] - state_choices
)
V[t, :] = lossMatrix[t, :] + \
(V[t - 1, :] + regularization).min(axis=1)
# backtrack to find optimal state sequence
v = V[-1, :].min()
s = np.zeros(T, dtype=int)
s[-1] = state_choices[V[-1, :].argmin()]
for t in range(T - 2, -1, -1):
s[t] = state_choices[
np.argmin(V[t, :] + lambda_ * np.abs(state_choices - s[t + 1]))
]
return s, v
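# Example (illustrative sketch, values made up): dynamic programming over a tiny 3 x 2
# loss matrix. A small jump penalty allows one switch; a larger penalty suppresses it.
# Numbers are shown up to floating-point rounding.
#
#     model = DiscreteJumpModel()
#     toy_loss = np.array([[0.1, 2.0], [0.2, 1.5], [1.8, 0.1]])
#     model.fixed_theta_optimize(toy_loss, lambda_=0.5)   # -> (array([0, 0, 1]), 0.9)
#     model.fixed_theta_optimize(toy_loss, lambda_=5.0)   # -> (array([0, 0, 0]), 2.1)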
def initialize_kmeans_plusplus(self, data, k):
"""
Initialize the centroids using the k-means++ method.
Args:
data: ndarray of shape (n_samples, n_features)
k: number of clusters
Returns:
centroids: ndarray of shape (k, n_features)
"""
# Pick the first centroid uniformly at random (scalar index so each centroid is a 1-D row)
initial_idx = np.random.choice(data.shape[0])
centroids = [data[initial_idx]]
for _ in range(k - 1):
squared_distances = np.min(
[np.sum((data - centroid) ** 2, axis=1)
for centroid in centroids],
axis=0,
)
prob = squared_distances / squared_distances.sum()
next_centroid_idx = np.random.choice(data.shape[0], p=prob)
centroids.append(data[next_centroid_idx])
return np.array(centroids)
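# Example (illustrative sketch): k-means++ seeding on a toy 1-D dataset; later centroids
# are drawn with probability proportional to the squared distance from the existing ones.
#
#     np.random.seed(0)  # only to make the random draws repeatable
#     model = DiscreteJumpModel()
#     data = np.array([[0.0], [0.1], [5.0], [5.1]])
#     centroids = model.initialize_kmeans_plusplus(data, k=2)
#     # centroids.shape == (2, 1); with high probability one centroid is near 0, one near 5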
def classify_data_to_states(self, data, centroids):
"""
Classify data points to the states based on the centroids.
Args:
data: ndarray of shape (n_samples, n_features)
centroids: centroids or means of the states, ndarray of shape (k, n_features)
Returns:
state_assignments: ndarray of shape (n_samples,), indices of the states \
to which each data point is assigned
"""
distances = np.array(
[np.sum((data - centroid) ** 2, axis=1) for centroid in centroids]
)
state_assignments = np.argmin(distances, axis=0)
return state_assignments
def infer_states_stats(self, ts_returns, states):
"""
Compute the mean and standard deviation of returns for each state
Args:
ts_returns (np.ndarray): observed returns (T x 1)
states (np.ndarray): state sequence (T x 1)
Returns:
state_features (dict): mean and standard deviation of returns for each state
"""
if not isinstance(ts_returns, np.ndarray) or not isinstance(states, np.ndarray):
ts_returns = np.array(ts_returns)
states = np.array(states, dtype=int)
stats = dict()
for state in set(states):
idx = np.where(states == state)[0]
stats[state] = (np.mean(ts_returns[idx]), np.std(ts_returns[idx]))
return stats
def remapResults(self, optimized_s, optimized_theta, ts_returns):
"""
Remap the results of the optimization so that states are ordered by increasing \
volatility of returns. Volatility is used because its estimate is more stable than \
the mean return; a warning is logged if the rankings implied by volatility and by \
mean return disagree.
"""
res_s = list()
res_theta = list()
n = len(optimized_s)
for i in range(n):
# idx = np.argsort(optimized_theta[i][:, 0])[::-1]
# whichever has the lowest volatility is assigned to state 0
states_features = self.infer_states_stats(
ts_returns, optimized_s[i])
idx_vol = np.argsort(
[states_features[state][1] for state in states_features]
)
idx_ret = np.argsort(
[states_features[state][0] for state in states_features]
)[::-1]
if not np.array_equal(idx_vol, idx_ret):
logger.warning(
"States identified by volatility ranks and returns ranks are different!"
)
# remap the states
idx_mapping = {old_idx: new_idx for new_idx, old_idx in enumerate(idx_vol)}
# if only one state, no need to remap
if len(idx_mapping) == 1:
remapped_s = optimized_s[i]
remapped_theta = optimized_theta[i]
else:
remapped_s = [idx_mapping[_] for _ in optimized_s[i]]
remapped_theta = optimized_theta[i][idx_vol, :]
# append the remapped results
res_s.append(remapped_s)
res_theta.append(remapped_theta)
return res_s, res_theta
def cleanResults(self, raw_result, ts_returns, rearrange=False):
"""
Clean the results of the optimization.
Extract the best result across all trials based on the loss.
"""
optimized_s = list(map(itemgetter(0), raw_result))
optimized_loss = list(map(itemgetter(1), raw_result))
optimized_theta = list(map(itemgetter(2), raw_result))
if rearrange:
optimized_s, optimized_theta = self.remapResults(
optimized_s, optimized_theta, ts_returns
)
idx = int(np.array(optimized_loss).argmin())
best_s = optimized_s[idx]
best_loss = optimized_loss[idx]
best_theta = optimized_theta[idx]
res_dict = {
"best_state_sequence": best_s,
"best_loss": best_loss,
"best_theta": best_theta,
"all_state_sequence": optimized_s,
"all_loss": optimized_loss,
"all_theta": optimized_theta,
}
return res_dict
def single_run(self, y, k, lambda_):
"""
Run a single trial of the optimization. Each trial uses a different \
initialization of the centroids.
Args:
y (np.ndarray): observed data (T x n_features)
k (int): number of states
lambda_ (float): regularization parameter
Returns:
cur_s (np.ndarray): optimal state sequence (T x 1)
loss (float): optimal value of the objective function
cur_theta (np.ndarray): optimal parameters (k x n_features)
"""
# initialize centroids using k-means++
centroids = self.initialize_kmeans_plusplus(y, k)
cur_s = self.classify_data_to_states(y, centroids)
hist_s = [cur_s]
# the coordinate descent algorithm
for i in range(30):
cur_theta, _ = self.fixed_states_optimize(y, cur_s, k=k)
lossMatrix = self.generate_loss_matrix(y, cur_theta)
cur_s, loss = self.fixed_theta_optimize(lossMatrix, lambda_)
if cur_s.tolist() == hist_s[-1].tolist():
break
else:
hist_s.append(cur_s)
logger.debug(
f"Single run completed after {i + 1} iterations with loss {loss}")
return cur_s, loss, cur_theta
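# Example (illustrative sketch, synthetic data): one coordinate-descent trial. The
# penalty lambda_ trades off goodness of fit against the number of state switches.
#
#     np.random.seed(1)
#     y = np.vstack([np.random.normal(0.0, 1.0, (100, 3)),
#                    np.random.normal(3.0, 1.0, (100, 3))])
#     s, loss, theta = DiscreteJumpModel().single_run(y, k=2, lambda_=10.0)
#     # s.shape == (200,), theta.shape == (2, 3)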
def fit(self, y, k=2, lambda_=100, rearrange=False, n_trials=10):
"""
Fit the discrete jump model.
NOTE:
Multiprocessing is used to speed up the optimization; n_trials trials, each with \
its own k-means++ initialization, are run in parallel.
Args:
y (np.ndarray): observed data (T x n_features)
k (int): number of states
lambda_ (float): regularization parameter
rearrange (bool): whether to rearrange the states in increasing order of \
volatility
n_trials (int): number of random initializations
Returns:
res (dict): results with keys
- best_state_sequence (np.ndarray): optimal state sequence (T,)
- best_loss (float): optimal value of the objective function
- best_theta (np.ndarray): optimal parameters (k x n_features)
- all_state_sequence (list): state sequences from all trials
- all_loss (list): objective function values from all trials
- all_theta (list): parameters from all trials
"""
args = [(y, k, lambda_)] * n_trials
# mp.cpu_count()
pool = mp.Pool(n_trials)
res = pool.starmap(self.single_run, args)
pool.close()
res = self.cleanResults(res, y[:, 0], rearrange)
states_stats = self.infer_states_stats(
y[:, 0], res["best_state_sequence"])
logger.info(
f"Mean and Volatility by inferred states:\n {states_stats}")
return res
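# Example (illustrative end-to-end sketch; parameter values are placeholders): simulate
# returns, enrich them with features, standardize, and fit the discrete jump model.
# Because fit() spawns a multiprocessing pool, the call may need to run under an
# if __name__ == "__main__": guard on platforms that use the "spawn" start method.
#
#     tm, params = TestingUtils().daily()
#     states, returns = SimulationGenerator().run(1000, tm, params)
#     fg = FeatureGenerator()
#     X = fg.standarize_features(fg.enrich_features(returns))
#     res = DiscreteJumpModel().fit(X, k=2, lambda_=100, n_trials=10)
#     res["best_state_sequence"], res["best_loss"]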
def evaluate(self, true, pred, plot=False):
"""
Evaluate the model using the balanced accuracy score.
If pred is shorter than true, only the trailing len(pred) observations of true \
are scored.
Args:
true (np.ndarray): true state sequence (T x 1)
pred (np.ndarray): predicted state sequence (T x 1)
plot (bool): whether to plot the true and predicted state sequences
Returns:
res (dict): evaluation results
"""
true_len = len(true)
pred_len = len(pred)
if plot:
plt.plot(true, label="True")
plt.plot(pred, label="Pred")
plt.title("True and Predicted State Sequences")
plt.xlabel("Time")
plt.ylabel("State")
plt.legend()
plt.show()
res = {"BAC": balanced_accuracy_score(
true[true_len - pred_len:], pred)}
return res
class ContinuousJumpModel(DiscreteJumpModel):
"""
Continuous Jump Model with Soft State Assignments
"""
def fixed_states_optimize(self, y, s, k=None):
"""
Optimize theta given fixed (soft) state assignments.
Args:
y: (T, n_features) array of observations
s: (T, k) array of state assignment probabilities
k: unused; the number of states is inferred from s (kept for interface \
compatibility with the discrete model)
Returns:
theta: (k, n_features) array of optimal parameters
Note:
s is assumed to have each row sum to 1
"""
_, n_features = y.shape
_, k = s.shape
theta = np.zeros((k, n_features))
for state in range(k):
weights = s[:, state]
theta[state] = np.sum(
y * weights[:, np.newaxis], axis=0) / np.sum(weights)
return theta
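# Example (illustrative sketch, values made up): with soft assignments, each theta row
# is a probability-weighted mean of the observations.
#
#     model = ContinuousJumpModel()
#     y = np.array([[0.0], [1.0]])
#     s = np.array([[0.75, 0.25], [0.25, 0.75]])
#     model.fixed_states_optimize(y, s)
#     # array([[0.25], [0.75]])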
def generate_loss_matrix(self, y, theta):
"""Identical to the loss function in the discrete case
"""
diff = y[:, np.newaxis, :] - theta[np.newaxis, :, :]
loss = 0.5 * np.sum(diff**2, axis=-1)
return loss
def generate_C(self, k, grid_size=0.05):
"""Sample candidate state probability vectors.
Draws N = (1 / grid_size) ** k random vectors and normalizes each column to sum \
to one, so every column lies on the k-dimensional probability simplex.
Args:
k (int): number of states
grid_size (float): controls the number of candidates N
Returns:
matrix (np.ndarray): K x N matrix whose columns are candidate state distributions
"""
N = int(1 / grid_size) ** k
matrix = np.random.rand(k, N)
matrix /= matrix.sum(axis=0)
return matrix
def fixed_theta_optimize(self, lossMatrix, lambda_, C):
"""Optimize the state sequence of a continuous jump model with fixed parameters
Args:
lossMatrix (np.ndarray): loss matrix (T x K)
C (np.ndarray): K x N matrix of states
lambda_ (float): regularization parameter
Returns:
s_hat (np.ndarray): optimal state sequence with probability dist (T x K)
v_hat (float): loss value
"""
T, K = lossMatrix.shape
_, N = C.shape
L_tilde = lossMatrix @ C
Lambda = np.array(
[[lambda_ / 4 * np.linalg.norm(c_i - c_j, ord=1)**2 for c_j in C.T] for c_i in C.T])
V_tilde = np.zeros((T, N))
V_tilde[0, :] = L_tilde[0, :]
for t in range(1, T):
for i in range(N):
V_tilde[t, i] = L_tilde[t, i] + \
np.min(V_tilde[t-1, :] + Lambda[:, i])
s_hat = np.zeros((T, K))
i_hat = np.argmin(V_tilde[-1, :])
v_hat = np.min(V_tilde[-1, :])
s_hat[-1] = C[:, i_hat]
for t in range(T-2, -1, -1):
i_hat = np.argmin(V_tilde[t, :] + Lambda[:, i_hat])
s_hat[t] = C[:, i_hat]
return s_hat, v_hat
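# Example (illustrative sketch, values made up): the DP now searches over candidate
# probability vectors (the columns of C) instead of hard states, so each row of the
# returned s_hat is a probability distribution over states.
#
#     model = ContinuousJumpModel()
#     loss = np.array([[0.1, 2.0], [0.2, 1.5], [1.8, 0.1]])
#     C = model.generate_C(k=2, grid_size=0.25)  # 16 candidate columns
#     s_hat, v_hat = model.fixed_theta_optimize(loss, lambda_=0.5, C=C)
#     # s_hat.shape == (3, 2) and np.allclose(s_hat.sum(axis=1), 1.0)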
def fit(self, y, k=2, lambda_=100, rearrange=False, n_trials=10, max_iter=20):
"""
Fit the continuous jump model; returns the best (T x k) state probability \
sequence and its loss across n_trials random initializations.
"""
if rearrange:
raise NotImplementedError("The rearrange option has not been implemented.")
# lists to keep best loss and state sequence across trials
best_trial_loss = list()
best_trial_states = list()
for _ in range(n_trials):
centroids = self.initialize_kmeans_plusplus(y, k)
cur_s = self.classify_data_to_states(y, centroids)
# Convert the hard assignment into a one-hot (T x 2) matrix; this step assumes k == 2
second_col = 1 - cur_s
cur_s = np.column_stack((cur_s, second_col))
cur_loss = float('inf')
best_states = cur_s
best_loss = cur_loss
no_improvement_counter = 0 # Counter for consecutive iterations without improvement
for _ in range(max_iter):
cur_theta = self.fixed_states_optimize(y, cur_s, k)  # k is ignored; states inferred from cur_s
lossMatrix = self.generate_loss_matrix(y, cur_theta)
C = self.generate_C(k)
cur_s, cur_loss = self.fixed_theta_optimize(lossMatrix,
lambda_=lambda_,
C=C)
# Check if the current solution is better than the best known solution
if cur_loss < best_loss:
best_loss = cur_loss
best_states = cur_s
no_improvement_counter = 0 # Reset the counter if there's improvement
else:
no_improvement_counter += 1 # Increment the counter if no improvement
# Check for convergence: stop after 3 consecutive iterations without improvement
if no_improvement_counter >= 3:
break
# After the inner loop, record the best solution found in this trial,
# whether or not it converged early
best_trial_loss.append(best_loss)
best_trial_states.append(best_states)
final_best_loss = min(best_trial_loss)
final_best_states = best_trial_states[best_trial_loss.index(final_best_loss)]
return final_best_states, final_best_loss
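# Example (illustrative sketch, synthetic data): the continuous fit returns a (T, k)
# matrix of state probabilities and the corresponding loss.
#
#     np.random.seed(2)
#     y = np.vstack([np.random.normal(0.0, 1.0, (100, 3)),
#                    np.random.normal(3.0, 1.0, (100, 3))])
#     probs, loss = ContinuousJumpModel().fit(y, k=2, lambda_=50, n_trials=3, max_iter=10)
#     # probs.shape == (200, 2)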
class FeatureGenerator:
"""
Enrich a univariate time series with features
"""
def __init__(self) -> None:
pass
def enrich_features(self, time_series):
"""
Enrich a single time series with features
Args:
time_series (np.ndarray): time series (T x 1)
Returns:
features (np.ndarray): features (T x n_features)
"""
df = pd.DataFrame({"ts": time_series})
# Features 1-3
df["observation"] = df["ts"]
df["abs_change"] = df["ts"].diff().abs()
df["prev_abs_change"] = df["abs_change"].shift()
# Features 4-15: six rolling statistics for each window w in {6, 14}
for w in [6, 14]:
roll = df["ts"].rolling(window=w)
df[f"centered_mean_{w}"] = roll.mean()
df[f"centered_std_{w}"] = roll.std()
half_w = w // 2
df[f"left_mean_{w}"] = df["ts"].rolling(
window=half_w).mean().shift(half_w)
df[f"left_std_{w}"] = df["ts"].rolling(
window=half_w).std().shift(half_w)
df[f"right_mean_{w}"] = df["ts"].rolling(window=half_w).mean()
df[f"right_std_{w}"] = df["ts"].rolling(window=half_w).std()
# Drop the original time series column
df = df.drop(columns=["ts"])
# Drop rows with NaN values arising from incomplete rolling windows
# df = df[~np.isnan(df).any(axis=1)]
df = df.dropna(how="any")
return df.values
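# Example (illustrative sketch): enriching a 100-point series produces 15 features per
# time step (observation, two absolute-change features, and six rolling statistics for
# each window size); the 13 leading rows with incomplete windows are dropped.
#
#     fg = FeatureGenerator()
#     ts = np.random.normal(0.0, 0.01, 100)
#     X = fg.enrich_features(ts)
#     # X.shape == (87, 15)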
def standarize_features(self, X):
"""
Standardize features using sklearn's StandardScaler
"""
return preprocessing.StandardScaler().fit_transform(X)
class SimulationGenerator:
"""
Generate simulated returns that follow a hidden Markov process.
"""
def __init__(self) -> None:
pass
def stationary_distribution(self, transition_matrix):
"""
Computes the stationary distribution for a given Markov transition matrix.
Args:
transition_matrix (numpy array): The Markov transition matrix.
Returns:
numpy array: The stationary distribution.
"""
size = len(transition_matrix)
# Create a matrix subtracted from the identity matrix
Q = np.eye(size) - transition_matrix.T
# Append a ones row to handle the constraint sum(pi) = 1
Q = np.vstack([Q, np.ones(size)])
# Create the target matrix (last entry is 1 for the sum(pi) = 1 constraint)
b = np.zeros(size + 1)
b[-1] = 1
# Solve the linear system
pi = np.linalg.lstsq(Q, b, rcond=None)[0]
return pi
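# Example (illustrative sketch, values made up): the stationary distribution pi solves
# pi = pi @ P subject to sum(pi) == 1; for this two-state chain it is (2/3, 1/3).
#
#     sg = SimulationGenerator()
#     P = np.array([[0.9, 0.1], [0.2, 0.8]])
#     sg.stationary_distribution(P)
#     # array([0.66666667, 0.33333333])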
def simulate_markov(self, transition_matrix, initial_distribution, steps):
"""
Simulates a Markov process.
Args:
transition_matrix (numpy array): The Markov transition matrix.
initial_distribution (numpy array): The initial state distribution.
steps (int): The number of steps to simulate.
Returns:
states (list): The states at each step.
"""
state = np.random.choice(
len(initial_distribution), p=initial_distribution)
states = [state]
for _ in range(steps):
state = np.random.choice(
len(transition_matrix[state]), p=transition_matrix[state]
)
states.append(state)
return states
def generate_conditional_data(self, states, parameters):
"""
Generate data using normal distribution conditional on the states.
Args:
states (list): The list of states
parameters (dict): Parameters for each state with means and \
standard deviations
Returns:
data (list): Simulated data conditional on the states.
"""
data = []
for state in states:
mu, sigma = parameters[state]
value = np.random.normal(mu, sigma)
data.append(value)
return data
def run(self, steps, transition_matrix, norm_params):
"""
Run the simulation and return the simulated states and conditional data.
NOTE:
The simulated sequence must visit every state; if it does not, the simulation is re-run.
Args:
steps (int): number of steps to simulate
transition_matrix (np.ndarray): transition matrix (k x k)
norm_params (dict): parameters for the normal distribution for each state
Returns:
simulated_states (list): simulated states
simulated_data (list): simulated data conditional on states
"""
initial_distribution = self.stationary_distribution(transition_matrix)
logger.info(
f"Step 1: Initial (stationary) distribution: {initial_distribution}"
)
simulated_states = self.simulate_markov(
transition_matrix, initial_distribution, steps
)
# sanity check for the simulated states
count_states = Counter(simulated_states)
if len(count_states) != len(transition_matrix):
logger.info(
"The simulated states do not cover all states. Re-run.")
return self.run(steps, transition_matrix, norm_params)
logger.info(f"Step 2: Simulated states: {count_states}")
simulated_data = self.generate_conditional_data(
simulated_states, norm_params)
logger.info(
"Step 3: Generate simulated return data conditional on states.")
return simulated_states, simulated_data
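# Example (illustrative sketch): simulate a two-state daily return series with the
# parameters from TestingUtils.daily(); the simulation is re-run internally until both
# states appear.
#
#     tm, params = TestingUtils().daily()
#     states, data = SimulationGenerator().run(500, tm, params)
#     # len(states) == len(data) == 501 (initial state plus 500 transitions)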
class TestingUtils:
"""
Parameters and plotting functions for testing
"""
def __init__(self) -> None:
pass
def daily(self):
"""
Parameters for simulated daily return data, sourced from the paper
"""
transition_matrix = np.array(
[
[0.99788413, 0.00211587], # From state 0 to states 0 and 1
[0.01198743, 0.98801257], # From state 1 to states 0 and 1
]
)
norm_parameters = {
0: (0.000615, 0.007759155881924271), # mu1, sigma1
1: (-0.000785, 0.017396608864948364), # mu2, sigma2
}
return transition_matrix, norm_parameters
def plot_returns(self, returns, shade_list=None):
"""
Plot the cumulative returns and the returns on separate subplots sharing the x-axis.
Args:
returns (np.ndarray): An array of returns.
shade_list (list, optional): Binary sequence aligned with the end of returns; \
regions where it equals 1 are shaded.
"""
if not isinstance(returns, np.ndarray):
returns = np.array(returns)
# Create a color palette
palette = plt.get_cmap("Set1")
# Compute cumulative returns
cumulative_returns = np.cumprod(1 + returns) - 1
# Create subplots sharing the x-axis
fig, (ax1, ax2) = plt.subplots(2, 1, sharex=True, figsize=(8, 6))
# Plot cumulative returns on the first subplot
ax1.plot(
cumulative_returns,
label="Cumulative Returns",
color=palette(1),
linestyle="-",
)
ax1.set_ylabel("Cumulative Returns")
ax1.set_title("Cumulative Returns")
# Plot returns on the second subplot
ax2.plot(returns, label="Returns", color=palette(2))
ax2.set_xlabel("Time")
ax2.set_ylabel("Returns")
ax2.set_title("Returns")
if shade_list is not None:
# Shade regions corresponding to clusters of 1s in the shade_list
start_shade = len(returns) - len(shade_list)
start_region = None
for i, val in enumerate(shade_list):
if val == 1 and start_region is None:
start_region = start_shade + i
elif val == 0 and start_region is not None:
ax1.axvspan(start_region, start_shade +
i, color="gray", alpha=0.3)
ax2.axvspan(start_region, start_shade +
i, color="gray", alpha=0.3)
start_region = None
# If the last cluster extends to the end of the shade_list
if start_region is not None:
ax1.axvspan(
start_region, start_shade + len(shade_list), color="gray", alpha=0.3
)
ax2.axvspan(
start_region, start_shade + len(shade_list), color="gray", alpha=0.3
)
fig.tight_layout()
plt.show()
def plot_state_probs(self, states, prices):
"""plot the state probabilities and stock prices on the same plot
Args:
states (np.ndarray): An n x k array of state probabilities.
prices (pd.DataFrame): A series of prices, indexed by date.
"""
if not isinstance(states, np.ndarray):
states = np.array(states)
if not isinstance(prices.index, pd.DatetimeIndex):
raise ValueError("The index of prices must be a DatetimeIndex.")
fig, ax = plt.subplots()
ax.plot(prices.index[len(prices) - len(states):], states[:, 1], label='State Probability')
ax.set_xlabel('Time')
ax.set_ylabel('State Probability')
ax2 = ax.twinx()
ax2.plot(prices.index, prices.values, color='green', label='Stock Price')
ax2.set_ylabel('Price')
# Use AutoDateLocator and DateFormatter for x-axis labels
locator = mdates.AutoDateLocator()
formatter = mdates.DateFormatter("%Y-%b")
ax.xaxis.set_major_locator(locator)
ax.xaxis.set_major_formatter(formatter)
fig.autofmt_xdate() # Rotate and align the tick labels
handles, labels = ax.get_legend_handles_labels()
handles2, labels2 = ax2.get_legend_handles_labels()
ax.legend(handles + handles2, labels + labels2, loc='upper right')
def plot_averages(self, data_dict):
"""
Plot the average of numbers for each key in the dictionary \
using a line plot with x-axis labels in the form of 10^x.
Args:
data_dict (dict): A dictionary where keys are labels \
and values are lists of numbers.
"""
# Compute averages
labels = sorted(
data_dict.keys(), key=float
) # Sort the keys by their float value
averages = [
sum(values) / len(values)
for key in labels
for values in [data_dict[key]]
]
# Plot
plt.plot(labels, averages, marker="o")
plt.xlabel("Lambda")
plt.ylabel("Balanced Accuracy")
plt.title("Lambda vs. Average BAC")
# Adjust x-axis labels to only show integer powers
int_powers = [
label for label in labels if np.log10(float(label)).is_integer()
]
plt.xticks(
int_powers,
[f"$10^{{{int(np.log10(float(label)))}}}$" for label in int_powers],
)
plt.grid(True, which="both", ls="--", c="0.8")
plt.show()