Spring-Mass System with Hybrid Digital Twinning¶
This Jupyter Notebook demonstrates how to use a Hybrid Digital Twinning model on a simple spring-mass system as a toy example. The model treats mass ($m$) and stiffness ($k$) as variable parameters to showcase the twinning process.
System Overview¶
The spring-mass system follows Hooke’s Law and Newton’s Second Law:
$$ F = -k x $$
where:
- $F$ is the restoring force,
- $k$ is the spring stiffness,
- $x$ is the displacement from equilibrium,
- $m$ is the mass of the attached object.
Analytical Solution¶
For an ideal undamped system, the displacement at time $T$ is given by:
$$ u(T) = u_0 \cos \left( \sqrt{\frac{k}{m}} T \right) + \frac{v_0}{\sqrt{\frac{k}{m}}} \sin \left( \sqrt{\frac{k}{m}} T \right) $$
where:
- $u_0$ is the initial displacement,
- $v_0$ is the initial velocity,
- $k$ and $m$ can change dynamically during the twinning process.
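As a quick sanity check, the closed-form solution can be evaluated directly with NumPy. This is a minimal standalone sketch, independent of the solver defined later; the parameter values are arbitrary:

import numpy as np

def u_exact(T, k=2.0, m=2.0, u0=1.0, v0=0.0):
    """Closed-form displacement of the undamped spring-mass system."""
    w0 = np.sqrt(k / m)  # natural angular frequency sqrt(k/m)
    return u0 * np.cos(w0 * T) + (v0 / w0) * np.sin(w0 * T)

print(u_exact(10.0))  # for k = m we have w0 = 1, so this is cos(10) ≈ -0.8391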
Hybrid Digital Twinning Approach¶
- The model updates the parameters ($k$, $m$) dynamically.
- It integrates the physical equations with real-time data.
- Simulations showcase how the digital twin adapts to variations over multiple time steps (from $T = 0$ to $T = 10$).
Visualizations¶
This notebook includes:
✅ System state at each time step from $T = 0$ to $T = 10$.
✅ Phase-space representation (velocity vs. displacement at each time step).
✅ 3D plots to visualize parameter influence over the entire range of time steps.
Imports¶
import sys
import os
parent_dir = os.path.abspath("../../libraries/surrogate_modelling")
sys.path.insert(0, parent_dir)
from distributions import UniformDistribution
from simparameter import SimParameter
from simparameter_set import SimParamSet
from surrogate_model import SurrogateModel
import numpy as np  # used by spring_solve and array handling below
import pandas as pd
from digital_twin import DigitalTwin
import utils
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
Generate data¶
Function to calculate the displacement¶
def spring_solve(q_i, x0=1, v0=0, d=0, T=10):
    """Solve the displacement of a damped spring-mass system at time step T."""
    T = T + 1  # shift by one, so time-step index i corresponds to physical time i + 1
    m = q_i[:, 0]  # Mass values from input array
    k = q_i[:, 1]  # Stiffness values from input array
    alpha = d / m  # Damping per unit mass
    D = (k / m) - (alpha ** 2)  # Damped angular frequency squared
    omega = np.sqrt(D)  # Damped angular frequency
    # Compute displacement at time T using the damped harmonic motion formula
    xt = np.exp(-alpha * T) * (x0 * np.cos(omega * T) + (v0 + alpha * x0) / omega * np.sin(omega * T))
    return xt.reshape(-1, 1)  # Ensure output is a column vector
# Example usage:
x0, v0, d, T = 1, 0, 0, 10
spring_solve(np.array([[2, 2]]), x0, v0, d, T)
array([[0.0044257]])
Create SimParameters and SimParameterSet¶
# Define simulation parameters for mass (m) and stiffness (k),
# each following a uniform distribution within a given range.
P1 = SimParameter('m', UniformDistribution(0.5, 2.5)) # Mass (m) sampled from a uniform distribution between 0.5 and 2.5
P2 = SimParameter('k', UniformDistribution(0.5, 2.5)) # Stiffness (k) sampled from a uniform distribution between 0.5 and 2.5
# Create a set to store simulation parameters.
Q = SimParamSet()
# Add the defined parameters (mass and stiffness) to the parameter set.
Q.add(P1)
Q.add(P2)
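A quick way to confirm that both parameters were registered is to inspect the set (param_names() is also used further below, so this assumes only what the notebook itself relies on):

# Inspect the registered parameter names
print(Q.param_names())  # expected: ['m', 'k']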
Sampling¶
# Generate samples for uncertain parameters
num_samples = 15000 # Number of samples to generate
Q_i = Q.sample(num_samples, method='QMC_Halton', random_seed=1997) # Sample the uncertain parameters using the Halton Quasi-Monte Carlo method
# Create a DataFrame from the generated samples for easier handling and visualization
Q_df = pd.DataFrame(Q_i, columns=Q.param_names()) # Convert the sampled parameters to a DataFrame
# Generate target values (Quantity of Interest) for each sample over multiple time steps
time_steps = 10 # Define the number of time steps to compute (from T=0 to T=10)
QoI_i = np.zeros((num_samples, time_steps)) # Initialize a matrix to store the QoI values for each sample and time step
# For each time step, solve the spring system and store the results in QoI_i
for time_step in range(time_steps):
# Solve the spring system for each sample at the current time step 'T'
QoI_i[:, time_step] = spring_solve(Q_i, T=time_step).reshape(-1) # Solve for the displacement at time 'T'
# QoI_names: Create labels for each time step (e.g., 't_0', 't_1', ..., 't_9')
QoI_names = [f"t_{i}" for i in range(time_steps)] # Create column names for the time steps
# Convert the QoI values into a DataFrame for easier analysis and visualization
QoI_df = pd.DataFrame(QoI_i, columns=QoI_names) # Store the QoI values (displacement over time) in a DataFrame
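To check the low-discrepancy coverage of the Halton samples visually, a minimal matplotlib sketch can be used (matplotlib is an additional import, not part of the import cell above):

import matplotlib.pyplot as plt

# Scatter the first 1000 Halton samples in the (m, k) plane
fig, ax = plt.subplots(figsize=(4, 4))
ax.scatter(Q_df['m'][:1000], Q_df['k'][:1000], s=4, alpha=0.5)
ax.set_xlabel('m (mass)')
ax.set_ylabel('k (stiffness)')
ax.set_title('Halton QMC samples')
plt.show()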
1 sample point¶
# Generate a single sample for the uncertain parameters (mass 'm' and stiffness 'k')
q_sample = Q.sample(1, random_seed=1) # Sample a single set of uncertain parameters (m and k), with a fixed random seed for reproducibility
# Generate the measurable response (Quantity of Interest) at each time step for the sampled parameters
qoi_sample = np.zeros((1, time_steps)) # Initialize an array to store the response values for each time step
for time_step in range(time_steps):
# Solve the spring system for the sampled parameters at each time step
qoi_sample[:, time_step] = spring_solve(q_sample, T=time_step).reshape(-1) # Solve and store the displacement for each time step
# Convert the sampled uncertain parameters into a DataFrame for easy visualization and further analysis
q_sample_df = pd.DataFrame(q_sample, columns=Q.param_names()) # Create a DataFrame from the sampled parameters, labeling columns with their respective names (e.g., 'm', 'k')
qoi_sample_df = pd.DataFrame(qoi_sample, columns=QoI_names) # Create a DataFrame for the QoI values over the time steps
Model¶
Choosing the Model¶
In this notebook, we implement the model using one of the following methods: gPCE, DNN, or GBT.
gPCE (Generalized Polynomial Chaos Expansion): Models uncertainty by expressing the output as a polynomial expansion, with coefficients determined by projecting the system's response onto orthogonal polynomials.
DNN (Deep Neural Networks): Machine learning models composed of interconnected neurons, capable of learning complex, non-linear patterns from data.
GBT (Gradient Boosting Trees): Combines weak decision trees into a strong predictive model, iteratively correcting errors from previous trees for robust performance.
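To make the expansion idea behind gPCE concrete, here is a minimal, library-free sketch that fits a total-degree-2 polynomial surrogate by least squares. It is only an illustration, not the SurrogateModel implementation: a real gPCE uses polynomials orthogonal with respect to the input distribution (Legendre polynomials for uniform inputs), whereas plain monomials are used here for brevity.

import numpy as np

# Toy data: displacement at T = 1 (x0 = 1, v0 = 0, d = 0) for random (m, k)
rng = np.random.default_rng(0)
X = rng.uniform(0.5, 2.5, size=(200, 2))      # columns: m, k
y = np.cos(np.sqrt(X[:, 1] / X[:, 0]) * 1.0)  # u(1) = cos(sqrt(k/m) * 1)

# Polynomial basis: 1, m, k, m^2, m*k, k^2
Phi = np.column_stack([np.ones(len(X)), X[:, 0], X[:, 1],
                       X[:, 0] ** 2, X[:, 0] * X[:, 1], X[:, 1] ** 2])
coeffs, *_ = np.linalg.lstsq(Phi, y, rcond=None)  # expansion coefficients

# Surrogate prediction at (m, k) = (2, 2); exact value is cos(1) ≈ 0.5403
m_new, k_new = 2.0, 2.0
phi_new = np.array([1, m_new, k_new, m_new ** 2, m_new * k_new, k_new ** 2])
print(phi_new @ coeffs)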
method = "gPCE"
Set configurations¶
# Model configurations
match method:
# DNN model configurations
case "DNN":
config = {
'init_config' : {
'layers': [
{'neurons': 512, 'activation': 'relu', 'dropout': 0.2},
{'neurons': 256, 'activation': 'sigmoid', 'dropout': 0.2},
{'neurons': 256, 'activation': 'sigmoid', 'dropout': 0.2},
{'neurons': 128, 'activation': 'relu', 'dropout': None},
],
'outputAF': 'tanh'
},
'train_config' : {
'optimizer': 'Adam',
'loss': 'MSE',
'epochs': 100,
'batch_size': 32,
'k_fold': None,
'early_stopping': {
'patience': 25,
'min_delta': 0.0001}
}
}
# gPCE model configurations
case "gPCE":
config = {
'init_config' : {
'p' : 4
},
'train_config' : {
'k_fold': 9
}
}
# GBT model configurations
case "GBT":
config = {
'init_config' : {
'gbt_method': 'xgboost'
},
'train_config' : {
'max_depth': 6,
'num_of_iter': 250,
'k_fold': 9
}
}
split_config = {
'train_test_ratio': 0.2,
'random_seed': 1997,
'split_type': 'no_shuffle'
}
Define model¶
# Initialize surrogate model
# This creates an instance of the SurrogateModel class from the parameter set (Q),
# the QoI names, the chosen method, and its init configuration.
model = SurrogateModel(Q, QoI_names, method, **config['init_config'])
# Split data into training and testing sets
model.train_test_split(Q_df, QoI_df, **split_config)
# Train the model
model.train(model.X_train, model.y_train, **config['train_config'])
----- Training started for 'gPCE' model -----
Fold 1/9
Fold 2/9
Fold 3/9
Fold 4/9
Fold 5/9
Fold 6/9
Fold 7/9
Fold 8/9
Fold 9/9
Average train loss: 0.09726606527760, Average valid loss: 0.09886207363781
----- Training ended for 'gPCE' model -----
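A quick hold-out check of the trained surrogate (a sketch; it assumes the split stores the held-out targets in model.y_test, mirroring model.y_train used above):

# Hold-out error of the surrogate (model.y_test is assumed to mirror model.y_train)
y_pred = model.predict(model.X_test)
mse = np.mean((np.asarray(model.y_test) - np.asarray(y_pred)) ** 2)
print(f"Test MSE: {mse:.6f}")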
# get mean and variance of surrogate model
mean, var = model.get_mean_and_var()
mean, var
(tensor([ 0.4764, -0.3808, -0.6245, -0.3399, -0.0138, 0.1741, 0.1899, 0.0810,
-0.0093, -0.0230], dtype=torch.float64),
tensor([0.0827, 0.1848, 0.1951, 0.4172, 0.4543, 0.4202, 0.3161, 0.2680, 0.2853,
0.2138], dtype=torch.float64))
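These surrogate moments can be sanity-checked against the plain Monte Carlo moments of the sampled QoI data:

# Empirical moments of the sampled QoI, for comparison with the surrogate's mean and variance
print(QoI_df.mean().values)
print(QoI_df.var().values)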
Calculate Sobol Sensitivities¶
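A Sobol index measures the share of the total output variance attributable to a parameter (or a combination of parameters):
$$ S_{\alpha} = \frac{V_{\alpha}}{\mathrm{Var}[u]}, \qquad \sum_{\alpha} S_{\alpha} = 1 $$
where $V_{\alpha}$ is the partial variance associated with the parameter subset $\alpha$ (here, subsets of $\{m, k\}$ up to the chosen max_index).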
# set max_index for Sobol sensitivity analysis
max_index = 2
partial_variance, sobol_index = model.get_sobol_sensitivity(max_index)
# Plot Sobol Sensitivity Index
# The 'plot_sobol_sensitivity' method generates a plot of Sobol Sensitivity indices, which quantify
# the contribution of each input parameter to the output uncertainty.
# - max_index: the maximum order of Sobol indices to include (here, up to second-order terms).
fig = model.plot_sobol_sensitivity(max_index=max_index)
Calculate SHAP values¶
# Calculate SHAP (Shapley Additive Explanations) values
# The 'get_shap_values' method computes the SHAP values for the model's test data (model.X_test).
# SHAP values explain the contribution of each input feature to the prediction for each test sample.
# - model.X_test: the full test set is passed; the method caps the SHAP sample size itself (see the message below).
# SHAP values help to interpret the model's decisions and understand the impact of each input parameter on the output.
shap_values = await model.get_shap_values(model.X_test)
Message: sample size for shap values is set to 100.
0%| | 0/100 [00:00<?, ?it/s]
# Plot SHAP Single Waterfall Plot
# The 'plot_shap_single_waterfall' method generates a SHAP waterfall plot for a single test sample,
# visualizing how each feature contributes to the model's prediction for that sample.
# - q_sample_df: A DataFrame containing the sample data (e.g., a specific parameter set) for which the SHAP values are calculated.
# - param_name: The name of the quantity of interest (QoI) being analyzed ('t_4' in this case).
# The SHAP waterfall plot shows the cumulative effect of each feature on the model's prediction, helping to interpret individual predictions.
fig = await model.plot_shap_single_waterfall(q=q_sample_df, param_name='t_4')
0%| | 0/1 [00:00<?, ?it/s]
# Generate SHAP (Shapley values) waterfall plots for multiple features
# 'q_sample_df' contains the uncertain parameters sample, which will be passed as input to the SHAP model.
# The 'plot_shap_multiple_waterfalls' function will generate individual waterfall plots for each feature (parameter) in the sample.
fig = await model.plot_shap_multiple_waterfalls(q=q_sample_df)
0%| | 0/1 [00:00<?, ?it/s]
# Plot SHAP Beeswarm Plot
# The 'plot_shap_beeswarm' method generates a SHAP beeswarm plot, which visualizes the distribution of SHAP values
# for a range of test samples, showing the impact of each feature on the model’s predictions.
# - param_name: The name of the quantity of interest (QoI) for which SHAP values are computed ('t_4' in this case).
# The SHAP beeswarm plot shows how different input features influence the predictions across multiple samples,
# helping to understand the global feature importance and the relationships between features and the target.
fig = await model.plot_shap_beeswarm(param_name='t_4')
# Make a prediction using the surrogate model
# - q_sample_df: A DataFrame containing the input parameter values for which the model will generate a prediction.
model.predict(q_sample_df)
array([[ 0.35854904, -0.75747261, -0.86274116, 0.1521514 , 0.81406105,
0.41960657, -0.23382514, -0.46611543, -0.28440111, 0.05413414]])
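The prediction can be compared directly against the exact solution computed for the same sample in the "1 sample point" section:

# Absolute error between surrogate prediction and exact displacements for the sampled point
pred = model.predict(q_sample_df)
print(np.abs(pred - qoi_sample_df.values))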
Update¶
Synthetic measurement¶
# Generate synthetic measurement data for uncertain parameters (mass 'm' and stiffness 'k')
# Sample uncertain parameters (treated as synthetic measurements)
q_true = Q.sample(1, random_seed=1) # Sample a set of uncertain parameters
# Initialize an array to store system response values at each time step
qoi_true = np.zeros((1, time_steps))
# Solve spring system for each time step and store results
for time_step in range(time_steps):
qoi_true[:, time_step] = spring_solve(q_true, T=time_step).reshape(-1)
# Convert uncertain parameters and system response to DataFrames
q_true_df = pd.DataFrame(q_true, columns=Q.param_names()) # Parameters DataFrame
qoi_true_df = pd.DataFrame(qoi_true, columns=QoI_names) # System response DataFrame
# Define measurement error
s = 0.02 # Standard deviation for noise
sigma = (np.ones(time_steps) * s).reshape(1, -1) # Measurement error for each time step
sigma = pd.DataFrame(sigma, columns=QoI_names) # Convert to DataFrame
# Generate random noise based on sigma
E = utils.generate_stdrn_simparamset(sigma.values.flatten())
eps = E.sample(1) # Sample noise
# Add noise to true values to create synthetic measurement
qoi_m = qoi_true + eps # Noisy measurement
qoi_m_df = pd.DataFrame(qoi_m, columns=QoI_names) # Convert noisy measurements to DataFrame
print(q_true, qoi_true)
[[1.33404401 1.94064899]] [[ 0.35665238 -0.74559816 -0.8884911 0.11183323 0.96826227 0.57883286 -0.55537804 -0.97498666 -0.14008458 0.87506366]]
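The noisy measurement can be plotted against the noiseless response (a minimal matplotlib sketch; matplotlib is an additional import):

import matplotlib.pyplot as plt

# Compare the noiseless response with the synthetic noisy measurement
t = np.arange(time_steps)
fig, ax = plt.subplots(figsize=(6, 3))
ax.plot(t, qoi_true.flatten(), 'o-', label='true response')
ax.plot(t, qoi_m.flatten(), 'x--', label='noisy measurement')
ax.set_xlabel('time step')
ax.set_ylabel('displacement')
ax.legend()
plt.show()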
Set parameters¶
# For Update
# The standard deviation of the error is computed using the statistics from the QoI DataFrame
sigma = QoI_df.describe().loc['std'] / 2 # Use half the empirical standard deviation of each QoI column as the measurement-error scale
# Generate the simulation parameter set (random noise) based on the updated measurement error (sigma)
E = utils.generate_stdrn_simparamset(sigma.values.flatten()) # Generate random noise according to sigma values
# Set up the parameters for the Markov Chain Monte Carlo (MCMC) simulation
nwalkers = 64 # Number of walkers (parallel chains) in the MCMC process
nburn = 500 # Number of burn-in steps: these are discarded, as they help the MCMC chain reach equilibrium
niter = 100 # Number of sampling iterations after the burn-in period
# Initialize the Digital Twin model with the surrogate model and simulation parameter set (E)
DT = DigitalTwin(model, E) # Create a DigitalTwin instance that integrates the surrogate model and the random noise (E)
# Update the Digital Twin model with the synthetic measurement data and run MCMC sampling
DT.update(qoi_m_df, nwalkers=nwalkers, nburn=nburn, niter=niter)
# The 'update' method updates the Digital Twin using the synthetic measurement data (qoi_m_df)
# and performs MCMC sampling with the specified parameters (number of walkers, burn-in steps, and iterations)
MCMC creating Burning period
100%|██████████| 500/500 [00:24<00:00, 20.46it/s]
MCMC running
100%|██████████| 100/100 [00:05<00:00, 19.00it/s]
--- 29.752365827560425 seconds ---
# Get the posterior mean and variance from the Digital Twin model
mean_of_post, variance_of_post = DT.get_mean_and_var_of_posterior()
# This returns the mean and variance of the posterior distribution after updating the model with data.
# Get the Maximum A Posteriori (MAP) estimate of the parameters
map_point = DT.get_MAP()
# This retrieves the most likely parameter values based on the updated model.
# Plot the MCMC sampling results along with the MAP and true parameter values
fig = utils.plot_MCMC(model, DT, nwalkers=nwalkers, map_point=map_point)
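Finally, the inferred parameters can be compared with the ground truth that generated the measurement (a sketch; the posterior mean and MAP are assumed to be array-like with the same shape as q_true):

# Compare inferred parameter values with the ground truth
print("true:          ", q_true.flatten())
print("posterior mean:", np.asarray(mean_of_post).flatten())
print("MAP:           ", np.asarray(map_point).flatten())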