Implementing custom sampler for Stochastic objects#
The Working with Stochastic objects documentation teaches how to work with stochastic objects, discussing the standard initializations, how to create objects and interpret outputs. Our goal here is to show how to build a custom sampler that gives complete control of the distributions used.
Custom Sampler#
Rocketpy provides a CustomSampler
abstract class which works as the backbone of
a custom sampler. We begin by first importing it and some other useful modules
from rocketpy import CustomSampler
from matplotlib import pyplot as plt
import numpy as np
In order to use it, we must create a new class that inherits from
it and it must implement two methods: sample and reset_seed. The sample
method has one argument, n_samples, and must return a list with n_samples
entries, each of which is a sample of the desired variable. The reset_seed method
has one argument, seed, which is used to reset the pseudorandom generators in order
to avoid unwanted dependency across samples. This is especially important when the
MonteCarlo
class is used in parallel mode.
Below, we give an example of how to implement a mixture of two Gaussian distributions.
class TwoGaussianMixture(CustomSampler):
"""Class to sample from a mixture of two Gaussian distributions
"""
def __init__(self, means_tuple, sd_tuple, prob_tuple, seed = None):
""" Creates a sampler for a mixture of two Gaussian distributions
Parameters
----------
means_tuple : 2-tuple
2-Tuple that contains the mean of each normal distribution of the
mixture
sd_tuple : 2-tuple
2-Tuple that contains the sd of each normal distribution of the
mixture
prob_tuple : 2-tuple
2-Tuple that contains the probability of each normal distribution
of the mixture. Its entries should be non-negative and sum up to 1.
"""
np.random.default_rng(seed)
self.means_tuple = means_tuple
self.sd_tuple = sd_tuple
self.prob_tuple = prob_tuple
def sample(self, n_samples = 1):
"""Samples from a mixture of two Gaussian
Parameters
----------
n_samples : int, optional
Number of samples, by default 1
Returns
-------
samples_list
List containing n_samples samples
"""
samples_list = [0] * n_samples
mixture_id_list = np.random.binomial(1, self.prob_tuple[0], n_samples)
for i, mixture_id in enumerate(mixture_id_list):
if mixture_id:
samples_list[i] = np.random.normal(self.means_tuple[0], self.sd_tuple[0])
else:
samples_list[i] = np.random.normal(self.means_tuple[1], self.sd_tuple[1])
return samples_list
def reset_seed(self, seed=None):
"""Resets all associated random number generators
Parameters
----------
seed : int, optional
Seed for the random number generator.
"""
np.random.default_rng(seed)
This is an example of a distribution that is not implemented in numpy. Note that it is a general distribution, so we can use it for many different variables.
Note
You can check more information about the mixture of Gaussian distributions here <https://en.wikipedia.org/wiki/Mixture_model#Gaussian_mixture_model>. Intuitively, if you think of a single Gaussian as a bell curve distribution, the mixture distribution resembles two bell curves superimposed, each with mode at their respective mean.
Example: Gaussian Mixture for Total Impulse#
In order to use the new created sampler in a stochastic object, we first need to build an object. In this example, we will consider a case where the distribution of the total impulse is a mixture of two gaussian with mean parameters \((6000, 7000)\), standard deviations \((300, 100)\) and mixture probabilities \((0.7, 0.3)\).
means_tuple = (6000, 7000)
sd_tuple = (300, 100)
prob_tuple = (0.7, 0.3)
total_impulse_sampler = TwoGaussianMixture(means_tuple, sd_tuple, prob_tuple)
Finally, we can create StochasticSolidMotor
object as we did in the example of
Working with Stochastic objects, but we pass the sampler object instead for the total_impulse
argument
from rocketpy import SolidMotor, StochasticSolidMotor
motor = SolidMotor(
thrust_source="../data/motors/cesaroni/Cesaroni_M1670.eng",
dry_mass=1.815,
dry_inertia=(0.125, 0.125, 0.002),
nozzle_radius=33 / 1000,
grain_number=5,
grain_density=1815,
grain_outer_radius=33 / 1000,
grain_initial_inner_radius=15 / 1000,
grain_initial_height=120 / 1000,
grain_separation=5 / 1000,
grains_center_of_mass_position=0.397,
center_of_dry_mass_position=0.317,
nozzle_position=0,
burn_time=3.9,
throat_radius=11 / 1000,
coordinate_system_orientation="nozzle_to_combustion_chamber",
)
stochastic_motor = StochasticSolidMotor(
solid_motor=motor,
burn_start_time=(0, 0.1, "binomial"),
grains_center_of_mass_position=0.001,
grain_density=10,
grain_separation=1 / 1000,
grain_initial_height=1 / 1000,
grain_initial_inner_radius=0.375 / 1000,
grain_outer_radius=0.375 / 1000,
throat_radius=0.5 / 1000,
nozzle_radius=0.5 / 1000,
nozzle_position=0.001,
total_impulse=total_impulse_sampler, # total impulse using custom sampler!
)
stochastic_motor.visualize_attributes()
Reporting the attributes of the `StochasticSolidMotor` object:
Constant Attributes:
burn_out_time 3.9
center_of_dry_mass_position 0.317
coordinate_system_orientation nozzle_to_combustion_chamber
dry_I_11 0.125
dry_I_12 0
dry_I_13 0
dry_I_22 0.125
dry_I_23 0
dry_I_33 0.002
dry_mass 1.815
grain_number 5
interpolate linear
thrust_source [[0, 0], [0.055, 100.0], [0.092, 1500.0], [0.1, 2000.0], [0.15, 2200.0], [0.2, 1800.0], [0.5, 1950.0], [1.0, 2034.0], [1.5, 2000.0], [2.0, 1900.0], [2.5, 1760.0], [2.9, 1700.0], [3.0, 1650.0], [3.3, 530.0], [3.4, 350.0], [3.9, 0.0]]
Stochastic Attributes:
burn_start_time 0.00000 ± 0.10000 (binomial)
grain_density 1815.00000 ± 10.00000 (normal)
grain_initial_height 0.12000 ± 0.00100 (normal)
grain_initial_inner_radius 0.01500 ± 0.00038 (normal)
grain_outer_radius 0.03300 ± 0.00038 (normal)
grain_separation 0.00500 ± 0.00100 (normal)
grains_center_of_mass_position 0.39700 ± 0.00100 (normal)
nozzle_position 0.00000 ± 0.00100 (normal)
nozzle_radius 0.03300 ± 0.00050 (normal)
throat_radius 0.01100 ± 0.00050 (normal)
Stochastic Attributes with Custom user samplers:
total_impulse TwoGaussianMixture
Let’s generate some random motors and check the distribution of the total impulse
total_impulse_samples = [
stochastic_motor.create_object().total_impulse for _ in range(200)
]
plt.hist(total_impulse_samples, density = True, bins = 30)
(array([6.87965055e-05, 0.00000000e+00, 1.37593011e-04, 1.37593011e-04,
4.12779033e-04, 2.75186022e-04, 5.50372044e-04, 3.43982528e-04,
4.81575539e-04, 8.25558066e-04, 6.19168550e-04, 5.50372044e-04,
8.25558066e-04, 1.03194758e-03, 8.25558066e-04, 5.50372044e-04,
5.50372044e-04, 6.87965055e-05, 4.81575539e-04, 3.43982528e-04,
1.37593011e-04, 6.87965055e-05, 2.06389517e-04, 1.37593011e-04,
7.56761561e-04, 1.51352312e-03, 8.25558066e-04, 7.56761561e-04,
2.06389517e-04, 6.87965055e-05]),
array([5095.27292783, 5167.95103787, 5240.62914791, 5313.30725796,
5385.985368 , 5458.66347804, 5531.34158808, 5604.01969813,
5676.69780817, 5749.37591821, 5822.05402826, 5894.7321383 ,
5967.41024834, 6040.08835839, 6112.76646843, 6185.44457847,
6258.12268852, 6330.80079856, 6403.4789086 , 6476.15701864,
6548.83512869, 6621.51323873, 6694.19134877, 6766.86945882,
6839.54756886, 6912.2256789 , 6984.90378895, 7057.58189899,
7130.26000903, 7202.93811908, 7275.61622912]),
<BarContainer object of 30 artists>)

Introducing dependency between parameters#
Although probabilistic independency between samples, i.e. different flight runs, is desired for Monte Carlo simulations, it is often important to be able to introduce dependency between parameters. A clear example of this is wind speed: if we know the wind speed in the x-axis, then our forecast model might tells us that the wind speed y-axis is more likely to be positive than negative, or vice-versa. These parameters are then correlated, and using samplers that model these correlations make the Monte Carlo analysis more robust.
When we use the default numpy samplers, the Monte Carlo analysis samples the parameters independently from each other. However, using custom samplers, we can introduce dependency and correlation! It might be a bit tricky, but we will show how it can be done. First, let us import the modules required
from rocketpy import Environment, StochasticEnvironment
from datetime import datetime, timedelta
Assume we want to model the x and y axis wind speed as a Bivariate Gaussian with parameters \(\mu = (1, 1)\) and variance-covariance matrix \(\Sigma = \begin{bmatrix} 0.2 & 0.17 \\ 0.17 & 0.3 \end{bmatrix}\). This will make the correlation between the speeds be of \(0.7\).
Now, in order to correlate the parameters using different custom samplers, the key trick is to create a common generator that will be used by both. The code below implements an example of such a generator
class BivariateGaussianGenerator:
"""Bivariate Gaussian generator used across custom samplers
"""
def __init__(self, mean, cov, seed = None):
"""Initializes the generator
Parameters
----------
mean : tuple, list
Tuple or list with mean of bivariate Gaussian
cov : np.array
Variance-Covariance matrix
seed : int, optional
Number to seed random generator, by default None
"""
np.random.default_rng(seed)
self.samples_list = []
self.samples_generated = 0
self.used_samples_x = 0
self.used_samples_y = 0
self.mean = mean
self.cov = cov
self.generate_samples(1000)
def generate_samples(self, n_samples = 1):
"""Generate samples from bivariate Gaussian and append to sample list
Parameters
----------
n_samples : int, optional
Number of samples to be generated, by default 1
"""
samples_generated = np.random.multivariate_normal(self.mean, self.cov, n_samples)
self.samples_generated += n_samples
self.samples_list += list(samples_generated)
def reset_seed(self, seed=None):
np.random.default_rng(seed)
def get_samples(self, n_samples, axis):
if axis == "x":
if self.samples_generated < self.used_samples_x:
self.generate_samples(n_samples)
samples_list = [
sample[0] for sample in self.samples_list[self.used_samples_x:(self.used_samples_x + n_samples)]
]
if axis == "y":
if self.samples_generated < self.used_samples_y:
self.generate_samples(n_samples)
samples_list = [
sample[1] for sample in self.samples_list[self.used_samples_y:(self.used_samples_y + n_samples)]
]
self.update_info(n_samples, axis)
return samples_list
def update_info(self, n_samples, axis):
"""Updates the information of the used samples
Parameters
----------
n_samples : int
Number of samples used
axis : str
Which axis was sampled
"""
if axis == "x":
self.used_samples_x += n_samples
if axis == "y":
self.used_samples_y += n_samples
This generator samples from the bivariate Gaussian and stores then in a samples_list attribute. Then, the idea is to create two samplers for the wind speeds that will share an object of this class and their sampling methods only get samples from the stored sample list.
class WindXSampler(CustomSampler):
"""Samples from X"""
def __init__(self, bivariate_gaussian_generator):
self.generator = bivariate_gaussian_generator
def sample(self, n_samples=1):
samples_list = self.generator.get_samples(n_samples, "x")
return samples_list
def reset_seed(self, seed=None):
self.generator.reset_seed(seed)
class WindYSampler(CustomSampler):
"""Samples from Y"""
def __init__(self, bivariate_gaussian_generator):
self.generator = bivariate_gaussian_generator
def sample(self, n_samples=1):
samples_list = self.generator.get_samples(n_samples, "y")
return samples_list
def reset_seed(self, seed=None):
self.generator.reset_seed(seed)
Then, we create the objects
mean = [1, 2]
cov_mat = [[0.2, 0.171], [0.171, 0.3]]
bivariate_gaussian_generator = BivariateGaussianGenerator(mean, cov_mat)
wind_x_sampler = WindXSampler(bivariate_gaussian_generator)
wind_y_sampler = WindYSampler(bivariate_gaussian_generator)
With the sample objects created, we only need to create the stochastic objects and pass them as argument
spaceport_env = Environment(
latitude=32.990254,
longitude=-106.974998,
elevation=1400,
datum="WGS84",
)
spaceport_env.set_atmospheric_model("custom_atmosphere", wind_u = 1, wind_v = 1)
spaceport_env.set_date(datetime.now() + timedelta(days=1))
stochastic_environment = StochasticEnvironment(
environment=spaceport_env,
elevation=(1400, 10, "normal"),
gravity=None,
latitude=None,
longitude=None,
ensemble_member=None,
wind_velocity_x_factor=wind_x_sampler,
wind_velocity_y_factor=wind_y_sampler
)
Finally, let us check that if there is a correlation between the wind speed in the two axis
wind_velocity_x_samples = [0] * 200
wind_velocity_y_samples = [0] * 200
for i in range(200):
stochastic_environment.create_object()
wind_velocity_x_samples[i] = stochastic_environment.obj.wind_velocity_x(0)
wind_velocity_y_samples[i] = stochastic_environment.obj.wind_velocity_y(0)
np.corrcoef(wind_velocity_x_samples, wind_velocity_y_samples)
array([[1. , 0.72727534],
[0.72727534, 1. ]])