Source code for rocketpy.EnvironmentAnalysis

# -*- coding: utf-8 -*-

__author__ = "Patrick Sampaio, Giovani Hidalgo Ceotto, Guilherme Fernandes Alves, Franz Masatoshi Yuri, Mateus Stano Junqueira,"
__copyright__ = "Copyright 20XX, RocketPy Team"
__license__ = "MIT"

import bisect
import datetime
import json
import warnings
from collections import defaultdict

import ipywidgets as widgets
import jsonpickle
import matplotlib.ticker as mtick
import netCDF4
import numpy as np
import pytz
from cftime import num2pydate
from IPython.display import HTML
from matplotlib import pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.animation import PillowWriter as ImageWriter
from scipy import stats
from windrose import WindroseAxes
from rocketpy.Environment import Environment

from rocketpy.Function import Function
from rocketpy.units import convert_units


[docs]class EnvironmentAnalysis: """Class for analyzing the environment. List of properties currently implemented: - average max/min temperature at surface level - record max/min temperature at surface level - temperature progression throughout the day - temperature profile over an average day - average max wind gust at surface level - record max wind gust at surface level - average, 1, 2, 3 sigma wind profile - average day wind rose - animation of how average wind rose evolves throughout an average day - animation of how wind profile evolves throughout an average day - pressure profile over an average day - wind velocity x profile over average day - wind velocity y profile over average day - wind speed profile over an average day - average max surface 100m wind speed - average max surface 10m wind speed - average min surface 100m wind speed - average min surface 10m wind speed - average sustained surface100m wind along day - average sustained surface10m wind along day - maximum surface 10m wind speed - average cloud base height - percentage of days with no cloud coverage - percentage of days with precipitation You can also visualize all those attributes by exploring some of the methods: - plot of wind gust distribution (should be Weibull) - plot wind profile over average day - plot sustained surface wind speed distribution over average day - plot wind gust distribution over average day - plot average day wind rose all hours - plot average day wind rose specific hour - plot average pressure profile - plot average surface10m wind speed along day - plot average sustained surface100m wind speed along day - plot average temperature along day - plot average wind speed profile - plot surface10m wind speed distribution - animate wind profile over average day - animate sustained surface wind speed distribution over average day - animate wind gust distribution over average day - animate average wind rose - animation of who wind gust distribution evolves over average day - allInfo All items listed are relevant to either 1. participant safety 2. launch operations (range closure decision) 3. rocket performance How does this class work? - The class is initialized with a start_date, end_date, start_hour and end_hour. - The class then parses the weather data from the start date to the end date. Always parsing the data from start_hour to end_hour. - The class then calculates the average max/min temperature, average max wind gust, and average day wind rose. - The class then allows for plotting the average max/min temperature, average max wind gust, and average day wind rose. Remaining TODOs: - Make 'windSpeedLimit' a dynamic/flexible variable """ def __init__( self, start_date, end_date, latitude, longitude, start_hour=0, end_hour=24, surfaceDataFile=None, pressureLevelDataFile=None, timezone=None, unit_system="metric", forecast_date=None, forecast_args=None, maxExpectedAltitude=None, ): """Constructor for the EnvironmentAnalysis class. Parameters ---------- start_date : datetime.datetime Start date and time of the analysis. When parsing the weather data from the source file, only data after this date will be parsed. end_date : datetime.datetime End date and time of the analysis. When parsing the weather data from the source file, only data before this date will be parsed. latitude : float Latitude coordinate of the location where the analysis will be carried out. longitude : float Longitude coordinate of the location where the analysis will be carried out. start_hour : int, optional Starting hour of the analysis. When parsing the weather data from the source file, only data after this hour will be parsed. end_hour : int, optional End hour of the analysis. When parsing the weather data from the source file, only data before this hour will be parsed. surfaceDataFile : str, optional Path to the netCDF file containing the surface data. pressureLevelDataFile : str, optional Path to the netCDF file containing the pressure level data. timezone : str, optional Name of the timezone to be used when displaying results. To see all available time zones, import pytz and run print(pytz.all_timezones). Default time zone is the local time zone at the latitude and longitude specified. unit_system : str, optional Unit system to be used when displaying results. Options are: SI, metric, imperial. Default is metric. forecast_date : datetime.date, optional Date for the forecast models. It will be requested the environment forecast for multiple hours within that specified date. forecast_args : dictionary, optional Arguments for setting the forecast on the Environment class. With this argument it is possible to change the forecast model being used. maxExpectedAltitude : float, optional Maximum expected altitude for your analysis. This is used to calculate plot limits from pressure level data profiles. If None is set, the maximum altitude will be calculated from the pressure level data. Default is None. Returns ------- None """ warnings.warn( "Please notice this class is still under development, and some features may not work as expected as they were not exhaustively tested yet. In case of having any trouble, please raise an issue at https://github.com/RocketPy-Team/RocketPy/issues" ) # Save inputs self.start_date = start_date self.end_date = end_date self.start_hour = start_hour self.end_hour = end_hour self.latitude = latitude self.longitude = longitude self.surfaceDataFile = surfaceDataFile self.pressureLevelDataFile = pressureLevelDataFile self.preferred_timezone = timezone self.unit_system = unit_system self.maxExpectedAltitude = maxExpectedAltitude # Manage units and timezones self.__init_data_parsing_units() self.__find_preferred_timezone() self.__localize_input_dates() # Parse data files, surface goes first to calculate elevation self.surfaceDataDict = {} self.parseSurfaceData() self.pressureLevelDataDict = {} self.parsePressureLevelData() # Convert units self.set_unit_system(unit_system) # Initialize result variables self.average_max_temperature = 0 self.average_min_temperature = 0 self.record_max_temperature = 0 self.record_min_temperature = 0 self.average_max_wind_gust = 0 self.maximum_wind_gust = 0 self.wind_gust_distribution = None self.average_temperature_along_day = Function(0) self.average_wind_profile = Function(0) self.average_wind_profile_at_given_hour = None self.average_wind_heading_profile = Function(0) self.average_wind_heading_profile_at_given_hour = Function(0) self.max_wind_speed = None self.min_wind_speed = None self.wind_speed_per_hour = None self.wind_direction_per_hour = None # Run calculations self.process_data() # Processing forecast self.forecast = None if forecast_date: self.forecast = {} hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: hour_datetime = datetime.datetime( year=forecast_date.year, month=forecast_date.month, day=forecast_date.day, hour=int(hour), ) Env = Environment( railLength=5, date=hour_datetime, latitude=self.latitude, longitude=self.longitude, elevation=self.elevation, ) forecast_args = forecast_args or {"type": "Forecast", "file": "GFS"} Env.setAtmosphericModel(**forecast_args) self.forecast[hour] = Env def __bilinear_interpolation(self, x, y, x1, x2, y1, y2, z11, z12, z21, z22): """ Bilinear interpolation. Source: GitHub Copilot """ return ( z11 * (x2 - x) * (y2 - y) + z21 * (x - x1) * (y2 - y) + z12 * (x2 - x) * (y - y1) + z22 * (x - x1) * (y - y1) ) / ((x2 - x1) * (y2 - y1)) def __init_surface_dictionary(self): # Create dictionary of file variable names to process surface data self.surfaceFileDict = { "surface100mWindVelocityX": "u100", "surface100mWindVelocityY": "v100", "surface10mWindVelocityX": "u10", "surface10mWindVelocityY": "v10", "surfaceTemperature": "t2m", "cloudBaseHeight": "cbh", "surfaceWindGust": "i10fg", "surfacePressure": "sp", "totalPrecipitation": "tp", } def __init_pressure_level_dictionary(self): # Create dictionary of file variable names to process pressure level data self.pressureLevelFileDict = { "geopotential": "z", "windVelocityX": "u", "windVelocityY": "v", "temperature": "t", } def __getNearestIndex(self, array, value): """Find nearest index of the given value in the array. Made for latitudes and longitudes, supporting arrays that range from -180 to 180 or from 0 to 360. TODO: improve docs by giving one example Parameters ---------- array : array value : float Returns ------- index : int """ # Create value convention if np.min(array) < 0: # File uses range from -180 to 180, make sure value follows convention value = value if value < 180 else value % 180 - 180 # Example: 190 => -170 else: # File probably uses range from 0 to 360, make sure value follows convention value = value % 360 # Example: -10 becomes 350 # Find index if array[0] < array[-1]: # Array is sorted correctly, find index # Deal with sorted array index = bisect.bisect(array, value) else: # Array is reversed, no big deal, just bisect reversed one and subtract length index = len(array) - bisect.bisect_left(array[::-1], value) # Apply fix if index == len(array) and array[index - 1] == value: # If value equal the last array entry, fix to avoid being considered out of grid index = index - 1 return index def __timeNumToDateString(self, timeNum, units, calendar="gregorian"): """Convert time number (usually hours before a certain date) into two strings: one for the date (example: 2022.04.31) and one for the hour (example: 14). See cftime.num2date for details on units and calendar. Automatically converts time number from UTC to local timezone based on lat,lon coordinates. """ dateTimeUTC = num2pydate(timeNum, units, calendar=calendar) dateTimeUTC = dateTimeUTC.replace(tzinfo=pytz.UTC) dateTime = dateTimeUTC.astimezone(self.preferred_timezone) dateString = f"{dateTime.year}.{dateTime.month}.{dateTime.day}" hourString = f"{dateTime.hour}" return dateString, hourString, dateTime def __extractSurfaceDataValue( self, surfaceData, variable, indices, lonArray, latArray ): """Extract value from surface data netCDF4 file. Performs bilinear interpolation along longitude and latitude.""" timeIndex, lonIndex, latIndex = indices variableData = surfaceData[variable] # Get values for variable on the four nearest points z11 = variableData[timeIndex, lonIndex - 1, latIndex - 1] z12 = variableData[timeIndex, lonIndex - 1, latIndex] z21 = variableData[timeIndex, lonIndex, latIndex - 1] z22 = variableData[timeIndex, lonIndex, latIndex] # Compute interpolated value on desired lat lon pair value = self.__bilinear_interpolation( x=self.longitude, y=self.latitude, x1=lonArray[lonIndex - 1], x2=lonArray[lonIndex], y1=latArray[latIndex - 1], y2=latArray[latIndex], z11=z11, z12=z12, z21=z21, z22=z22, ) return value def __extractPressureLevelDataValue( self, pressureLevelData, variable, indices, lonArray, latArray ): """Extract value from surface data netCDF4 file. Performs bilinear interpolation along longitude and latitude.""" timeIndex, lonIndex, latIndex = indices variableData = pressureLevelData[variable] # Get values for variable on the four nearest points z11 = variableData[timeIndex, :, lonIndex - 1, latIndex - 1] z12 = variableData[timeIndex, :, lonIndex - 1, latIndex] z21 = variableData[timeIndex, :, lonIndex, latIndex - 1] z22 = variableData[timeIndex, :, lonIndex, latIndex] # Compute interpolated value on desired lat lon pair value_list_as_a_function_of_pressure_level = self.__bilinear_interpolation( x=self.longitude, y=self.latitude, x1=lonArray[lonIndex - 1], x2=lonArray[lonIndex], y1=latArray[latIndex - 1], y2=latArray[latIndex], z11=z11, z12=z12, z21=z21, z22=z22, ) return value_list_as_a_function_of_pressure_level def __compute_height_above_sea_level(self, geopotential): """Compute height above sea level from geopotential. Source: https://en.wikipedia.org/wiki/Geopotential """ R = 63781370 # Earth radius in m g = 9.80665 # Gravity acceleration in m/s^2 geopotential_height = geopotential / g return R * geopotential_height / (R - geopotential_height) def __compute_height_above_ground_level(self, geopotential, elevation): """Compute height above ground level from geopotential and elevation.""" return self.__compute_height_above_sea_level(geopotential) - elevation def __check_coordinates_inside_grid(self, lonIndex, latIndex, lonArray, latArray): if ( lonIndex == 0 or lonIndex > len(lonArray) - 1 or latIndex == 0 or latIndex > len(latArray) - 1 ): raise ValueError( f"Latitude and longitude pair {(self.latitude, self.longitude)} is outside the grid available in the given file, which is defined by {(latArray[0], lonArray[0])} and {(latArray[-1], lonArray[-1])}." ) def __localize_input_dates(self): if self.start_date.tzinfo is None: self.start_date = self.preferred_timezone.localize(self.start_date) if self.end_date.tzinfo is None: self.end_date = self.preferred_timezone.localize(self.end_date) def __find_preferred_timezone(self): if self.preferred_timezone is None: try: import timezonefinder as TimezoneFinder except ImportError: raise ImportError( "The timezonefinder package is required to automatically " + "determine local timezone based on lat,lon coordinates. " + "Please specify the desired timezone using the `timezone` " + "argument when initializing the EnvironmentAnalysis class " + "or install timezonefinder with `pip install timezonefinder`." ) # Use local timezone based on lat lon pair tf = TimezoneFinder() self.preferred_timezone = pytz.timezone( tf.timezone_at(lng=self.longitude, lat=self.latitude) ) elif isinstance(self.preferred_timezone, str): self.preferred_timezone = pytz.timezone(self.preferred_timezone) def __init_data_parsing_units(self): """Define units for pressure level and surface data parsing""" self.current_units = { "height_ASL": "m", "pressure": "hPa", "temperature": "K", "windDirection": "deg", "windHeading": "deg", "windSpeed": "m/s", "windVelocityX": "m/s", "windVelocityY": "m/s", "surface100mWindVelocityX": "m/s", "surface100mWindVelocityY": "m/s", "surface10mWindVelocityX": "m/s", "surface10mWindVelocityY": "m/s", "surfaceTemperature": "K", "cloudBaseHeight": "m", "surfaceWindGust": "m/s", "surfacePressure": "Pa", "totalPrecipitation": "m", } # Create a variable to store updated units when units are being updated self.updated_units = self.current_units.copy() def __init_unit_system(self): """Initialize preferred units for output (SI, metric or imperial).""" if self.unit_system_string == "metric": self.unit_system = { "length": "m", "velocity": "m/s", "acceleration": "g", "mass": "kg", "time": "s", "pressure": "hPa", "temperature": "degC", "angle": "deg", "precipitation": "mm", "wind_speed": "m/s", } elif self.unit_system_string == "imperial": self.unit_system = { "length": "ft", "velocity": "mph", "acceleration": "ft/s^2", "mass": "lb", "time": "s", "pressure": "inHg", "temperature": "degF", "angle": "deg", "precipitation": "in", "wind_speed": "mph", } else: # Default to SI self.unit_system = { "length": "m", "velocity": "m/s", "acceleration": "m/s^2", "mass": "kg", "time": "s", "pressure": "Pa", "temperature": "K", "angle": "rad", "precipitation": "m", "wind_speed": "m/s", } def set_unit_system(self, unit_system="metric"): self.unit_system_string = unit_system self.__init_unit_system() self.convertPressureLevelData() self.convertSurfaceData() self.current_units = self.updated_units.copy() @staticmethod def _find_two_closest_integer_factors(number): """Find the two closest integer factors of a number. Parameters ---------- number: int Returns ------- list[int] """ number_sqrt = number**0.5 if isinstance(number_sqrt, int): return number_sqrt, number_sqrt else: guess = int(number_sqrt) while True: if number % guess == 0: return guess, number // guess else: guess -= 1 def _beaufort_wind_scale(self, units, max_wind_speed=None): """Returns a list of bins equivalent to the Beaufort wind scale in the desired unit system. Parameters ---------- units: str Desired units for wind speed. Options are: "knot", "mph", "m/s", "ft/s: and "km/h". max_wind_speed: float Maximum wind speed to be included in the scale. Should be expressed in the same unit as the units parameter. Returns ------- list[float] """ beaufort_wind_scale_knots = np.array( [0, 1, 3, 6, 10, 16, 21, 27, 33, 40, 47, 55, 63, 71] ) beaufort_wind_scale = beaufort_wind_scale_knots * convert_units( 1, "knot", units ) beaufort_wind_scale_truncated = beaufort_wind_scale[ np.where(beaufort_wind_scale <= max_wind_speed) ] if beaufort_wind_scale[1] < 1: return np.round(beaufort_wind_scale_truncated, 1) else: return np.round(beaufort_wind_scale_truncated, 0)
[docs] def parsePressureLevelData(self): """ Parse pressure level data from a weather file. Sources of information: - https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-pressure-levels?tab=form Must get the following variables from a ERA5 file: - Geopotential - U-component of wind - V-component of wind - Temperature Must compute the following for each date and hour available in the dataset: - pressure = Function(..., inputs="Height Above Sea Level (m)", outputs="Pressure (Pa)") - temperature = Function(..., inputs="Height Above Sea Level (m)", outputs="Temperature (K)") - windDirection = Function(..., inputs="Height Above Sea Level (m)", outputs="Wind Direction (Deg True)") - windHeading = Function(..., inputs="Height Above Sea Level (m)", outputs="Wind Heading (Deg True)") - windSpeed = Function(..., inputs="Height Above Sea Level (m)", outputs="Wind Speed (m/s)") - windVelocityX = Function(..., inputs="Height Above Sea Level (m)", outputs="Wind Velocity X (m/s)") - windVelocityY = Function(..., inputs="Height Above Sea Level (m)", outputs="Wind Velocity Y (m/s)") Return a dictionary with all the computed data with the following structure: pressureLevelDataDict: { "date" : { "hour": { "data": ..., "data": ... }, "hour": { "data": ..., "data": ... } }, "date" : { "hour": { "data": ..., "data": ... }, "hour": { "data": ..., "data": ... } } } """ # Setup dictionary used to read weather file self.__init_pressure_level_dictionary() # Read weather file pressureLevelData = netCDF4.Dataset(self.pressureLevelDataFile) # Get time, pressure levels, latitude and longitude data from file timeNumArray = pressureLevelData.variables["time"] pressureLevelArray = pressureLevelData.variables["level"] lonArray = pressureLevelData.variables["longitude"] latArray = pressureLevelData.variables["latitude"] # Determine latitude and longitude range for pressure level file self.pressureLevelInitLat = latArray[0] self.pressureLevelEndLat = latArray[-1] self.pressureLevelInitLon = lonArray[0] self.pressureLevelEndLon = lonArray[-1] # Find index needed for latitude and longitude for specified location lonIndex = self.__getNearestIndex(lonArray, self.longitude) latIndex = self.__getNearestIndex(latArray, self.latitude) # Can't handle lat and lon out of grid self.__check_coordinates_inside_grid(lonIndex, latIndex, lonArray, latArray) # Loop through time and save all values for timeIndex, timeNum in enumerate(timeNumArray): dateString, hourString, dateTime = self.__timeNumToDateString( timeNum, timeNumArray.units, calendar="gregorian" ) # Check if date is within analysis range if not (self.start_date <= dateTime < self.end_date): continue if not (self.start_hour <= dateTime.hour < self.end_hour): continue # Make sure keys exist if dateString not in self.pressureLevelDataDict: self.pressureLevelDataDict[dateString] = {} if hourString not in self.pressureLevelDataDict[dateString]: self.pressureLevelDataDict[dateString][hourString] = {} # Extract data from weather file indices = (timeIndex, lonIndex, latIndex) # Retrieve geopotential first and compute altitudes geopotentialArray = self.__extractPressureLevelDataValue( pressureLevelData, self.pressureLevelFileDict["geopotential"], indices, lonArray, latArray, ) heightAboveSeaLevelArray = self.__compute_height_above_ground_level( geopotentialArray, self.elevation ) # Loop through wind components and temperature, get value and convert to Function for key, value in self.pressureLevelFileDict.items(): valueArray = self.__extractPressureLevelDataValue( pressureLevelData, value, indices, lonArray, latArray ) variablePointsArray = np.array([heightAboveSeaLevelArray, valueArray]).T variableFunction = Function( variablePointsArray, inputs="Height Above Ground Level (m)", # TODO: Check if it is really AGL or ASL here, see 3 lines above outputs=key, extrapolation="constant", ) self.pressureLevelDataDict[dateString][hourString][ key ] = variableFunction # Create function for pressure levels pressurePointsArray = np.array( [heightAboveSeaLevelArray, pressureLevelArray] ).T pressureFunction = Function( pressurePointsArray, inputs="Height Above Sea Level (m)", outputs="Pressure (Pa)", extrapolation="constant", ) self.pressureLevelDataDict[dateString][hourString][ "pressure" ] = pressureFunction # Create function for wind speed levels windVelocityXArray = self.__extractPressureLevelDataValue( pressureLevelData, self.pressureLevelFileDict["windVelocityX"], indices, lonArray, latArray, ) windVelocityYArray = self.__extractPressureLevelDataValue( pressureLevelData, self.pressureLevelFileDict["windVelocityY"], indices, lonArray, latArray, ) windSpeedArray = np.sqrt( np.square(windVelocityXArray) + np.square(windVelocityYArray) ) windSpeedPointsArray = np.array( [heightAboveSeaLevelArray, windSpeedArray] ).T windSpeedFunction = Function( windSpeedPointsArray, inputs="Height Above Sea Level (m)", outputs="Wind Speed (m/s)", extrapolation="constant", ) self.pressureLevelDataDict[dateString][hourString][ "windSpeed" ] = windSpeedFunction # Create function for wind heading levels windHeadingArray = ( np.arctan2(windVelocityXArray, windVelocityYArray) * (180 / np.pi) % 360 ) windHeadingPointsArray = np.array( [heightAboveSeaLevelArray, windHeadingArray] ).T windHeadingFunction = Function( windHeadingPointsArray, inputs="Height Above Sea Level (m)", outputs="Wind Heading (Deg True)", extrapolation="constant", ) self.pressureLevelDataDict[dateString][hourString][ "windHeading" ] = windHeadingFunction # Create function for wind direction levels windDirectionArray = (windHeadingArray - 180) % 360 windDirectionPointsArray = np.array( [heightAboveSeaLevelArray, windDirectionArray] ).T windDirectionFunction = Function( windDirectionPointsArray, inputs="Height Above Sea Level (m)", outputs="Wind Direction (Deg True)", extrapolation="constant", ) self.pressureLevelDataDict[dateString][hourString][ "windDirection" ] = windDirectionFunction return self.pressureLevelDataDict
[docs] def parseSurfaceData(self): """ Parse surface data from a weather file. Currently only supports files from ECMWF. You can download a file from the following website: https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-single-levels?tab=form Must get the following variables: - surface elevation: self.elevation = float # Select 'Geopotential' - 2m temperature: surfaceTemperature = float - Surface pressure: surfacePressure = float - 10m u-component of wind: surface10mWindVelocityX = float - 10m v-component of wind: surface10mWindVelocityY = float - 100m u-component of wind: surface100mWindVelocityX = float - 100m V-component of wind: surface100mWindVelocityY = float - Instantaneous 10m wind gust: surfaceWindGust = float - Total precipitation: totalPrecipitation = float - Cloud base height: cloudBaseHeight = float Return a dictionary with all the computed data with the following structure: surfaceDataDict: { "date" : { "hour": { "data": ..., ... }, ... }, ... } """ # Setup dictionary used to read weather file self.__init_surface_dictionary() # Read weather file surfaceData = netCDF4.Dataset(self.surfaceDataFile) # Get time, latitude and longitude data from file timeNumArray = surfaceData.variables["time"] lonArray = surfaceData.variables["longitude"] latArray = surfaceData.variables["latitude"] # Determine latitude and longitude range for surface level file self.singleLevelInitLat = latArray[0] self.singleLevelEndLat = latArray[-1] self.singleLevelInitLon = lonArray[0] self.singleLevelEndLon = lonArray[-1] # Find index needed for latitude and longitude for specified location lonIndex = self.__getNearestIndex(lonArray, self.longitude) latIndex = self.__getNearestIndex(latArray, self.latitude) # Can't handle lat and lon out of grid self.__check_coordinates_inside_grid(lonIndex, latIndex, lonArray, latArray) # Loop through time and save all values for timeIndex, timeNum in enumerate(timeNumArray): dateString, hourString, dateTime = self.__timeNumToDateString( timeNum, timeNumArray.units, calendar="gregorian" ) # Check if date is within analysis range if not (self.start_date <= dateTime < self.end_date): continue if not (self.start_hour <= dateTime.hour < self.end_hour): continue # Make sure keys exist if dateString not in self.surfaceDataDict: self.surfaceDataDict[dateString] = {} if hourString not in self.surfaceDataDict[dateString]: self.surfaceDataDict[dateString][hourString] = {} # Extract data from weather file indices = (timeIndex, lonIndex, latIndex) for key, value in self.surfaceFileDict.items(): self.surfaceDataDict[dateString][hourString][ key ] = self.__extractSurfaceDataValue( surfaceData, value, indices, lonArray, latArray ) # Get elevation, time index does not matter, use last one self.surface_geopotential = self.__extractSurfaceDataValue( surfaceData, "z", indices, lonArray, latArray ) self.elevation = self.__compute_height_above_sea_level( self.surface_geopotential ) return self.surfaceDataDict
[docs] def convertPressureLevelData(self): """Convert pressure level data to desired unit system.""" # Create conversion dict (key: to_unit) conversion_dict = { "pressure": self.unit_system["pressure"], "temperature": self.unit_system["temperature"], "windDirection": self.unit_system["angle"], "windHeading": self.unit_system["angle"], "windSpeed": self.unit_system["wind_speed"], "windVelocityX": self.unit_system["wind_speed"], "windVelocityY": self.unit_system["wind_speed"], } # Loop through dates for date in self.pressureLevelDataDict: for hour in self.pressureLevelDataDict[date]: for key, value in self.pressureLevelDataDict[date][hour].items(): # Skip geopotential x asl if key not in conversion_dict: continue # Convert x axis variable = convert_units( variable=value, from_unit=self.current_units["height_ASL"], to_unit=self.unit_system["length"], axis=0, ) # Update current units self.updated_units["height_ASL"] = self.unit_system["length"] # Convert y axis variable = convert_units( variable=value, from_unit=self.current_units[key], to_unit=conversion_dict[key], axis=1, ) # Update current units self.updated_units[key] = conversion_dict[key] # Save converted Function self.pressureLevelDataDict[date][hour][key] = variable
[docs] def convertSurfaceData(self): """Convert surface data to desired unit system.""" # Create conversion dict (key: from_unit, to_unit) conversion_dict = { "surface100mWindVelocityX": self.unit_system["wind_speed"], "surface100mWindVelocityY": self.unit_system["wind_speed"], "surface10mWindVelocityX": self.unit_system["wind_speed"], "surface10mWindVelocityY": self.unit_system["wind_speed"], "surfaceTemperature": self.unit_system["temperature"], "cloudBaseHeight": self.unit_system["length"], "surfaceWindGust": self.unit_system["wind_speed"], "surfacePressure": self.unit_system["pressure"], "totalPrecipitation": self.unit_system["precipitation"], } # Loop through dates for date in self.surfaceDataDict: for hour in self.surfaceDataDict[date]: for key, value in self.surfaceDataDict[date][hour].items(): variable = convert_units( variable=value, from_unit=self.current_units[key], to_unit=conversion_dict[key], ) self.surfaceDataDict[date][hour][key] = variable # Update current units self.updated_units[key] = conversion_dict[key] # Convert surface elevation self.elevation = convert_units( self.elevation, self.current_units["height_ASL"], self.unit_system["length"] ) self.updated_units["height_ASL"] = self.unit_system["length"]
# Calculations
[docs] def process_data(self): """Process data that is shown in the allInfo method.""" self.calculate_pressure_stats() self.calculate_average_max_temperature() self.calculate_average_min_temperature() self.calculate_record_max_temperature() self.calculate_record_min_temperature() self.calculate_average_max_wind_gust() self.calculate_maximum_wind_gust() self.calculate_maximum_surface_10m_wind_speed() self.calculate_average_max_surface_10m_wind_speed() self.calculate_average_min_surface_10m_wind_speed() self.calculate_record_max_surface_10m_wind_speed() self.calculate_record_min_surface_10m_wind_speed() self.calculate_average_max_surface_100m_wind_speed() self.calculate_average_min_surface_100m_wind_speed() self.calculate_record_max_surface_100m_wind_speed() self.calculate_record_min_surface_100m_wind_speed() self.calculate_percentage_of_days_with_precipitation() self.calculate_average_cloud_base_height() self.calculate_min_cloud_base_height() self.calculate_percentage_of_days_with_no_cloud_coverage()
@property def cloud_base_height(self): cloud_base_height = [ dayDict[hour]["cloudBaseHeight"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] masked_elem = np.ma.core.MaskedConstant unmasked_cloud_base_height = [ np.inf if isinstance(elem, masked_elem) else elem for elem in cloud_base_height ] mask = [isinstance(elem, masked_elem) for elem in cloud_base_height] return np.ma.array(unmasked_cloud_base_height, mask=mask)
[docs] def calculate_pressure_stats(self): """Calculate pressure level statistics.""" # Surface pressure self.surface_pressure_list = [ dayDict[hour]["surfacePressure"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.average_surface_pressure = np.average(self.surface_pressure_list) self.std_surface_pressure = np.std(self.surface_pressure_list) # Pressure at 1000 feet self.pressure_at_1000ft_list = [ dayDict[hour]["pressure"]( convert_units(1000, "ft", self.current_units["height_ASL"]) ) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_pressure_at_1000ft = np.average(self.pressure_at_1000ft_list) self.std_pressure_at_1000ft = np.std(self.pressure_at_1000ft_list) # Pressure at 10000 feet self.pressure_at_10000ft_list = [ dayDict[hour]["pressure"]( convert_units(10000, "ft", self.current_units["height_ASL"]) ) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_pressure_at_10000ft = np.average(self.pressure_at_10000ft_list) self.std_pressure_at_10000ft = np.std(self.pressure_at_10000ft_list) # Pressure at 30000 feet self.pressure_at_30000ft_list = [ dayDict[hour]["pressure"]( convert_units(30000, "ft", self.current_units["height_ASL"]) ) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_pressure_at_30000ft = np.average(self.pressure_at_30000ft_list) self.std_pressure_at_30000ft = np.std(self.pressure_at_30000ft_list) return self.average_surface_pressure, self.std_surface_pressure
[docs] def calculate_average_cloud_base_height(self): """Calculate average cloud base height.""" self.mean_cloud_base_height = np.ma.mean(self.cloud_base_height) return self.mean_cloud_base_height
[docs] def calculate_min_cloud_base_height(self): """Calculate average cloud base height.""" self.min_cloud_base_height = np.ma.min( self.cloud_base_height, fill_value=np.inf ) return self.min_cloud_base_height
[docs] def calculate_percentage_of_days_with_no_cloud_coverage(self): """Calculate percentage of days with cloud coverage.""" self.percentage_of_days_with_no_cloud_coverage = np.ma.count( self.cloud_base_height ) / len(self.cloud_base_height) return self.percentage_of_days_with_no_cloud_coverage
[docs] def calculate_percentage_of_days_with_precipitation(self): """Computes the ratio between days with precipitation (> 10 mm) and total days.""" self.precipitation_per_day = [ sum([dayDict[hour]["totalPrecipitation"] for hour in dayDict.keys()]) for dayDict in self.surfaceDataDict.values() ] days_with_precipitation_count = 0 for precipitation in self.precipitation_per_day: if precipitation > convert_units( 10, "mm", self.unit_system["precipitation"] ): days_with_precipitation_count += 1 self.percentage_of_days_with_precipitation = ( days_with_precipitation_count / len(self.precipitation_per_day) ) return self.percentage_of_days_with_precipitation
def calculate_average_max_temperature(self): self.max_temperature_list = [ np.max([dayDict[hour]["surfaceTemperature"] for hour in dayDict.keys()]) for dayDict in self.surfaceDataDict.values() ] self.average_max_temperature = np.average(self.max_temperature_list) return self.average_max_temperature def calculate_average_min_temperature(self): self.min_temperature_list = [ np.min([dayDict[hour]["surfaceTemperature"] for hour in dayDict.keys()]) for dayDict in self.surfaceDataDict.values() ] self.average_min_temperature = np.average(self.min_temperature_list) return self.average_min_temperature def calculate_record_max_temperature(self): self.temperature_list = [ dayDict[hour]["surfaceTemperature"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_max_temperature = np.max(self.temperature_list) return self.record_max_temperature def calculate_record_min_temperature(self): self.temperature_list = [ dayDict[hour]["surfaceTemperature"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_min_temperature = np.min(self.temperature_list) return self.record_min_temperature def calculate_average_max_wind_gust(self): self.max_wind_gust_list = [ np.max([dayDict[hour]["surfaceWindGust"] for hour in dayDict.keys()]) for dayDict in self.surfaceDataDict.values() ] self.average_max_wind_gust = np.average(self.max_wind_gust_list) return self.average_max_wind_gust def calculate_maximum_wind_gust(self): self.wind_gust_list = [ dayDict[hour]["surfaceWindGust"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.max_wind_gust = np.max(self.wind_gust_list) return self.max_wind_gust def calculate_maximum_surface_10m_wind_speed(self): self.surface_10m_wind_speed_list = [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.max_surface_10m_wind_speed = np.max(self.surface_10m_wind_speed_list) return self.max_surface_10m_wind_speed def calculate_average_max_surface_10m_wind_speed(self): self.max_surface_10m_wind_speed_list = [ np.max( [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for hour in dayDict.keys() ] ) for dayDict in self.surfaceDataDict.values() ] self.average_max_surface_10m_wind_speed = np.average( self.max_surface_10m_wind_speed_list ) return self.average_max_surface_10m_wind_speed def calculate_average_min_surface_10m_wind_speed(self): self.min_surface_10m_wind_speed_list = [ np.min( [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for hour in dayDict.keys() ] ) for dayDict in self.surfaceDataDict.values() ] self.average_min_surface_10m_wind_speed = np.average( self.min_surface_10m_wind_speed_list ) return self.average_min_surface_10m_wind_speed def calculate_record_max_surface_10m_wind_speed(self): self.surface_10m_wind_speed = [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_max_surface_10m_wind_speed = np.max(self.surface_10m_wind_speed) return self.record_max_surface_10m_wind_speed def calculate_record_min_surface_10m_wind_speed(self): self.surface_10m_wind_speed = [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_min_surface_10m_wind_speed = np.min(self.surface_10m_wind_speed) return self.record_min_surface_10m_wind_speed def calculate_average_max_surface_100m_wind_speed(self): self.max_surface_100m_wind_speed_list = [ np.max( [ ( dayDict[hour]["surface100mWindVelocityX"] ** 2 + dayDict[hour]["surface100mWindVelocityY"] ** 2 ) ** 0.5 for hour in dayDict.keys() ] ) for dayDict in self.surfaceDataDict.values() ] self.average_max_surface_100m_wind_speed = np.average( self.max_surface_100m_wind_speed_list ) return self.average_max_surface_100m_wind_speed def calculate_average_min_surface_100m_wind_speed(self): self.min_surface_100m_wind_speed_list = [ np.min( [ ( dayDict[hour]["surface100mWindVelocityX"] ** 2 + dayDict[hour]["surface100mWindVelocityY"] ** 2 ) ** 0.5 for hour in dayDict.keys() ] ) for dayDict in self.surfaceDataDict.values() ] self.average_min_surface_100m_wind_speed = np.average( self.min_surface_100m_wind_speed_list ) return self.average_min_surface_100m_wind_speed def calculate_record_max_surface_100m_wind_speed(self): self.surface_100m_wind_speed = [ ( dayDict[hour]["surface100mWindVelocityX"] ** 2 + dayDict[hour]["surface100mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_max_surface_100m_wind_speed = np.max(self.surface_100m_wind_speed) return self.record_max_surface_100m_wind_speed def calculate_record_min_surface_100m_wind_speed(self): self.surface_100m_wind_speed = [ ( dayDict[hour]["surface100mWindVelocityX"] ** 2 + dayDict[hour]["surface100mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] self.record_min_surface_100m_wind_speed = np.min(self.surface_100m_wind_speed) return self.record_min_surface_100m_wind_speed
[docs] def plot_wind_gust_distribution(self): """Get all values of wind gust speed (for every date and hour available) and plot a single distribution. Expected result is a Weibull distribution. """ self.wind_gust_list = [ dayDict[hour]["surfaceWindGust"] for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] plt.figure() # Plot histogram plt.hist( self.wind_gust_list, bins=int(len(self.wind_gust_list) ** 0.5), density=True, histtype="stepfilled", alpha=0.2, label="Wind Gust Speed Distribution", ) # Plot weibull distribution c, loc, scale = stats.weibull_min.fit(self.wind_gust_list, loc=0, scale=1) x = np.linspace(0, np.max(self.wind_gust_list), 100) plt.plot( x, stats.weibull_min.pdf(x, c, loc, scale), "r-", linewidth=2, label="Weibull Distribution", ) # Label plot plt.ylabel("Probability") plt.xlabel(f"Wind gust speed ({self.unit_system['wind_speed']})") plt.title("Wind Gust Speed Distribution") plt.legend() plt.show() return None
[docs] def plot_surface10m_wind_speed_distribution(self, windSpeedLimit=False): """Get all values of sustained surface wind speed (for every date and hour available) and plot a single distribution. Expected result is a Weibull distribution. """ self.wind_speed_list = [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 for dayDict in self.surfaceDataDict.values() for hour in dayDict.keys() ] plt.figure() # Plot histogram plt.hist( self.wind_speed_list, bins=int(len(self.wind_speed_list) ** 0.5), density=True, histtype="stepfilled", alpha=0.2, label="Wind Gust Speed Distribution", ) # Plot weibull distribution c, loc, scale = stats.weibull_min.fit(self.wind_speed_list, loc=0, scale=1) x = np.linspace(0, np.max(self.wind_speed_list), 100) plt.plot( x, stats.weibull_min.pdf(x, c, loc, scale), "r-", linewidth=2, label="Weibull Distribution", ) if windSpeedLimit: plt.vlines( convert_units(20, "mph", self.unit_system["wind_speed"]), 0, 0.3, "g", (0, (15, 5, 2, 5)), label="Wind Speed Limit", ) # Plot Wind Speed Limit # Label plot plt.ylabel("Probability") plt.xlabel(f"Sustained surface wind speed ({self.unit_system['wind_speed']})") plt.title("Sustained Surface Wind Speed Distribution") plt.legend() plt.show() return None
[docs] def calculate_average_temperature_along_day(self): """Computes average temperature progression throughout the day, including sigma contours.""" # Flip dictionary to get hour as key instead of date historical_temperatures_each_hour = defaultdict(dict) for date, val in self.surfaceDataDict.items(): for hour, sub_val in val.items(): historical_temperatures_each_hour[hour][date] = sub_val[ "surfaceTemperature" ] self.average_temperature_at_given_hour = { hour: np.average(list(dates.values())) for hour, dates in historical_temperatures_each_hour.items() } self.average_temperature_sigmas_at_given_hour = { hour: np.std(list(dates.values())) for hour, dates in historical_temperatures_each_hour.items() } return ( self.average_temperature_at_given_hour, self.average_temperature_sigmas_at_given_hour, )
[docs] def plot_average_temperature_along_day(self): """Plots average temperature progression throughout the day, including sigma contours.""" # Compute values self.calculate_average_temperature_along_day() # Get handy arrays hours = np.fromiter(self.average_temperature_at_given_hour.keys(), np.float) temperature_mean = self.average_temperature_at_given_hour.values() temperature_mean = np.array(list(temperature_mean)) temperature_std = np.array( list(self.average_temperature_sigmas_at_given_hour.values()) ) temperatures_p1sigma = temperature_mean + temperature_std temperatures_m1sigma = temperature_mean - temperature_std temperatures_p2sigma = temperature_mean + 2 * temperature_std temperatures_m2sigma = temperature_mean - 2 * temperature_std plt.figure() # Plot temperature along day for each available date for hour_entries in self.surfaceDataDict.values(): plt.plot( [int(hour) for hour in hour_entries.keys()], [val["surfaceTemperature"] for val in hour_entries.values()], "gray", alpha=0.1, ) # Plot average temperature along day plt.plot(hours, temperature_mean, "r", label="$\\mu$") # Plot standard deviations temperature along day plt.plot(hours, temperatures_m1sigma, "b--", label=r"$\mu \pm \sigma$") plt.plot(hours, temperatures_p1sigma, "b--") plt.plot(hours, temperatures_p2sigma, "b--", alpha=0.5) plt.plot( hours, temperatures_m2sigma, "b--", label=r"$\mu \pm 2\sigma $", alpha=0.5 ) # Format plot plt.gca().xaxis.set_major_locator(plt.MaxNLocator(integer=True)) plt.gca().xaxis.set_major_formatter( lambda x, pos: "{0:02.0f}:{1:02.0f}".format(*divmod(x * 60, 60)) ) plt.autoscale(enable=True, axis="x", tight=True) plt.xlabel("Time (hours)") plt.ylabel(f"Temperature ({self.unit_system['temperature']})") plt.title("Average Temperature Along Day") plt.grid(alpha=0.25) plt.legend() plt.show()
[docs] def calculate_average_sustained_surface10m_wind_along_day(self): """Computes average sustained wind speed progression throughout the day, including sigma contours.""" # Flip dictionary to get hour as key instead of date historical_surface10m_wind_speeds_each_hour = defaultdict(dict) for date, val in self.surfaceDataDict.items(): for hour, sub_val in val.items(): historical_surface10m_wind_speeds_each_hour[hour][date] = ( sub_val["surface10mWindVelocityX"] ** 2 + sub_val["surface10mWindVelocityY"] ** 2 ) ** 0.5 self.average_surface10m_wind_speed_at_given_hour = { hour: np.average(list(dates.values())) for hour, dates in historical_surface10m_wind_speeds_each_hour.items() } self.average_surface10m_wind_speed_sigmas_at_given_hour = { hour: np.std(list(dates.values())) for hour, dates in historical_surface10m_wind_speeds_each_hour.items() } return ( self.average_surface10m_wind_speed_at_given_hour, self.average_surface10m_wind_speed_sigmas_at_given_hour, )
[docs] def plot_average_surface10m_wind_speed_along_day(self, windSpeedLimit=False): """Plots average surface wind speed progression throughout the day, including sigma contours.""" # Compute values self.calculate_average_sustained_surface10m_wind_along_day() # Get handy arrays hours = np.fromiter( self.average_surface10m_wind_speed_at_given_hour.keys(), np.float ) wind_speed_mean = self.average_surface10m_wind_speed_at_given_hour.values() wind_speed_mean = np.array(list(wind_speed_mean)) wind_speed_std = np.array( list(self.average_surface10m_wind_speed_sigmas_at_given_hour.values()) ) wind_speeds_p1sigma = wind_speed_mean + wind_speed_std wind_speeds_m1sigma = wind_speed_mean - wind_speed_std wind_speeds_p2sigma = wind_speed_mean + 2 * wind_speed_std wind_speeds_m2sigma = wind_speed_mean - 2 * wind_speed_std plt.figure() # Plot temperature along day for each available date for hour_entries in self.surfaceDataDict.values(): plt.plot( [int(hour) for hour in hour_entries.keys()], [ ( val["surface10mWindVelocityX"] ** 2 + val["surface10mWindVelocityY"] ** 2 ) ** 0.5 for val in hour_entries.values() ], "gray", alpha=0.1, ) # Plot average temperature along day plt.plot(hours, wind_speed_mean, "r", label="$\\mu$") # Plot standard deviations temperature along day plt.plot(hours, wind_speeds_m1sigma, "b--", label=r"$\mu \pm \sigma$") plt.plot(hours, wind_speeds_p1sigma, "b--") plt.plot(hours, wind_speeds_p2sigma, "b--", alpha=0.5) plt.plot( hours, wind_speeds_m2sigma, "b--", label=r"$\mu \pm 2\sigma $", alpha=0.5 ) # Format plot plt.gca().xaxis.set_major_locator(plt.MaxNLocator(integer=True)) plt.gca().xaxis.set_major_formatter( lambda x, pos: "{0:02.0f}:{1:02.0f}".format(*divmod(x * 60, 60)) ) plt.autoscale(enable=True, axis="x", tight=True) if windSpeedLimit: plt.hlines( convert_units(20, "mph", self.unit_system["wind_speed"]), min(hours), max(hours), "g", (0, (15, 5, 2, 5)), label="Wind Speed Limit", ) # Plot Wind Speed Limit plt.xlabel("Time (hours)") plt.ylabel(f"Surface Wind Speed ({self.unit_system['wind_speed']})") plt.title("Average Sustained Surface Wind Speed Along Day") plt.grid(alpha=0.25) plt.legend() plt.show()
[docs] def calculate_average_sustained_surface100m_wind_along_day(self): """Computes average sustained wind speed progression throughout the day, including sigma contours.""" # Flip dictionary to get hour as key instead of date historical_surface100m_wind_speeds_each_hour = defaultdict(dict) for date, val in self.surfaceDataDict.items(): for hour, sub_val in val.items(): historical_surface100m_wind_speeds_each_hour[hour][date] = ( sub_val["surface100mWindVelocityX"] ** 2 + sub_val["surface100mWindVelocityY"] ** 2 ) ** 0.5 self.average_surface100m_wind_speed_at_given_hour = { hour: np.average(list(dates.values())) for hour, dates in historical_surface100m_wind_speeds_each_hour.items() } self.average_surface100m_wind_speed_sigmas_at_given_hour = { hour: np.std(list(dates.values())) for hour, dates in historical_surface100m_wind_speeds_each_hour.items() } return ( self.average_surface100m_wind_speed_at_given_hour, self.average_surface100m_wind_speed_sigmas_at_given_hour, )
[docs] def plot_average_sustained_surface100m_wind_speed_along_day(self): """Plots average surface wind speed progression throughout the day, including sigma contours.""" # Compute values self.calculate_average_sustained_surface100m_wind_along_day() # Get handy arrays hours = np.fromiter( self.average_surface100m_wind_speed_at_given_hour.keys(), np.float ) wind_speed_mean = self.average_surface100m_wind_speed_at_given_hour.values() wind_speed_mean = np.array(list(wind_speed_mean)) wind_speed_std = np.array( list(self.average_surface100m_wind_speed_sigmas_at_given_hour.values()) ) wind_speeds_p1sigma = wind_speed_mean + wind_speed_std wind_speeds_m1sigma = wind_speed_mean - wind_speed_std wind_speeds_p2sigma = wind_speed_mean + 2 * wind_speed_std wind_speeds_m2sigma = wind_speed_mean - 2 * wind_speed_std plt.figure() # Plot temperature along day for each available date for hour_entries in self.surfaceDataDict.values(): plt.plot( [int(hour) for hour in hour_entries.keys()], [ ( val["surface100mWindVelocityX"] ** 2 + val["surface100mWindVelocityY"] ** 2 ) ** 0.5 for val in hour_entries.values() ], "gray", alpha=0.1, ) # Plot average temperature along day plt.plot(hours, wind_speed_mean, "r", label="$\\mu$") # Plot standard deviations temperature along day plt.plot(hours, wind_speeds_m1sigma, "b--", label=r"$\mu \pm \sigma$") plt.plot(hours, wind_speeds_p1sigma, "b--") plt.plot(hours, wind_speeds_p2sigma, "b--", alpha=0.5) plt.plot( hours, wind_speeds_m2sigma, "b--", label=r"$\mu \pm 2\sigma $", alpha=0.5 ) # Format plot plt.gca().xaxis.set_major_locator(plt.MaxNLocator(integer=True)) plt.gca().xaxis.set_major_formatter( lambda x, pos: "{0:02.0f}:{1:02.0f}".format(*divmod(x * 60, 60)) ) plt.autoscale(enable=True, axis="x", tight=True) plt.xlabel("Time (hours)") plt.ylabel(f"100m Wind Speed ({self.unit_system['wind_speed']})") plt.title("Average 100m Wind Speed Along Day") plt.grid(alpha=0.25) plt.legend() plt.show()
[docs] def plot_average_wind_speed_profile(self, clear_range_limits=False): """Average wind speed for all datetimes available.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) wind_speed_profiles = [ dayDict[hour]["windSpeed"](altitude_list) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_wind_speed_profile = np.mean(wind_speed_profiles, axis=0) # Plot plt.figure() plt.plot(self.average_wind_speed_profile, altitude_list, "r", label="$\\mu$") plt.plot( np.percentile(wind_speed_profiles, 50 - 34.1, axis=0), altitude_list, "b--", alpha=1, label="$\\mu \\pm \\sigma$", ) plt.plot( np.percentile(wind_speed_profiles, 50 + 34.1, axis=0), altitude_list, "b--", alpha=1, ) plt.plot( np.percentile(wind_speed_profiles, 50 - 47.4, axis=0), altitude_list, "b--", alpha=0.5, label="$\\mu \\pm 2\\sigma$", ) plt.plot( np.percentile(wind_speed_profiles, 50 + 47.7, axis=0), altitude_list, "b--", alpha=0.5, ) for wind_speed_profile in wind_speed_profiles: plt.plot(wind_speed_profile, altitude_list, "gray", alpha=0.01) plt.autoscale(enable=True, axis="x", tight=True) plt.autoscale(enable=True, axis="y", tight=True) if clear_range_limits: # Clear Sky Range Altitude Limits print(plt) xmin, xmax, ymin, ymax = plt.axis() plt.fill_between( [xmin, xmax], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) plt.fill_between( [xmin, xmax], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) plt.xlabel(f"Wind speed ({self.unit_system['wind_speed']})") plt.ylabel(f"Altitude AGL ({self.unit_system['length']})") plt.xlim(0, 360) plt.title("Average Wind speed Profile") plt.legend() plt.xlim(0, max(np.percentile(wind_speed_profiles, 50 + 49.85, axis=0))) plt.show()
[docs] def plot_average_wind_heading_profile(self, clear_range_limits=False): """Average wind heading for all datetimes available.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) wind_X_profiles = [ dayDict[hour]["windVelocityX"](altitude_list) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_wind_X_profile = np.mean(wind_X_profiles, axis=0) wind_Y_profiles = [ dayDict[hour]["windVelocityY"](altitude_list) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_wind_Y_profile = np.mean(wind_Y_profiles, axis=0) wind_heading_profiles = ( np.arctan2(wind_X_profiles, wind_Y_profiles) * 180 / np.pi % 360 ) self.average_wind_heading_profile = ( np.arctan2(self.average_wind_X_profile, self.average_wind_Y_profile) * 180 / np.pi % 360 ) # TODO: Add plot for wind X and wind Y profiles # Plot plt.figure() plt.plot(self.average_wind_heading_profile, altitude_list, "r", label="$\\mu$") # plt.plot( # np.percentile(wind_heading_profiles, 50 - 34.1, axis=0), # altitude_list, # "b--", # alpha=1, # label="$\\mu \\pm \\sigma$", # ) # plt.plot( # np.percentile(wind_heading_profiles, 50 + 34.1, axis=0), # altitude_list, # "b--", # alpha=1, # ) # plt.plot( # np.percentile(wind_heading_profiles, 50 - 47.4, axis=0), # altitude_list, # "b--", # alpha=0.5, # label="$\\mu \\pm 2\\sigma$", # ) # plt.plot( # np.percentile(wind_heading_profiles, 50 + 47.7, axis=0), # altitude_list, # "b--", # alpha=0.5, # ) # for wind_heading_profile in wind_heading_profiles: # plt.plot(wind_heading_profile, altitude_list, "gray", alpha=0.01) plt.autoscale(enable=True, axis="x", tight=True) plt.autoscale(enable=True, axis="y", tight=True) if clear_range_limits: # Clear Sky Range Altitude Limits print(plt) xmin, xmax, ymin, ymax = plt.axis() plt.fill_between( [xmin, xmax], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) plt.fill_between( [xmin, xmax], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) plt.xlabel(f"Wind heading ({self.unit_system['angle']})") plt.ylabel(f"Altitude AGL ({self.unit_system['length']})") plt.xlim(0, 360) plt.title("Average Wind heading Profile") plt.legend() plt.show()
[docs] def process_wind_speed_and_direction_data_for_average_day(self): """Process the wind_speed and wind_direction data to generate lists of all the wind_speeds recorded for a following hour of the day and also the wind direction. Also calculates the greater and the smallest wind_speed recorded Returns ------- None """ max_wind_speed = float("-inf") min_wind_speed = float("inf") days = list(self.surfaceDataDict.keys()) hours = list(self.surfaceDataDict[days[0]].keys()) windSpeed = {} windDir = {} for hour in hours: windSpeed[hour] = [] windDir[hour] = [] for day in days: try: hour_wind_speed = ( self.surfaceDataDict[day][hour]["surface10mWindVelocityX"] ** 2 + self.surfaceDataDict[day][hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 max_wind_speed = ( hour_wind_speed if hour_wind_speed > max_wind_speed else max_wind_speed ) min_wind_speed = ( hour_wind_speed if hour_wind_speed < min_wind_speed else min_wind_speed ) windSpeed[hour].append(hour_wind_speed) # Wind direction means where the wind is blowing from, 180 deg opposite from wind heading vx = self.surfaceDataDict[day][hour]["surface10mWindVelocityX"] vy = self.surfaceDataDict[day][hour]["surface10mWindVelocityY"] windDir[hour].append( (180 + (np.arctan2(vy, vx) * 180 / np.pi)) % 360 ) except KeyError: # Not all days have all hours stored, that is fine pass self.max_wind_speed = max_wind_speed self.min_wind_speed = min_wind_speed self.wind_speed_per_hour = windSpeed self.wind_direction_per_hour = windDir
[docs] def plot_average_pressure_profile(self, clear_range_limits=False): """Average wind speed for all datetimes available.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) pressure_profiles = [ dayDict[hour]["pressure"](altitude_list) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] self.average_pressure_profile = np.mean(pressure_profiles, axis=0) # Plot plt.figure() plt.plot(self.average_pressure_profile, altitude_list, "r", label="$\\mu$") plt.plot( np.percentile(pressure_profiles, 50 - 34.1, axis=0), altitude_list, "b--", alpha=1, label="$\\mu \\pm \\sigma$", ) plt.plot( np.percentile(pressure_profiles, 50 + 34.1, axis=0), altitude_list, "b--", alpha=1, ) plt.plot( np.percentile(pressure_profiles, 50 - 47.4, axis=0), altitude_list, "b--", alpha=0.5, label="$\\mu \\pm 2\\sigma$", ) plt.plot( np.percentile(pressure_profiles, 50 + 47.7, axis=0), altitude_list, "b--", alpha=0.5, ) for pressure_profile in pressure_profiles: plt.plot(pressure_profile, altitude_list, "gray", alpha=0.01) plt.autoscale(enable=True, axis="x", tight=True) plt.autoscale(enable=True, axis="y", tight=True) if clear_range_limits: # Clear Sky Range Altitude Limits print(plt) xmin, xmax, ymin, ymax = plt.axis() plt.fill_between( [xmin, xmax], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) plt.fill_between( [xmin, xmax], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) plt.xlabel(f"Pressure ({self.unit_system['pressure']})") plt.ylabel(f"Altitude AGL ({self.unit_system['length']})") plt.title("Average Pressure Profile") plt.legend() plt.xlim(0, max(np.percentile(pressure_profiles, 50 + 49.85, axis=0))) plt.show()
[docs] @staticmethod def plot_wind_rose( wind_direction, wind_speed, bins=None, title=None, fig=None, rect=None ): """Plot a windrose given the data. Parameters ---------- wind_direction: list[float] wind_speed: list[float] bins: 1D array or integer, optional number of bins, or a sequence of bins variable. If not set, bins=6, then bins=linspace(min(var), max(var), 6) title: str, optional Title of the plot fig: matplotlib.pyplot.figure, optional Returns ------- WindroseAxes """ ax = WindroseAxes.from_ax(fig=fig, rect=rect) ax.bar( wind_direction, wind_speed, bins=bins, normed=True, opening=0.8, edgecolor="white", ) ax.set_title(title) ax.set_legend() # Format the ticks (only integers, as percentage, at most 3 intervals) ax.yaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=3, prune="lower") ) ax.yaxis.set_major_formatter(mtick.PercentFormatter(decimals=0)) return ax
[docs] def plot_average_day_wind_rose_specific_hour(self, hour, fig=None): """Plot a specific hour of the average windrose Parameters ---------- hour: int fig: matplotlib.pyplot.figure Returns ------- None """ hour = str(hour) self.plot_wind_rose( self.wind_direction_per_hour[hour], self.wind_speed_per_hour[hour], bins=self._beaufort_wind_scale( self.unit_system["wind_speed"], max_wind_speed=self.max_wind_speed ), title=f"Wind Rose of an Average Day ({self.unit_system['wind_speed']}) - Hour {float(hour):05.2f}".replace( ".", ":" ), fig=fig, ) plt.show()
[docs] def plot_average_day_wind_rose_all_hours(self): """Plot wind roses for all hours of a day, in a grid like plot.""" # Get days and hours days = list(self.surfaceDataDict.keys()) hours = list(self.surfaceDataDict[days[0]].keys()) # Make sure necessary data has been calculated if not all( [ self.max_wind_speed, self.min_wind_speed, self.wind_speed_per_hour, self.wind_direction_per_hour, ] ): self.process_wind_speed_and_direction_data_for_average_day() # Figure settings windrose_side = 2.5 # inches vertical_padding_top = 1.5 # inches plot_padding = 0.18 # percentage ncols, nrows = self._find_two_closest_integer_factors(len(hours)) vertical_plot_area_percentage = ( nrows * windrose_side / (nrows * windrose_side + vertical_padding_top) ) # Create figure fig = plt.figure() fig.set_size_inches( ncols * windrose_side, nrows * windrose_side + vertical_padding_top ) bins = self._beaufort_wind_scale( self.unit_system["wind_speed"], max_wind_speed=self.max_wind_speed ) width = (1 - 2 * plot_padding) * 1 / ncols height = vertical_plot_area_percentage * (1 - 2 * plot_padding) * 1 / nrows # print(ncols, nrows) # print(ncols * windrose_side, nrows * windrose_side + vertical_padding_top) # print(vertical_plot_area_percentage) # print(width, height) for k, hour in enumerate(hours): i, j = len(hours) // nrows - k // ncols, k % ncols # Row count bottom up left = j * 1 / ncols + plot_padding / ncols bottom = ( vertical_plot_area_percentage * ((i - 2) / nrows + plot_padding / nrows) + 0.5 ) # print(left, bottom) ax = self.plot_wind_rose( self.wind_direction_per_hour[hour], self.wind_speed_per_hour[hour], bins=bins, title=f"{float(hour):05.2f}".replace(".", ":"), fig=fig, rect=[left, bottom, width, height], ) if k == 0: ax.legend( loc="upper center", # 0.8 is a magic number bbox_to_anchor=(ncols / 2 + 0.8, 1.55), fancybox=True, shadow=True, ncol=6, ) else: ax.legend().set_visible(False) fig.add_axes(ax) fig.suptitle( f"Wind Roses ({self.unit_system['wind_speed']})", fontsize=20, x=0.5, y=1 ) plt.show()
[docs] def animate_average_wind_rose(self, figsize=(8, 8), filename="wind_rose.gif"): """Animates the wind_rose of an average day. The inputs of a wind_rose are the location of the place where we want to analyze, (x,y,z). The data is assembled by hour, which means, the windrose of a specific hour is generated by bringing together the data of all of the days available for that specific hour. It's possible to change the size of the gif using the parameter figsize, which is the height and width in inches. Parameters ---------- figsize : array Returns ------- Image : ipywidgets.widgets.widget_media.Image """ days = list(self.surfaceDataDict.keys()) hours = list(self.surfaceDataDict[days[0]].keys()) if not all( [ self.max_wind_speed, self.min_wind_speed, self.wind_speed_per_hour, self.wind_direction_per_hour, ] ): self.process_wind_speed_and_direction_data_for_average_day() metadata = dict( title="windrose", artist="windrose", comment="""Made with windrose http://www.github.com/scls19fr/windrose""", ) writer = ImageWriter(fps=1, metadata=metadata) fig = plt.figure(facecolor="w", edgecolor="w", figsize=figsize) with writer.saving(fig, filename, 100): for hour in hours: self.plot_wind_rose( self.wind_direction_per_hour[hour], self.wind_speed_per_hour[hour], bins=self._beaufort_wind_scale( self.unit_system["wind_speed"], max_wind_speed=self.max_wind_speed, ), title=f"Wind Rose of an Average Day ({self.unit_system['wind_speed']}). Hour {float(hour):05.2f}".replace( ".", ":" ), fig=fig, ) writer.grab_frame() plt.clf() with open(filename, "rb") as file: image = file.read() fig_width, fig_height = plt.gcf().get_size_inches() * fig.dpi plt.close(fig) return widgets.Image( value=image, format="gif", width=fig_width, height=fig_height, )
[docs] def plot_wind_gust_distribution_over_average_day(self): """Plots shown in the animation of how the wind gust distribution varies throughout the day.""" # Gather animation data average_wind_gust_at_given_hour = {} for hour in list(self.surfaceDataDict.values())[0].keys(): wind_gust_values_for_this_hour = [] for dayDict in self.surfaceDataDict.values(): try: wind_gust_values_for_this_hour += [dayDict[hour]["surfaceWindGust"]] except KeyError: # Some day does not have data for the desired hour (probably the last one) # No need to worry, just average over the other days pass average_wind_gust_at_given_hour[hour] = wind_gust_values_for_this_hour # Create grid of plots for each hour hours = list(list(self.pressureLevelDataDict.values())[0].keys()) nrows, ncols = self._find_two_closest_integer_factors(len(hours)) fig = plt.figure(figsize=(ncols * 2, nrows * 2.2)) gs = fig.add_gridspec(nrows, ncols, hspace=0, wspace=0, left=0.12) axs = gs.subplots(sharex=True, sharey=True) x_min, x_max, y_min, y_max = 0, 0, 0, 0 for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: hour = hours[i * ncols + j] ax = axs[i, j] ax.set_title(f"{float(hour):05.2f}".replace(".", ":"), y=0.8) ax.hist( average_wind_gust_at_given_hour[hour], bins=int(len(average_wind_gust_at_given_hour[hour]) ** 0.5), density=True, histtype="stepfilled", alpha=0.2, label="Wind Gust Speed Distribution", ) ax.autoscale(enable=True, axis="y", tight=True) # Plot weibull distribution c, loc, scale = stats.weibull_min.fit( average_wind_gust_at_given_hour[hour], loc=0, scale=1 ) x = np.linspace(0, np.ceil(self.max_wind_gust), 100) ax.plot( x, stats.weibull_min.pdf(x, c, loc, scale), "r-", linewidth=2, label="Weibull Distribution", ) current_x_max = ax.get_xlim()[1] current_y_max = ax.get_ylim()[1] x_max = current_x_max if current_x_max > x_max else x_max y_max = current_y_max if current_y_max > y_max else y_max ax.label_outer() ax.grid() # Set x and y limits for the last axis. Since axes are shared, set to all ax.set_xlim(x_min, x_max) ax.set_ylim(y_min, y_max) ax.xaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=5, prune="lower") ) ax.yaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=4, prune="lower") ) # Set title and axis labels for entire figure handles, labels = ax.get_legend_handles_labels() fig.legend(handles, labels, loc="upper right") fig.suptitle("Average Wind Profile") fig.supxlabel(f"Wind Gust Speed ({self.unit_system['wind_speed']})") fig.supylabel("Probability") plt.show()
[docs] def animate_wind_gust_distribution_over_average_day(self): """Animation of how the wind gust distribution varies throughout the day.""" # Gather animation data wind_gusts_at_given_hour = {} for hour in list(self.surfaceDataDict.values())[0].keys(): wind_gust_values_for_this_hour = [] for dayDict in self.surfaceDataDict.values(): try: wind_gust_values_for_this_hour += [dayDict[hour]["surfaceWindGust"]] except KeyError: # Some day does not have data for the desired hour (probably the last one) # No need to worry, just average over the other days pass wind_gusts_at_given_hour[hour] = wind_gust_values_for_this_hour # Create animation fig, ax = plt.subplots(dpi=200) # Initialize animation artists: histogram and hour text hist_bins = np.linspace(0, np.ceil(self.max_wind_gust), 25) # Fix bins edges _, _, bar_container = plt.hist( [], bins=hist_bins, alpha=0.2, label="Wind Gust Speed Distribution", ) (ln,) = plt.plot( [], [], "r-", linewidth=2, label="Weibull Distribution", ) tx = plt.text( x=0.95, y=0.95, s="", verticalalignment="top", horizontalalignment="right", transform=ax.transAxes, fontsize=24, ) # Define function to initialize animation def init(): ax.set_xlim(0, np.ceil(self.max_wind_gust)) ax.set_ylim(0, 0.3) # TODO: parametrize ax.set_xlabel(f"Wind Gust Speed ({self.unit_system['wind_speed']})") ax.set_ylabel("Probability") ax.set_title("Wind Gust Distribution") # ax.grid(True) return (ln, *bar_container.patches, tx) # Define function which sets each animation frame def update(frame): # Update histogram data = frame[1] hist, _ = np.histogram(data, hist_bins, density=True) for count, rect in zip(hist, bar_container.patches): rect.set_height(count) # Update weibull distribution c, loc, scale = stats.weibull_min.fit(data, loc=0, scale=1) xdata = np.linspace(0, np.ceil(self.max_wind_gust), 100) ydata = stats.weibull_min.pdf(xdata, c, loc, scale) ln.set_data(xdata, ydata) # Update hour text tx.set_text(f"{float(frame[0]):05.2f}".replace(".", ":")) return (ln, *bar_container.patches, tx) for frame in wind_gusts_at_given_hour.items(): update(frame) animation = FuncAnimation( fig, update, frames=wind_gusts_at_given_hour.items(), interval=750, init_func=init, blit=True, ) plt.close(fig) return HTML(animation.to_jshtml())
[docs] def plot_sustained_surface_wind_speed_distribution_over_average_day( self, windSpeedLimit=False ): """Plots shown in the animation of how the sustained surface wind speed distribution varies throughout the day.""" # Gather animation data average_wind_speed_at_given_hour = {} for hour in list(self.surfaceDataDict.values())[0].keys(): wind_speed_values_for_this_hour = [] for dayDict in self.surfaceDataDict.values(): try: wind_speed_values_for_this_hour += [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 ] except KeyError: # Some day does not have data for the desired hour (probably the last one) # No need to worry, just average over the other days pass average_wind_speed_at_given_hour[hour] = wind_speed_values_for_this_hour # Create grid of plots for each hour hours = list(list(self.pressureLevelDataDict.values())[0].keys()) ncols, nrows = self._find_two_closest_integer_factors(len(hours)) fig = plt.figure(figsize=(ncols * 2, nrows * 2.2)) gs = fig.add_gridspec(nrows, ncols, hspace=0, wspace=0, left=0.12) axs = gs.subplots(sharex=True, sharey=True) x_min, x_max, y_min, y_max = 0, 0, 0, 0 for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: hour = hours[i * ncols + j] ax = axs[i, j] ax.set_title(f"{float(hour):05.2f}".replace(".", ":"), y=0.8) ax.hist( average_wind_speed_at_given_hour[hour], bins=int(len(average_wind_speed_at_given_hour[hour]) ** 0.5), density=True, histtype="stepfilled", alpha=0.2, label="Wind speed Speed Distribution", ) ax.autoscale(enable=True, axis="y", tight=True) # Plot weibull distribution c, loc, scale = stats.weibull_min.fit( average_wind_speed_at_given_hour[hour], loc=0, scale=1 ) x = np.linspace( 0, np.ceil(self.calculate_maximum_surface_10m_wind_speed()), 100 ) ax.plot( x, stats.weibull_min.pdf(x, c, loc, scale), "r-", linewidth=2, label="Weibull Distribution", ) current_x_max = ax.get_xlim()[1] current_y_max = ax.get_ylim()[1] x_max = current_x_max if current_x_max > x_max else x_max y_max = current_y_max if current_y_max > y_max else y_max ax.label_outer() ax.grid() # Set x and y limits for the last axis. Since axes are shared, set to all ax.set_xlim(x_min, x_max) ax.set_ylim(y_min, y_max) ax.xaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=5, prune="lower") ) ax.yaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=4, prune="lower") ) if windSpeedLimit: for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: # Clear Sky Range Altitude Limits j] ax = axs[i, j] ax.vlines( convert_units(20, "mph", self.unit_system["wind_speed"]), 0, ax.get_ylim()[1], "g", (0, (15, 5, 2, 5)), label="Wind Speed Limits", ) # Set title and axis labels for entire figure handles, labels = ax.get_legend_handles_labels() fig.legend(handles, labels, loc="upper right") fig.suptitle("Average Wind Profile") fig.supxlabel( f"Sustained Surface Wind Speed ({self.unit_system['wind_speed']})" ) fig.supylabel("Probability") plt.show()
[docs] def animate_sustained_surface_wind_speed_distribution_over_average_day( self, windSpeedLimit=False ): # TODO: getting weird results since the 0.3 on y axis is not parametrized """Animation of how the sustained surface wind speed distribution varies throughout the day.""" # Gather animation data surface_wind_speeds_at_given_hour = {} for hour in list(self.surfaceDataDict.values())[0].keys(): surface_wind_speed_values_for_this_hour = [] for dayDict in self.surfaceDataDict.values(): try: surface_wind_speed_values_for_this_hour += [ ( dayDict[hour]["surface10mWindVelocityX"] ** 2 + dayDict[hour]["surface10mWindVelocityY"] ** 2 ) ** 0.5 ] except KeyError: # Some day does not have data for the desired hour (probably the last one) # No need to worry, just average over the other days pass surface_wind_speeds_at_given_hour[ hour ] = surface_wind_speed_values_for_this_hour # Create animation fig, ax = plt.subplots(dpi=200) # Initialize animation artists: histogram and hour text hist_bins = np.linspace( 0, np.ceil(self.calculate_maximum_surface_10m_wind_speed()), 25 ) # Fix bins edges _, _, bar_container = plt.hist( [], bins=hist_bins, alpha=0.2, label="Sustained Surface Wind Speed Distribution", ) (ln,) = plt.plot( [], [], "r-", linewidth=2, label="Weibull Distribution", ) tx = plt.text( x=0.95, y=0.95, s="", verticalalignment="top", horizontalalignment="right", transform=ax.transAxes, fontsize=24, ) # Define function to initialize animation def init(): ax.set_xlim(0, np.ceil(self.calculate_maximum_surface_10m_wind_speed())) ax.set_ylim(0, 0.3) # TODO: parametrize ax.set_xlabel( f"Sustained Surface Wind Speed ({self.unit_system['wind_speed']})" ) ax.set_ylabel("Probability") ax.set_title("Sustained Surface Wind Distribution") # ax.grid(True) if windSpeedLimit: ax.vlines( convert_units(20, "mph", self.unit_system["wind_speed"]), 0, 0.3, # TODO: parametrize "g", (0, (15, 5, 2, 5)), label="Wind Speed Limit", ) # Plot Wind Speed Limit return (ln, *bar_container.patches, tx) # Define function which sets each animation frame def update(frame): # Update histogram data = frame[1] hist, _ = np.histogram(data, hist_bins, density=True) for count, rect in zip(hist, bar_container.patches): rect.set_height(count) # Update weibull distribution c, loc, scale = stats.weibull_min.fit(data, loc=0, scale=1) xdata = np.linspace( 0, np.ceil(self.calculate_maximum_surface_10m_wind_speed()), 100 ) ydata = stats.weibull_min.pdf(xdata, c, loc, scale) ln.set_data(xdata, ydata) # Update hour text tx.set_text(f"{float(frame[0]):05.2f}".replace(".", ":")) return (ln, *bar_container.patches, tx) for frame in surface_wind_speeds_at_given_hour.items(): update(frame) animation = FuncAnimation( fig, update, frames=surface_wind_speeds_at_given_hour.items(), interval=750, init_func=init, blit=True, ) plt.close(fig) return HTML(animation.to_jshtml())
@property def altitude_AGL_range(self): min_altitude = 0 if self.maxExpectedAltitude == None: max_altitudes = [ np.max(dayDict[hour]["windSpeed"].source[-1, 0]) for dayDict in self.pressureLevelDataDict.values() for hour in dayDict.keys() ] max_altitude = np.min(max_altitudes) else: max_altitude = self.maxExpectedAltitude return min_altitude, max_altitude
[docs] def process_temperature_profile_over_average_day(self): """Compute the average temperature profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_temperature_profile_at_given_hour = {} self.max_average_temperature_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: temperature_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: temperature_values_for_this_hour += [ dayDict[hour]["temperature"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass mean_temperature_values_for_this_hour = np.mean( temperature_values_for_this_hour, axis=0 ) average_temperature_profile_at_given_hour[hour] = [ mean_temperature_values_for_this_hour, altitude_list, ] max_temperature = np.max(mean_temperature_values_for_this_hour) if max_temperature >= self.max_average_temperature_at_altitude: self.max_average_temperature_at_altitude = max_temperature self.average_temperature_profile_at_given_hour = ( average_temperature_profile_at_given_hour )
[docs] def process_pressure_profile_over_average_day(self): """Compute the average pressure profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_pressure_profile_at_given_hour = {} self.max_average_pressure_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: pressure_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: pressure_values_for_this_hour += [ dayDict[hour]["pressure"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass mean_pressure_values_for_this_hour = np.mean( pressure_values_for_this_hour, axis=0 ) average_pressure_profile_at_given_hour[hour] = [ mean_pressure_values_for_this_hour, altitude_list, ] max_pressure = np.max(mean_pressure_values_for_this_hour) if max_pressure >= self.max_average_pressure_at_altitude: self.max_average_pressure_at_altitude = max_pressure self.average_pressure_profile_at_given_hour = ( average_pressure_profile_at_given_hour )
[docs] def process_wind_speed_profile_over_average_day(self): """Compute the average wind profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_wind_profile_at_given_hour = {} self.max_average_wind_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() # days = list(self.surfaceDataDict.keys()) # hours = list(self.surfaceDataDict[days[0]].keys()) for hour in hours: wind_speed_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: wind_speed_values_for_this_hour += [ dayDict[hour]["windSpeed"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass mean_wind_speed_values_for_this_hour = np.mean( wind_speed_values_for_this_hour, axis=0 ) average_wind_profile_at_given_hour[hour] = [ mean_wind_speed_values_for_this_hour, altitude_list, ] max_wind = np.max(mean_wind_speed_values_for_this_hour) if max_wind >= self.max_average_wind_at_altitude: self.max_average_wind_at_altitude = max_wind self.average_wind_profile_at_given_hour = average_wind_profile_at_given_hour
[docs] def process_wind_velocity_x_profile_over_average_day(self): """Compute the average windVelocityX profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_windVelocityX_profile_at_given_hour = {} self.max_average_windVelocityX_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: windVelocityX_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: windVelocityX_values_for_this_hour += [ dayDict[hour]["windVelocityX"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass mean_windVelocityX_values_for_this_hour = np.mean( windVelocityX_values_for_this_hour, axis=0 ) average_windVelocityX_profile_at_given_hour[hour] = [ mean_windVelocityX_values_for_this_hour, altitude_list, ] max_windVelocityX = np.max(mean_windVelocityX_values_for_this_hour) if max_windVelocityX >= self.max_average_windVelocityX_at_altitude: self.max_average_windVelocityX_at_altitude = max_windVelocityX self.average_windVelocityX_profile_at_given_hour = ( average_windVelocityX_profile_at_given_hour )
[docs] def process_wind_velocity_y_profile_over_average_day(self): """Compute the average windVelocityY profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_windVelocityY_profile_at_given_hour = {} self.max_average_windVelocityY_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: windVelocityY_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: windVelocityY_values_for_this_hour += [ dayDict[hour]["windVelocityY"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass mean_windVelocityY_values_for_this_hour = np.mean( windVelocityY_values_for_this_hour, axis=0 ) average_windVelocityY_profile_at_given_hour[hour] = [ mean_windVelocityY_values_for_this_hour, altitude_list, ] max_windVelocityY = np.max(mean_windVelocityY_values_for_this_hour) if max_windVelocityY >= self.max_average_windVelocityY_at_altitude: self.max_average_windVelocityY_at_altitude = max_windVelocityY self.average_windVelocityY_profile_at_given_hour = ( average_windVelocityY_profile_at_given_hour )
[docs] def plot_wind_profile_over_average_day(self, clear_range_limits=False): """Creates a grid of plots with the wind profile over the average day.""" self.process_wind_speed_profile_over_average_day() # Create grid of plots for each hour hours = list(list(self.pressureLevelDataDict.values())[0].keys()) ncols, nrows = self._find_two_closest_integer_factors(len(hours)) fig = plt.figure(figsize=(ncols * 2, nrows * 2.2)) gs = fig.add_gridspec(nrows, ncols, hspace=0, wspace=0, left=0.12) axs = gs.subplots(sharex=True, sharey=True) x_min, x_max, y_min, y_max = 0, 0, np.inf, 0 for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: hour = hours[i * ncols + j] ax = axs[i, j] ax.plot(*self.average_wind_profile_at_given_hour[hour], "r-") ax.set_title(f"{float(hour):05.2f}".replace(".", ":"), y=0.8) ax.autoscale(enable=True, axis="y", tight=True) current_x_max = ax.get_xlim()[1] current_y_min, current_y_max = ax.get_ylim() x_max = current_x_max if current_x_max > x_max else x_max y_max = current_y_max if current_y_max > y_max else y_max y_min = current_y_min if current_y_min < y_min else y_min if self.forecast: forecast = self.forecast y = self.average_wind_profile_at_given_hour[hour][1] x = forecast[hour].windSpeed.getValue(y) * convert_units( 1, "m/s", self.unit_system["wind_speed"] ) ax.plot(x, y, "b--") ax.label_outer() ax.grid() # Set x and y limits for the last axis. Since axes are shared, set to all ax.set_xlim(x_min, x_max) ax.set_ylim(y_min, y_max) ax.xaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=5, prune="lower") ) ax.yaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=4, prune="lower") ) if clear_range_limits: for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: # Clear Sky Range Altitude Limits ax = axs[i, j] ax.fill_between( [x_min, x_max], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) ax.fill_between( [x_min, x_max], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) # Set title and axis labels for entire figure fig.suptitle("Average Wind Profile") fig.supxlabel(f"Wind speed ({self.unit_system['wind_speed']})") fig.supylabel(f"Altitude AGL ({self.unit_system['length']})") plt.show()
[docs] def process_wind_heading_profile_over_average_day(self): """Compute the average wind velocities (both X and Y components) profile for each available hour of a day, over all days in the dataset.""" altitude_list = np.linspace(*self.altitude_AGL_range, 100) average_wind_velocity_X_profile_at_given_hour = {} average_wind_velocity_Y_profile_at_given_hour = {} average_wind_heading_profile_at_given_hour = {} self.max_average_wind_velocity_X_at_altitude = 0 self.max_average_wind_velocity_Y_at_altitude = 0 hours = list(self.pressureLevelDataDict.values())[0].keys() for hour in hours: wind_velocity_X_values_for_this_hour = [] wind_velocity_Y_values_for_this_hour = [] for dayDict in self.pressureLevelDataDict.values(): try: wind_velocity_X_values_for_this_hour += [ dayDict[hour]["windVelocityX"](altitude_list) ] wind_velocity_Y_values_for_this_hour += [ dayDict[hour]["windVelocityY"](altitude_list) ] except KeyError: # Some day does not have data for the desired hour # No need to worry, just average over the other days pass # Compute the average wind velocity profile for this hour mean_wind_velocity_X_values_for_this_hour = np.mean( wind_velocity_X_values_for_this_hour, axis=0 ) mean_wind_velocity_Y_values_for_this_hour = np.mean( wind_velocity_Y_values_for_this_hour, axis=0 ) # Store the ... wind velocity at each altitude average_wind_velocity_X_profile_at_given_hour[hour] = [ mean_wind_velocity_X_values_for_this_hour, altitude_list, ] average_wind_velocity_Y_profile_at_given_hour[hour] = [ mean_wind_velocity_Y_values_for_this_hour, altitude_list, ] average_wind_heading_profile_at_given_hour[hour] = [ np.arctan2( mean_wind_velocity_X_values_for_this_hour, mean_wind_velocity_Y_values_for_this_hour, ) * (180 / np.pi) % 360, altitude_list, ] # Store the maximum wind velocity at each altitude max_wind_X = np.max(mean_wind_velocity_X_values_for_this_hour) if max_wind_X >= self.max_average_wind_velocity_X_at_altitude: self.max_average_wind_X_at_altitude = max_wind_X max_wind_Y = np.max(mean_wind_velocity_Y_values_for_this_hour) if max_wind_Y >= self.max_average_wind_velocity_Y_at_altitude: self.max_average_wind_Y_at_altitude = max_wind_Y # Store the average wind velocity profiles for each hour self.average_wind_velocity_X_profile_at_given_hour = ( average_wind_velocity_X_profile_at_given_hour ) self.average_wind_velocity_Y_profile_at_given_hour = ( average_wind_velocity_Y_profile_at_given_hour ) self.average_wind_heading_profile_at_given_hour = ( average_wind_heading_profile_at_given_hour )
[docs] def plot_wind_heading_profile_over_average_day(self, clear_range_limits=False): """Creates a grid of plots with the wind heading profile over the average day.""" self.process_wind_heading_profile_over_average_day() # Create grid of plots for each hour hours = list(list(self.pressureLevelDataDict.values())[0].keys()) ncols, nrows = self._find_two_closest_integer_factors(len(hours)) fig = plt.figure(figsize=(ncols * 2, nrows * 2.2)) gs = fig.add_gridspec(nrows, ncols, hspace=0, wspace=0, left=0.12) axs = gs.subplots(sharex=True, sharey=True) x_min, x_max, y_min, y_max = 0, 0, np.inf, 0 for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: hour = hours[i * ncols + j] ax = axs[i, j] ax.plot(*self.average_wind_heading_profile_at_given_hour[hour], "r-") ax.set_title(f"{float(hour):05.2f}".replace(".", ":"), y=0.8) ax.autoscale(enable=True, axis="y", tight=True) current_x_max = ax.get_xlim()[1] current_y_min, current_y_max = ax.get_ylim() x_max = current_x_max if current_x_max > x_max else x_max y_max = current_y_max if current_y_max > y_max else y_max y_min = current_y_min if current_y_min < y_min else y_min ax.label_outer() ax.grid() # Set x and y limits for the last axis. Since axes are shared, set to all # ax.set_xlim(x_min, x_max) ax.set_xlim(0, 360) ax.set_ylim(y_min, y_max) ax.xaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=5, prune="lower") ) ax.yaxis.set_major_locator( mtick.MaxNLocator(integer=True, nbins=4, prune="lower") ) if clear_range_limits: for i, j in [(i, j) for i in range(nrows) for j in range(ncols)]: # Clear Sky range limits ax = axs[i, j] ax.fill_between( [x_min, x_max], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) ax.fill_between( [x_min, x_max], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) # Set title and axis labels for entire figure fig.suptitle("Average Wind Heading Profile") fig.supxlabel(f"Wind heading ({self.unit_system['angle']})") fig.supylabel(f"Altitude AGL ({self.unit_system['length']})") plt.show()
[docs] def animate_wind_profile_over_average_day(self, clear_range_limits=False): """Animation of how wind profile evolves throughout an average day.""" self.process_wind_speed_profile_over_average_day() # Create animation fig, ax = plt.subplots(dpi=200) # Initialize animation artists: curve and hour text (ln,) = plt.plot([], [], "r-") tx = plt.text( x=0.95, y=0.95, s="", verticalalignment="top", horizontalalignment="right", transform=ax.transAxes, fontsize=24, ) # Define function to initialize animation def init(): altitude_list = np.linspace(*self.altitude_AGL_range, 100) ax.set_xlim(0, self.max_average_wind_at_altitude + 5) ax.set_ylim(*self.altitude_AGL_range) ax.set_xlabel(f"Wind Speed ({self.unit_system['wind_speed']})") ax.set_ylabel(f"Altitude AGL ({self.unit_system['length']})") ax.set_title("Average Wind Profile") ax.grid(True) return ln, tx # Define function which sets each animation frame def update(frame): xdata = frame[1][0] ydata = frame[1][1] ln.set_data(xdata, ydata) tx.set_text(f"{float(frame[0]):05.2f}".replace(".", ":")) return ln, tx animation = FuncAnimation( fig, update, frames=self.average_wind_profile_at_given_hour.items(), interval=1000, init_func=init, blit=True, ) if clear_range_limits: # Clear sky range limits ax.fill_between( [0, self.max_average_wind_at_altitude + 5], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) ax.fill_between( [0, self.max_average_wind_at_altitude + 5], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) fig.legend(loc="upper right") plt.close(fig) return HTML(animation.to_jshtml())
[docs] def animate_wind_heading_profile_over_average_day(self, clear_range_limits=False): """Animation of how wind heading profile evolves throughout an average day.""" self.process_wind_heading_profile_over_average_day() # Create animation fig, ax = plt.subplots(dpi=200) # Initialize animation artists: curve and hour text (ln,) = plt.plot([], [], "r-") tx = plt.text( x=0.95, y=0.95, s="", verticalalignment="top", horizontalalignment="right", transform=ax.transAxes, fontsize=24, ) # Define function to initialize animation def init(): altitude_list = np.linspace(*self.altitude_AGL_range, 100) ax.set_xlim(0, 360) ax.set_ylim(*self.altitude_AGL_range) ax.set_xlabel(f"Wind Heading ({self.unit_system['angle']})") ax.set_ylabel(f"Altitude AGL ({self.unit_system['length']})") ax.set_title("Average Wind Heading Profile") ax.grid(True) return ln, tx # Define function which sets each animation frame def update(frame): xdata = frame[1][0] ydata = frame[1][1] ln.set_data(xdata, ydata) tx.set_text(f"{float(frame[0]):05.2f}".replace(".", ":")) return ln, tx animation = FuncAnimation( fig, update, frames=self.average_wind_heading_profile_at_given_hour.items(), interval=1000, init_func=init, blit=True, ) if clear_range_limits: # Clear sjy range limits ax.fill_between( [0, self.max_average_wind_at_altitude + 5], 0.7 * convert_units(10000, "ft", self.unit_system["length"]), 1.3 * convert_units(10000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"10,000 {self.unit_system['length']} ± 30%", ) ax.fill_between( [0, self.max_average_wind_at_altitude + 5], 0.7 * convert_units(30000, "ft", self.unit_system["length"]), 1.3 * convert_units(30000, "ft", self.unit_system["length"]), color="g", alpha=0.2, label=f"30,000 {self.unit_system['length']} ± 30%", ) fig.legend(loc="upper right") plt.close(fig) return HTML(animation.to_jshtml())
def allInfo(self): print("Dataset Information: ") print( "Time Period: From ", self.start_date, " to ", self.end_date ) # TODO: Improve Timezone print( "Available hours: From ", self.start_hour, " to ", self.end_hour ) # TODO: Improve Timezone print("Surface Data File Path: ", self.surfaceDataFile) print( "Latitude Range: From ", self.singleLevelInitLat, "° To ", self.singleLevelEndLat, "°", ) print( "Longitude Range: From ", self.singleLevelInitLon, "° To ", self.singleLevelEndLon, "°", ) print("Pressure Data File Path: ", self.pressureLevelDataFile) print( "Latitude Range: From ", self.pressureLevelInitLat, "° To ", self.pressureLevelEndLat, "°", ) print( "Longitude Range: From ", self.pressureLevelInitLon, "° To ", self.pressureLevelEndLon, "°", ) # Print launch site details print("\nLaunch Site Details") print("Launch Site Latitude: {:.5f}°".format(self.latitude)) print("Launch Site Longitude: {:.5f}°".format(self.longitude)) print( "Surface Elevation (from surface data file): ", self.elevation ) # TODO: Improve units print( "Max Expected Altitude: ", self.maxExpectedAltitude, " ", self.unit_system["length"], ) print("\nPressure Information") print( f"Average Surface Pressure: {self.average_surface_pressure:.2f} ± {self.std_surface_pressure:.2f} {self.unit_system['pressure']}" ) print( f"Average Pressure at {convert_units(1000, 'ft', self.current_units['height_ASL']):.0f} {self.current_units['height_ASL']}: {self.average_pressure_at_1000ft:.2f} ± {self.std_pressure_at_1000ft:.2f} {self.unit_system['pressure']}" ) print( f"Average Pressure at {convert_units(10000, 'ft', self.current_units['height_ASL']):.0f} {self.current_units['height_ASL']}: {self.average_pressure_at_10000ft:.2f} ± {self.std_pressure_at_1000ft:.2f} {self.unit_system['pressure']}" ) print( f"Average Pressure at {convert_units(30000, 'ft', self.current_units['height_ASL']):.0f} {self.current_units['height_ASL']}: {self.average_pressure_at_30000ft:.2f} ± {self.std_pressure_at_1000ft:.2f} {self.unit_system['pressure']}" ) print( f"\nSustained Surface Wind Speed Information ({convert_units(10, 'm', self.unit_system['length']):.0f} {self.unit_system['length']} above ground)" ) print( f"Historical Maximum Wind Speed: {self.record_max_surface_10m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Historical Minimum Wind Speed: {self.record_min_surface_10m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Average Daily Maximum Wind Speed: {self.average_max_surface_10m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Average Daily Minimum Wind Speed: {self.average_min_surface_10m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"\nElevated Wind Speed Information ({convert_units(100, 'm', self.unit_system['length']):.0f} {self.unit_system['length']} above ground)" ) print( f"Historical Maximum Wind Speed: {self.record_max_surface_100m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Historical Minimum Wind Speed: {self.record_min_surface_100m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Average Daily Maximum Wind Speed: {self.average_max_surface_100m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print( f"Average Daily Minimum Wind Speed: {self.average_min_surface_100m_wind_speed:.2f} {self.unit_system['wind_speed']}" ) print("\nWind Gust Information") print( f"Historical Maximum Wind Gust: {self.max_wind_gust:.2f} {self.unit_system['wind_speed']}" ) print( f"Average Daily Maximum Wind Gust: {self.average_max_wind_gust:.2f} {self.unit_system['wind_speed']}" ) print("\nTemperature Information") print( f"Historical Maximum Temperature: {self.record_max_temperature:.2f} {self.unit_system['temperature']}" ) print( f"Historical Minimum Temperature: {self.record_min_temperature:.2f} {self.unit_system['temperature']}" ) print( f"Average Daily Maximum Temperature: {self.average_max_temperature:.2f} {self.unit_system['temperature']}" ) print( f"Average Daily Minimum Temperature: {self.average_min_temperature:.2f} {self.unit_system['temperature']}" ) print("\nPrecipitation Information") print( f"Percentage of Days with Precipitation: {100*self.percentage_of_days_with_precipitation:.1f}%" ) print( f"Maximum Precipitation: {max(self.precipitation_per_day):.1f} {self.unit_system['precipitation']}" ) print( f"Average Precipitation: {np.mean(self.precipitation_per_day):.1f} {self.unit_system['precipitation']}" ) print("\nCloud Base Height Information") print( f"Average Cloud Base Height: {self.mean_cloud_base_height:.2f} {self.unit_system['length']}" ) print( f"Minimum Cloud Base Height: {self.min_cloud_base_height:.2f} {self.unit_system['length']}" ) print( f"Percentage of Days Without Clouds: {100*self.percentage_of_days_with_no_cloud_coverage:.1f} %" )
[docs] def exportMeanProfiles(self, filename="export_env_analysis"): """ Exports the mean profiles of the weather data to a file in order to it be used as inputs on Environment Class by using the CustomAtmosphere model. Parameters ---------- filename : str, optional Name of the file where to be saved, by default "EnvAnalysisDict" Returns ------- None """ self.process_temperature_profile_over_average_day() self.process_pressure_profile_over_average_day() self.process_wind_velocity_x_profile_over_average_day() self.process_wind_velocity_y_profile_over_average_day() organized_temperature_dict = {} organized_pressure_dict = {} organized_windX_dict = {} organized_windY_dict = {} for hour in self.average_temperature_profile_at_given_hour.keys(): organized_temperature_dict[hour] = np.column_stack( ( self.average_temperature_profile_at_given_hour[hour][1], self.average_temperature_profile_at_given_hour[hour][0], ) ).tolist() organized_pressure_dict[hour] = np.column_stack( ( self.average_pressure_profile_at_given_hour[hour][1], self.average_pressure_profile_at_given_hour[hour][0], ) ).tolist() organized_windX_dict[hour] = np.column_stack( ( self.average_windVelocityX_profile_at_given_hour[hour][1], self.average_windVelocityX_profile_at_given_hour[hour][0], ) ).tolist() organized_windY_dict[hour] = np.column_stack( ( self.average_windVelocityY_profile_at_given_hour[hour][1], self.average_windVelocityY_profile_at_given_hour[hour][0], ) ).tolist() self.exportEnvAnalDict = { "start_date": self.start_date, "end_date": self.end_date, "start_hour": self.start_hour, "end_hour": self.end_hour, "latitude": self.latitude, "longitude": self.longitude, "elevation": self.elevation, "timeZone": self.preferred_timezone, "unit_system": self.unit_system, "surfaceDataFile": self.surfaceDataFile, "pressureLevelDataFile": self.pressureLevelDataFile, "atmosphericModelPressureProfile": organized_pressure_dict, "atmosphericModelTemperatureProfile": organized_temperature_dict, "atmosphericModelWindVelocityXProfile": organized_windX_dict, "atmosphericModelWindVelocityYProfile": organized_windY_dict, } # Convert to json f = open(filename + ".json", "w") # write json object to file f.write( json.dumps(self.exportEnvAnalDict, sort_keys=False, indent=4, default=str) ) # close file f.close() print( "Your Environment Analysis file was saved, check it out: " + filename + ".json" ) print( "You can use it in the future by using the customAtmosphere atmospheric model." ) return None
[docs] @classmethod def load(self, filename="EnvAnalysisDict"): """Load a previously saved Environment Analysis file. Example: EnvA = EnvironmentAnalysis.load("filename"). Parameters ---------- filename : str, optional Name of the previous saved file, by default "EnvAnalysisDict" Returns ------- EnvironmentAnalysis object """ encoded_class = open(filename).read() return jsonpickle.decode(encoded_class)
[docs] def save(self, filename="EnvAnalysisDict"): """Save the Environment Analysis object to a file so it can be used later. Parameters ---------- filename : str, optional Name of the file where to be saved, by default "EnvAnalysisDict" Returns ------- None """ encoded_class = jsonpickle.encode(self) file = open(filename, "w") file.write(encoded_class) file.close() print("Your Environment Analysis file was saved, check it out: " + filename) return None