"""
Copyright (c) 2025 MPI-M, Clara Bayley
----- CLEO -----
File: superdrops.py
Project: sdmout_src
Created Date: Thursday 29th May 2025
Author: Clara Bayley (CB)
Additional Contributors:
-----
License: BSD 3-Clause "New" or "Revised" License
https://opensource.org/licenses/BSD-3-Clause
-----
File Description:
python class(es) to handle superdroplet attributes data from SDM zarr store in ragged array format.
Non-compatible update on supersdata and sdtracing modules
"""
import awkward as ak
import numpy as np
import random
import warnings
import xarray as xr
from pathlib import Path
[docs]
class SuperdropProperties:
"""Contains attributes common to all superdroplets and functions
for calculating derived ones"""
def __init__(self, consts: dict, is_print=False):
"""Common attributes shared by superdroplets:
RHO_L = density of liquid in droplets (=density of water at 300K) [Kg/m^3]
RHO_SOL = density of (dry) solute [Kg/m^3]
MR_SOL = Mr of solute [g/mol]
IONIC = degree ionic dissociation (van't Hoff factor)
"""
# properties: value
self._props = {
"RHO_L": consts["RHO_L"],
"RHO_SOL": consts["RHO_SOL"],
"MR_SOL": consts["MR_SOL"],
"IONIC": consts["IONIC"],
"RHO_L_units": "Kg m^{-3}",
"RHO_SOL_units": "Kg m^{-3}",
"MR_SOL_units": "g mol^{-1}",
}
if is_print:
self.print_properties()
def get_props(self, var: str, units: bool):
if not units:
return self._props[var]
else:
return self._props[var], self._props[var + "_units"]
def RHO_L(self, units=False):
return self.get_props("RHO_L", units=units)
def RHO_SOL(self, units=False):
return self.get_props("RHO_SOL", units=units)
def MR_SOL(self, units=False):
return self.get_props("MR_SOL", units=units)
def IONIC(self, units=False):
if units:
print("degree ionic dissociation (van't Hoff factor) has no units")
return self.get_props("IONIC", units=False)
def __getitem__(self, key):
return self._props[key]
def print_properties(self):
print("---- Superdrop Properties -----")
print("RHO_L =", self.RHO_L())
print("RHO_SOL =", self.RHO_SOL())
print("MR_SOL =", self.MR_SOL())
print("IONIC =", self.IONIC())
print("-------------------------------")
[docs]
def rhoeff(self, r, msol):
"""calculates effective density [g m^-3] of
droplet such that mass_droplet, m = 4/3*pi*r^3 * rhoeff
taking into account mass of liquid and mass of
solute assuming solute occupies volume it
would given its (dry) density, RHO_SOL."""
msol = msol / 1000 # convert from grams to Kg
r = r / 1e6 # convert microns to m
solfactor = 3 * msol / (4.0 * np.pi * (r**3))
rhoeff = self._props["RHO_L"] + solfactor * (
1 - self._props["RHO_L"] / self._props["RHO_SOL"]
)
return rhoeff * 1000 # [g/m^3]
[docs]
def vol(self, r):
"""volume of droplet [m^3]"""
r = r / 1e6 # convert microns to m
return 4.0 / 3.0 * np.pi * r**3
[docs]
def mass(self, r, msol):
"""
total mass of droplet (water + (dry) areosol) [g],
m = 4/3*pi*rho_l**3 + msol(1-rho_l/rho_sol)
ie. m = 4/3*pi*rhoeff*R**3
"""
msol = msol / 1000 # convert from grams to Kg
r = r / 1e6 # convert microns to m
msoleff = msol * (
1 - self._props["RHO_L"] / self._props["RHO_SOL"]
) # effect of solute on mass
m = msoleff + 4 / 3.0 * np.pi * (r**3) * self._props["RHO_L"]
return m * 1000 # [g]
[docs]
def m_water(self, r, msol):
"""mass of only water in droplet [g]"""
msol = msol / 1000 # convert msol from grams to Kg
r = r / 1e6 # convert microns to m
v_sol = msol / self._props["RHO_SOL"]
v_w = 4 / 3.0 * np.pi * (r**3) - v_sol
return self._props["RHO_L"] * v_w * 1000 # [g]
[docs]
class Superdrops(SuperdropProperties):
def __init__(self, dataset, consts=None, is_print=False):
if isinstance(dataset, Superdrops):
SuperdropProperties.__init__(self, dataset._props, is_print=is_print)
self._variables = dataset._variables
self._raw_data = dataset._raw_data
self._units = dataset._units
elif (
isinstance(dataset, xr.Dataset)
or isinstance(dataset, str)
or isinstance(dataset, Path)
):
if consts is not None:
SuperdropProperties.__init__(self, consts, is_print=is_print)
else:
warnings.warn(
"No superdroplet properties attached to superdroplet dataset"
)
ds = self.tryopen_dataset(dataset)
raggedcount = ds["raggedcount"] # ragged count variable
self._variables = (
"sdId",
"sdgbxindex",
"xi",
"radius",
"msol",
"coord3",
"coord1",
"coord2",
)
self._raw_data = {
var: self.tryvar(ds, raggedcount, var) for var in self._variables
}
self._units = {
"radius_units": self.tryunits(ds, "radius"), # probably microns
"msol_units": self.tryunits(ds, "msol"), # probably gramms
"coord3_units": self.tryunits(ds, "coord3"), # probably meters
"coord1_units": self.tryunits(ds, "coord1"), # probably meters
"coord2_units": self.tryunits(ds, "coord2"), # probably meters
}
else:
raise ValueError("unknow type of dataset to make superdroplets from")
def tryopen_dataset(self, dataset: Path):
if isinstance(dataset, str) or isinstance(dataset, Path):
print("supers dataset: ", dataset)
return xr.open_dataset(dataset, engine="zarr", consolidated=False)
else:
return dataset
[docs]
def tryvar(self, ds: xr.Dataset, raggedcount: xr.DataArray, var: str):
"""attempts to return variable in form of ragged array
(ak.Array) with dims [time, raggedcount]
for a variable "var" in xarray dataset 'ds'.
If attempt fails, returns empty array instead"""
try:
return ak.unflatten(ds[var].values, raggedcount.values)
except KeyError:
return ak.Array([])
[docs]
def tryunits(self, ds: xr.Dataset, var: str):
"""attempts to return the units of a variable
in xarray dataset 'ds'. If attempt fails, returns null"""
try:
return ds[var].units
except KeyError:
return ""
def get_units(self, var: str):
return self._units[var + "_units"]
def get_variable(self, var: str, units: bool):
if not units:
return self._raw_data[var]
else:
return self._raw_data[var], self.get_units(var)
def sdId(self, units=False):
return self.get_variable("sdId", units)
def sdgbxindex(self, units=False):
return self.get_variable("sdgbxindex", units)
def xi(self, units=False):
return self.get_variable("xi", units)
def radius(self, units=False):
return self.get_variable("radius", units)
def msol(self, units=False):
return self.get_variable("msol", units)
def coord3(self, units=False):
return self.get_variable("coord3", units)
def coord1(self, units=False):
return self.get_variable("coord1", units)
def coord2(self, units=False):
return self.get_variable("coord2", units)
def time(self, units=False):
if "time" in self._variables:
return self.get_variable("time", units)
print("time is not attached to superdrops")
return None
def radius_units(self):
return self.get_units("radius")
def msol_units(self):
return self.get_units("msol")
def coord3_units(self):
return self.get_units("coord3")
def coord1_units(self):
return self.get_units("coord1")
def coord2_units(self):
return self.get_units("coord2")
def time_units(self):
if "time" in self._variables:
return self.get_units("time")
print("time is not attached to superdrops")
return None
def __getitem__(self, key):
if key == "sdId":
return self.sdId()
elif key == "sdgbxindex":
return self.sdgbxindex()
elif key == "xi":
return self.xi()
elif key == "radius":
return self.radius()
elif key == "msol":
return self.msol()
elif key == "coord3":
return self.coord3()
elif key == "coord1":
return self.coord1()
elif key == "coord2":
return self.coord2()
elif key == "time":
return self.time()
elif key == "radius_units":
return self.radius_units()
elif key == "msol_units":
return self.msol_units()
elif key == "coord3_units":
return self.coord3_units()
elif key == "coord1_units":
return self.coord1_units()
elif key == "coord2_units":
return self.coord2_units()
elif key == "time_units":
return self.time_units()
else:
err = "no known return provided for " + key + " key"
raise ValueError(err)
def attach_time(
self, time: np.ndarray, time_units: str, do_reshape=False, var4reshape="sdId"
):
if "time" not in self._raw_data:
if do_reshape:
# e.g. use sdId array to make 1-D time array have matching (ragged) dimension
time = ak.broadcast_arrays(self._raw_data[var4reshape], time)[1]
self._variables = tuple(list(self._variables) + ["time"])
self._raw_data["time"] = ak.Array(time)
self._units["time_units"] = time_units
else:
print("time already attached to superdrops")
def detach_time(self):
if "time" not in self._raw_data:
print("time is not attached to superdrops")
else:
variables = list(self._variables)
variables.remove("time")
self._variables = tuple(variables)
del self._raw_data["time"]
del self._units["time_units"]
def sample(self, sample_var: str, sample_values="all", variables2sample="all"):
if isinstance(sample_var, str):
sample_var = self._raw_data[sample_var]
if isinstance(sample_values, str) and sample_values == "all":
sample_values = list(np.unique(ak.flatten(sample_var)))
elif not isinstance(sample_values, list):
sample_values = [sample_values]
if isinstance(variables2sample, str) and variables2sample == "all":
variables2sample = self._variables
elif not isinstance(variables2sample, list):
variables2sample = [variables2sample]
raw_data = {var: [] for var in variables2sample}
for value in sample_values:
mask = ak.Array(sample_var == value)
for var in variables2sample:
var_for_sample_value = ak.flatten(
ak.drop_none(self._raw_data[var].mask[mask])
)
raw_data[var].append(var_for_sample_value)
superdrops_sample = Superdrops(self)
superdrops_sample._variables = raw_data.keys()
superdrops_sample._raw_data = raw_data
return superdrops_sample
def random_sample(
self,
sample_var: str,
ndrops2sample: int,
sample_population="all",
variables2sample="all",
):
if isinstance(sample_var, str):
sample_var = self._raw_data[sample_var]
if sample_population == "all":
sample_population = list(np.unique(ak.flatten(sample_var)))
sample_values = random.sample(sample_population, ndrops2sample)
return self.sample(
sample_var, sample_values=sample_values, variables2sample=variables2sample
)
def indexes_of_time_slices(self, time, times2select):
if isinstance(time, list):
time_indexes = []
for i in range(len(time)):
time_indexes.append(self.indexes_of_time_slices(time[i], times2select))
return time_indexes
if len(time) == ak.count(time): # time is 1-D array
time_indexes = []
for t in times2select:
idx = ak.argmin(abs((time - t)))
time_indexes.append(idx)
else: # time assumed to be ragged array like superdrops data
time_indexes = []
for t in times2select:
idx = ak.nanargmin(abs((time - t)), axis=0)[0]
time_indexes.append(idx)
return time_indexes
def time_slice(
self,
times2select,
variables2slice="all",
attach_time=False,
time=None,
time_units=None,
):
if attach_time:
self.attach_time(time, time_units, do_reshape=False)
if isinstance(variables2slice, str) and variables2slice == "all":
variables2slice = self._variables
elif not isinstance(variables2slice, list):
variables2slice = [variables2slice]
time_indexes = self.indexes_of_time_slices(self.time(), times2select)
raw_data = {var: [] for var in variables2slice}
for var in variables2slice:
if isinstance(
self._raw_data[var], list
): # superdrops sample is being sliced
time_slice_var = []
for i in range(len(self._raw_data[var])):
time_slice_var.append(self._raw_data[var][i][time_indexes[i]])
else:
time_slice_var = self._raw_data[var][time_indexes]
raw_data[var] = time_slice_var
superdrops_timeslice = Superdrops(self)
superdrops_timeslice._variables = raw_data.keys()
superdrops_timeslice._raw_data = raw_data
return superdrops_timeslice